How can I connect a PostgreSQL database to Apache Spark using Scala?
Note: this page reproduces a popular StackOverflow question and answer under the CC BY-SA 4.0 license. If you use or share it, you must follow the same license and attribute it to the original authors (not me): StackOverflow
Original source: http://stackoverflow.com/questions/24916852/
Asked by febinsathar
I want to know how I can do the following things in Scala:
- Connect to a PostgreSQL database using Spark and Scala.
- Write SQL queries such as SELECT and UPDATE to modify a table in that database.
I know how to do it in Scala, but how do I import the PostgreSQL connector jar into sbt so that it is included when packaging?
Answered by Daniel Darabos
Our goal is to run parallel SQL queries from the Spark workers.
Build setup
Add the connector and JDBC to the libraryDependencies in build.sbt. I've only tried this with MySQL, so I'll use that in my examples, but Postgres should be much the same.
libraryDependencies ++= Seq(
jdbc,
"mysql" % "mysql-connector-java" % "5.1.29",
"org.apache.spark" %% "spark-core" % "1.0.1",
// etc
)
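For Postgres, a sketch of the equivalent dependency, assuming the org.postgresql JDBC driver from Maven Central (the version shown is only an example; pick one that matches your server and JDK):

libraryDependencies ++= Seq(
  jdbc,
  // PostgreSQL JDBC driver instead of the MySQL one; version is an example, adjust as needed.
  "org.postgresql" % "postgresql" % "9.4-1201-jdbc41",
  "org.apache.spark" %% "spark-core" % "1.0.1",
  // etc
)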
Code
When you create the SparkContext you tell it which jars to copy to the executors. Include the connector jar. A clean way to do this:
val classes = Seq(
  getClass,                      // To get the jar with our own code.
  classOf[com.mysql.jdbc.Driver] // To get the connector.
)
val jars = classes.map(_.getProtectionDomain().getCodeSource().getLocation().getPath())
val conf = new SparkConf().setJars(jars)
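For Postgres the same trick works with the PostgreSQL driver class instead; a sketch, assuming the org.postgresql driver suggested above:

import org.apache.spark.SparkConf

val classes = Seq(
  getClass,                       // The jar with our own code.
  classOf[org.postgresql.Driver]  // The PostgreSQL connector jar.
)
val jars = classes.map(_.getProtectionDomain().getCodeSource().getLocation().getPath())
val conf = new SparkConf().setJars(jars)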
Now Spark is ready to connect to the database. Each executor will run part of the query, so that the results are ready for distributed computation.
There are two options for this. The older approach is to use org.apache.spark.rdd.JdbcRDD:
val rdd = new org.apache.spark.rdd.JdbcRDD(
  sc,
  () => {
    java.sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
  },
  "SELECT * FROM BOOKS WHERE ? <= KEY AND KEY <= ?",
  0, 1000, 10,
  row => row.getString("BOOK_TITLE")
)
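Since the question is about Postgres: the only piece that changes in this example is the connection factory, for instance (placeholder host, database, user and password):

() => {
  // Placeholder connection string: adjust host, database, user and password.
  java.sql.DriverManager.getConnection("jdbc:postgresql://postgres.example.com/mydb?user=batman&password=alfred")
}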
Check out the documentation for the parameters. Briefly:
- You have the SparkContext.
- Then a function that creates the connection. This will be called on each worker to connect to the database.
- Then the SQL query. This has to be similar to the example, and contain placeholders for the starting and ending key.
- Then you specify the range of keys (0 to 1000 in my example) and the number of partitions. The range will be divided among the partitions. So one executor thread will end up executing SELECT * FROM FOO WHERE 0 <= KEY AND KEY <= 100 in the example.
- And at last we have a function that converts the ResultSet into something. In the example we convert it into a String, so you end up with an RDD[String].
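The result behaves like any other RDD, so further processing runs in parallel on the executors; a hypothetical use of the rdd defined above:

// Count the distinct book titles fetched from the database, in parallel.
val distinctTitles = rdd.distinct().count()
println(s"Distinct book titles: $distinctTitles")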
Since Apache Spark version 1.3.0 another method is available through the DataFrame API. Instead of the JdbcRDD you would create an org.apache.spark.sql.DataFrame:
val df = sqlContext.load("jdbc", Map(
"url" -> "jdbc:mysql://mysql.example.com/?user=batman&password=alfred",
"dbtable" -> "BOOKS"))
See https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#jdbc-to-other-databases for the full list of options (the key range and number of partitions can be set just like with JdbcRDD).
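For Postgres, a sketch of the same load with a PostgreSQL URL and the partitioning options from the linked guide (host, database, credentials and the partition column are placeholders):

val df = sqlContext.load("jdbc", Map(
  // Placeholder PostgreSQL URL; adjust host, database and credentials.
  "url" -> "jdbc:postgresql://postgres.example.com/mydb?user=batman&password=alfred",
  "dbtable" -> "BOOKS",
  // Optional: partition the reads on a numeric column, like JdbcRDD's key range.
  "partitionColumn" -> "KEY",
  "lowerBound" -> "0",
  "upperBound" -> "1000",
  "numPartitions" -> "10"))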
Updates
JdbcRDD does not support updates. But you can simply do them in a foreachPartition.
rdd.foreachPartition { it =>
  // One connection per partition; close it when the partition is done.
  val conn = java.sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
  try {
    val del = conn.prepareStatement("DELETE FROM BOOKS WHERE BOOK_TITLE = ?")
    for (bookTitle <- it) {
      del.setString(1, bookTitle)
      del.executeUpdate
    }
  } finally {
    conn.close()
  }
}
(This creates one connection per partition. If that is a concern, use a connection pool!)
DataFrames support updates through the createJDBCTable and insertIntoJDBC methods.
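A minimal sketch of those methods, assuming Spark 1.3.x, a placeholder PostgreSQL URL and a hypothetical target table name:

val url = "jdbc:postgresql://postgres.example.com/mydb?user=batman&password=alfred"
// Create the table and insert the DataFrame's rows; with allowExisting = false this
// fails if the table already exists (true would drop and recreate it).
df.createJDBCTable(url, "BOOKS_COPY", false)
// Insert the DataFrame's rows into an existing table; overwrite = true truncates it first.
df.insertIntoJDBC(url, "BOOKS", false)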

