Original URL: http://stackoverflow.com/questions/34643200/
Warning: these are provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): StackOverflow
Spark Dataframes UPSERT to Postgres Table
Asked by void
I am using Apache Spark DataFrames to join two data sources and get the result as another DataFrame. I want to write the result to another Postgres table. I see this option:
myDataFrame.write.jdbc(url, table, connectionProperties)
But what I want to do is UPSERT the dataframe into the table based on the primary key of the table. How is this to be done? I am using Spark 1.6.0.
Answered by zero323
It is not supported. DataFrameWriter can either append to or overwrite an existing table. If your application requires more complex logic you'll have to deal with this manually.
One option is to use an action (foreach, foreachPartition) with a standard JDBC connection. Another one is to write to a temporary table and handle the rest directly in the database; a sketch of that approach follows.
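For the temporary-table route, here is a minimal sketch, assuming Postgres 9.5+ (for INSERT ... ON CONFLICT) and a hypothetical target table target(id, value) with primary key id; the staging table name target_staging and the column names are illustrative, not part of the original answer.

import java.sql.DriverManager

// Step 1: append the DataFrame to a staging table via the regular JDBC writer.
myDataFrame.write.mode("append").jdbc(url, "target_staging", connectionProperties)

// Step 2: merge the staging rows into the target on the driver, then clear staging.
val conn = DriverManager.getConnection(url, connectionProperties)
try {
  val st = conn.createStatement()
  st.executeUpdate(
    "INSERT INTO target (id, value) SELECT id, value FROM target_staging " +
    "ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value")
  st.executeUpdate("TRUNCATE target_staging")
} finally {
  conn.close()
}

Doing the merge as a single SQL statement keeps the conflict resolution inside Postgres, where each statement is atomic.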
See also SPARK-19335 (Spark should support doing an efficient DataFrame Upsert via JDBC) and related proposals.
Answered by jstuartmill
KrisP has the right of it. The best way to do an upsert is not through a prepared statement. It's important to note that this method will insert one row at a time, with as many partitions as the number of workers you have. If you want to do this in batches you can as well:
import java.sql.{Connection, DriverManager, PreparedStatement}

val numWorkers = 4      // placeholder: number of workers
val rowsPerBatch = 1000 // placeholder: rows you want per batch

dataframe.coalesce(numWorkers).foreachPartition { partition =>
  val dbc: Connection = DriverManager.getConnection("JDBCURL")                // placeholder URL
  val st: PreparedStatement = dbc.prepareStatement("YOUR PREPARED STATEMENT") // placeholder SQL
  partition.grouped(rowsPerBatch).foreach { batch =>
    batch.foreach { x =>
      st.setDouble(1, x.getDouble(1)) // bind parameters to match your prepared statement
      st.addBatch()
    }
    st.executeBatch() // one round trip per batch of rows
  }
  dbc.close()
}
This will execute batches for each worker and close the DB connection. It gives you control over how many workers and how many batches to use, and lets you work within those confines.
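As an illustration of what "YOUR PREPARED STATEMENT" might look like, a Postgres 9.5+ upsert can be expressed directly in the prepared SQL; the table and columns below (target(id, value) with primary key id) are hypothetical:

val upsertSql =
  "INSERT INTO target (id, value) VALUES (?, ?) " +
  "ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value"
// Bind one '?' per column before each addBatch(), e.g.
// st.setLong(1, x.getLong(0)); st.setDouble(2, x.getDouble(1))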
Answered by KrisP
If you are going to do it manually and via option 1 mentioned by zero323, you should take a look at the Spark source code for the insert statement here:
def insertStatement(conn: Connection, table: String, rddSchema: StructType): PreparedStatement = {
  val columns = rddSchema.fields.map(_.name).mkString(",")
  val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
  val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders)"
  conn.prepareStatement(sql)
}
The PreparedStatement is part of java.sql and it has methods like execute() and executeUpdate(). You still have to modify the sql accordingly, of course.
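For example, one way to adapt that generated SQL into a Postgres 9.5+ upsert is sketched below; treating "id" as the conflict column is an assumption about your table's primary key, not something the Spark source provides:

import java.sql.{Connection, PreparedStatement}
import org.apache.spark.sql.types.StructType

def upsertStatement(conn: Connection, table: String, rddSchema: StructType): PreparedStatement = {
  val columns = rddSchema.fields.map(_.name).mkString(",")
  val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
  // EXCLUDED refers to the row that failed the unique check; "id" is an assumed primary key.
  val updates = rddSchema.fields.map(f => s"${f.name} = EXCLUDED.${f.name}").mkString(",")
  val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders) " +
    s"ON CONFLICT (id) DO UPDATE SET $updates"
  conn.prepareStatement(sql)
}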
Answered by Soumitra
To insert via JDBC you can use:
dataframe.write.mode(SaveMode.Append).jdbc(jdbc_url,table_name,connection_properties)
Also, DataFrame.write gives you a DataFrameWriter, and it has some methods to insert the dataframe:
def insertInto(tableName: String): Unit
Inserts the content of the DataFrame into the specified table. It requires that the schema of the DataFrame is the same as the schema of the table.
Because it inserts data to an existing table, format or options will be ignored.
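A minimal usage sketch, assuming a table named "target_table" already exists in the catalog with a schema matching the DataFrame (the table name is hypothetical):

import org.apache.spark.sql.SaveMode

// Columns are matched by position, so the DataFrame's column order must match the table's.
dataframe.write.mode(SaveMode.Append).insertInto("target_table")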
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
Nothing yet to update individual records out of the box from Spark, though.