Python: How to use JDBC source to write and read data in (Py)Spark?

Disclaimer: this page is a translation of a popular StackOverflow question and answer, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must do so under the same license, link to the original URL, and attribute it to the original authors (not me): StackOverflow

Original URL: http://stackoverflow.com/questions/30983982/
How to use JDBC source to write and read data in (Py)Spark?
Asked by zero323
The goal of this question is to document:

- steps required to read and write data using JDBC connections in PySpark
- possible issues with JDBC sources and known solutions

With small changes these methods should work with other supported languages including Scala and R.
Accepted answer by zero323
Writing data
1. Include the applicable JDBC driver when you submit the application or start the shell. You can use for example --packages:

   bin/pyspark --packages group:name:version

   or combine driver-class-path and jars:

   bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR

   These properties can also be set using the PYSPARK_SUBMIT_ARGS environment variable before the JVM instance has been started, or by using conf/spark-defaults.conf to set spark.jars.packages or spark.jars / spark.driver.extraClassPath.

2. Choose the desired mode. The Spark JDBC writer supports the following modes:

   - append: Append contents of this DataFrame to existing data.
   - overwrite: Overwrite existing data.
   - ignore: Silently ignore this operation if data already exists.
   - error (default case): Throw an exception if data already exists.

   Upserts or other fine-grained modifications are not supported:

   mode = ...

3. Prepare the JDBC URI, for example:

   # You can encode credentials in the URI or pass them
   # separately using the properties argument
   # of the jdbc method or options
   url = "jdbc:postgresql://localhost/foobar"

4. (Optional) Create a dictionary of JDBC arguments:

   properties = {"user": "foo", "password": "bar"}

   properties / options can also be used to set supported JDBC connection properties.

5. Use DataFrame.write.jdbc to save the data (see pyspark.sql.DataFrameWriter for details):

   df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
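Putting these steps together, a minimal end-to-end write might look like the following sketch. It assumes Spark 2.x (SparkSession), a PostgreSQL database foobar with an existing table baz, and that the driver was supplied as in step 1; the DataFrame contents are placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("jdbc-write-sketch").getOrCreate()

# Placeholder data; replace with your own DataFrame
df = spark.createDataFrame([(1, "foo"), (2, "bar")], ["id", "value"])

url = "jdbc:postgresql://localhost/foobar"
properties = {"user": "foo", "password": "bar"}

# Append to the existing table "baz" (step 5)
df.write.jdbc(url=url, table="baz", mode="append", properties=properties)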
Known issues:
- Suitable driver cannot be found when the driver has been included using --packages (java.sql.SQLException: No suitable driver found for jdbc: ...)

  Assuming there is no driver version mismatch, you can solve this by adding the driver class to the properties. For example:

  properties = {
      ...
      "driver": "org.postgresql.Driver"
  }

- Using df.write.format("jdbc").options(...).save() may result in:

  java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not allow create table as select.

  Solution unknown.

  In PySpark 1.3 you can try calling the Java method directly:

  df._jdf.insertIntoJDBC(url, "baz", True)
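For what it is worth, on Spark 2.x the format("jdbc") write path generally works; a minimal sketch reusing the url, credentials, and table from the steps above, with the driver class passed as the "driver" option (an alternative to putting it in the properties dictionary):

# Equivalent write via DataFrameWriter options; the "driver" option
# plays the same role as the "driver" key in the properties dict
(df.write
    .format("jdbc")
    .option("url", "jdbc:postgresql://localhost/foobar")
    .option("dbtable", "baz")
    .option("user", "foo")
    .option("password", "bar")
    .option("driver", "org.postgresql.Driver")
    .mode("append")
    .save())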
Reading data
1. Follow steps 1-4 from Writing data.

2. Use sqlContext.read.jdbc:

   sqlContext.read.jdbc(url=url, table="baz", properties=properties)

   or sqlContext.read.format("jdbc"):

   (sqlContext.read.format("jdbc")
       .options(url=url, dbtable="baz", **properties)
       .load())
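For newer Spark versions (2.x, as in the answers further down), the same reads are available on the SparkSession entry point; a minimal sketch reusing the url and properties defined earlier:

# SparkSession-based equivalents of the SQLContext reads above
df = spark.read.jdbc(url=url, table="baz", properties=properties)

df = (spark.read.format("jdbc")
      .options(url=url, dbtable="baz", **properties)
      .load())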
Known issues and gotchas:
- Suitable driver cannot be found - see: Writing data

- Spark SQL supports predicate pushdown with JDBC sources, although not all predicates can be pushed down. It also doesn't delegate limits or aggregations. A possible workaround is to replace the dbtable / table argument with a valid subquery (see the sketch after this list).

- By default, JDBC data sources load data sequentially using a single executor thread. To ensure distributed data loading you can:

  - Provide a partitioning column (must be IntegerType), lowerBound, upperBound, numPartitions.
  - Provide a list of mutually exclusive predicates, one for each desired partition.

  Both approaches are shown in the sketch after this list.

- In distributed mode (with a partitioning column or predicates) each executor operates in its own transaction. If the source database is modified at the same time there is no guarantee that the final view will be consistent.
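A minimal sketch of these workarounds, assuming the table baz has an integer column id with values roughly between 0 and 1000 (the column names, bounds, and filter are illustrative placeholders), reusing the url and properties from the writing steps:

# 1. Push more work to the database: use a subquery as dbtable
subquery = "(SELECT id, value FROM baz WHERE value IS NOT NULL) AS tmp"
df_sub = (spark.read.format("jdbc")
          .options(url=url, dbtable=subquery, **properties)
          .load())

# 2. Distributed load using a partitioning column and bounds
df_part = spark.read.jdbc(
    url=url, table="baz",
    column="id", lowerBound=0, upperBound=1000, numPartitions=4,
    properties=properties)

# 3. Distributed load using mutually exclusive predicates, one per partition
predicates = ["id < 250", "id >= 250 AND id < 500",
              "id >= 500 AND id < 750", "id >= 750"]
df_pred = spark.read.jdbc(
    url=url, table="baz", predicates=predicates, properties=properties)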
Where to find suitable drivers:
Maven Repository (to obtain the required coordinates for --packages, select the desired version and copy the data from the Gradle tab in the form group:name:version, substituting the respective fields) or Maven Central Repository.
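For example, the PostgreSQL JDBC driver version used later on this page corresponds to the coordinates org.postgresql:postgresql:42.1.1, which would be passed as:

bin/pyspark --packages org.postgresql:postgresql:42.1.1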
Other options
Depending on the database, a specialized source might exist and be preferred in some cases:
- Greenplum - Pivotal Greenplum-Spark Connector
- Apache Phoenix - Apache Spark Plugin
- Microsoft SQL Server - Spark connector for Azure SQL Databases and SQL Server
- Amazon Redshift - Databricks Redshift connector (current versions available only in the proprietary Databricks Runtime; the discontinued open source version is available on GitHub).
Answered by y durga prasad
Download the mysql-connector-java driver and keep it in the Spark jars folder. Observe the Python code below: it writes data into "actor1", so we have to create the actor1 table structure in the MySQL database first.
from pyspark.sql import SparkSession, SQLContext

# Raw string (r'...') keeps the backslashes in the Windows path from being treated as escapes
spark = SparkSession.builder.appName("prasadad").master('local') \
    .config('spark.driver.extraClassPath',
            r'D:\spark-2.1.0-bin-hadoop2.7\jars\mysql-connector-java-5.1.41-bin.jar') \
    .getOrCreate()
sqlContext = SQLContext(spark.sparkContext)

# Read the existing "actor" table from the sakila database
df = sqlContext.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/sakila", driver="com.mysql.jdbc.Driver",
    dbtable="actor", user="root", password="****").load()

# Append into "actor1" (the table structure must already exist in MySQL)
mysql_url = "jdbc:mysql://localhost:3306/sakila?user=root&password=****"
df.write.jdbc(mysql_url, table="actor1", mode="append")
Answered by anand ml
Refer to this link to download the JDBC driver for Postgres and follow the steps to download the jar file:

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html

The jar file will be downloaded to a path like this: "/home/anand/.ivy2/jars/org.postgresql_postgresql-42.1.1.jar"

If your Spark version is 2:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("sparkanalysis") \
    .config("spark.driver.extraClassPath",
            "/home/anand/.ivy2/jars/org.postgresql_postgresql-42.1.1.jar") \
    .getOrCreate()

# for a localhost database
pgDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:postgres") \
    .option("dbtable", "public.user_emp_tab") \
    .option("user", "postgres") \
    .option("password", "Jonsnow@100") \
    .load()

print(pgDF)
pgDF.filter(pgDF["user_id"] > 5).show()
Save the file as a Python script and run it with "python respectivefilename.py".