
Disclaimer: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same license and attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/30983982/


How to use JDBC source to write and read data in (Py)Spark?

python, scala, apache-spark, apache-spark-sql, pyspark

Asked by zero323

The goal of this question is to document:


  • steps required to read and write data using JDBC connections in PySpark

  • possible issues with JDBC sources and known solutions


With small changes these methods should work with other supported languages including Scala and R.


Accepted answer by zero323

Writing data


  1. Include the applicable JDBC driver when you submit the application or start the shell. You can use, for example, --packages:

    bin/pyspark --packages group:name:version  
    

    or combining driver-class-path and jars:

    bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR
    

    These properties can also be set using the PYSPARK_SUBMIT_ARGS environment variable before the JVM instance has been started, or by using conf/spark-defaults.conf to set spark.jars.packages or spark.jars / spark.driver.extraClassPath.

  2. Choose the desired mode. The Spark JDBC writer supports the following modes:

    • append: Append contents of this DataFrame to existing data.
    • overwrite: Overwrite existing data.
    • ignore: Silently ignore this operation if data already exists.
    • error (default case): Throw an exception if data already exists.

    Upserts or other fine-grained modifications are not supported

    mode = ...
    
  3. Prepare JDBC URI, for example:

    # You can encode credentials in URI or pass
    # separately using properties argument
    # of jdbc method or options
    
    url = "jdbc:postgresql://localhost/foobar"
    
  4. (Optional) Create a dictionary of JDBC arguments.

    properties = {
        "user": "foo",
        "password": "bar"
    }
    

    properties / options can also be used to set supported JDBC connection properties.

  5. Use DataFrame.write.jdbc

    df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
    

    to save the data (see pyspark.sql.DataFrameWriter for details). A minimal sketch combining these steps is shown below.
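
Putting the steps together, the following is a minimal sketch, assuming Spark 2.x or later, where the connector is pulled in through spark.jars.packages before the JVM starts; the connector coordinates, URL, table name, and credentials below are placeholders:

    from pyspark.sql import SparkSession

    # Minimal end-to-end write sketch (placeholder coordinates and credentials)
    spark = (SparkSession.builder
        .appName("jdbc-write-sketch")
        .config("spark.jars.packages", "org.postgresql:postgresql:42.1.1")
        .getOrCreate())

    df = spark.createDataFrame([(1, "foo"), (2, "bar")], ["id", "label"])

    url = "jdbc:postgresql://localhost/foobar"
    properties = {"user": "foo", "password": "bar", "driver": "org.postgresql.Driver"}

    # mode is one of "append", "overwrite", "ignore", "error"
    df.write.jdbc(url=url, table="baz", mode="append", properties=properties)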


Known issues:


  • Suitable driver cannot be found when the driver has been included using --packages (java.sql.SQLException: No suitable driver found for jdbc: ...)

    Assuming there is no driver version mismatch, to solve this you can add the driver class to the properties (see also the sketch after this list). For example:

    properties = {
        ...
        "driver": "org.postgresql.Driver"
    }
    
  • using df.write.format("jdbc").options(...).save() may result in:

    java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not allow create table as select.

    Solution unknown.

  • in PySpark 1.3 you can try calling the Java method directly:

    df._jdf.insertIntoJDBC(url, "baz", True)
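
Note that the df.write.format("jdbc") ... .save() form does work in later Spark versions, and the driver class can then be passed as an option. A minimal sketch (the URL, table, and credentials below are placeholders):

    # Sketch for newer Spark versions: the same write expressed through the
    # options API, with the driver class given explicitly.
    (df.write
        .format("jdbc")
        .option("url", "jdbc:postgresql://localhost/foobar")
        .option("dbtable", "baz")
        .option("user", "foo")
        .option("password", "bar")
        .option("driver", "org.postgresql.Driver")
        .mode("append")
        .save())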
    
    

Reading data


  1. Follow steps 1-4 from Writing data
  2. Use sqlContext.read.jdbc:

    sqlContext.read.jdbc(url=url, table="baz", properties=properties)
    

    or sqlContext.read.format("jdbc"):

    (sqlContext.read.format("jdbc")
        .options(url=url, dbtable="baz", **properties)
        .load())
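
Both forms accept further options; for example, DataFrameReader.jdbc can parallelize the read over a numeric column. A minimal sketch, assuming the table has a numeric id column (the bounds and partition count below are placeholders):

    # Partitioned read sketch: Spark issues one query per partition of the
    # assumed numeric "id" column instead of a single full-table scan.
    df = sqlContext.read.jdbc(
        url=url,
        table="baz",
        column="id",
        lowerBound=1,
        upperBound=1000000,
        numPartitions=10,
        properties=properties)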
    
    

Known issues and gotchas:


Where to find suitable drivers:


Other options


Depending on the database, a specialized source might exist and might be preferred in some cases:


Answer by y durga prasad

Download the mysql-connector-java driver and keep it in the Spark jars folder. Observe the Python code below: it writes data into "actor1", so we have to create the actor1 table structure in the MySQL database first.


    from pyspark.sql import SparkSession, SQLContext

    # Point spark.driver.extraClassPath at the MySQL connector jar; a raw
    # string keeps the Windows backslashes from being treated as escapes.
    spark = SparkSession.builder.appName("prasadad").master("local") \
        .config("spark.driver.extraClassPath",
                r"D:\spark-2.1.0-bin-hadoop2.7\jars\mysql-connector-java-5.1.41-bin.jar") \
        .getOrCreate()

    sc = spark.sparkContext
    sqlContext = SQLContext(sc)

    # Read the source "actor" table over JDBC
    df = sqlContext.read.format("jdbc").options(
        url="jdbc:mysql://localhost:3306/sakila", driver="com.mysql.jdbc.Driver",
        dbtable="actor", user="root", password="****").load()

    # Append into the pre-created actor1 table; credentials are encoded in the URL here
    mysql_url = "jdbc:mysql://localhost:3306/sakila?user=root&password=****"
    df.write.jdbc(mysql_url, table="actor1", mode="append")
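
To verify the append, the newly written table can be read back with the same connection details (a sketch; the masked password is a placeholder as above):

    # Read the actor1 table back to check that the rows were appended
    df_check = sqlContext.read.format("jdbc").options(
        url="jdbc:mysql://localhost:3306/sakila", driver="com.mysql.jdbc.Driver",
        dbtable="actor1", user="root", password="****").load()
    df_check.show(5)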

Answer by anand ml

Refer to this link to download the JDBC driver for Postgres and follow the steps to download the jar file:


https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html

The jar file will be downloaded to a path like this: "/home/anand/.ivy2/jars/org.postgresql_postgresql-42.1.1.jar"


If your Spark version is 2:


from pyspark.sql import SparkSession

# The extraClassPath has to point at the downloaded PostgreSQL driver jar
spark = SparkSession.builder \
    .appName("sparkanalysis") \
    .config("spark.driver.extraClassPath",
            "/home/anand/.ivy2/jars/org.postgresql_postgresql-42.1.1.jar") \
    .getOrCreate()

# for a database on localhost
pgDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:postgres") \
    .option("dbtable", "public.user_emp_tab") \
    .option("user", "postgres") \
    .option("password", "Jonsnow@100") \
    .load()

print(pgDF)

pgDF.filter(pgDF["user_id"] > 5).show()

Save the file as a Python script and run it with "python respectivefilename.py".
