Note: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use or share it, but you must attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/36926856/

Spark SQL: How to append new row to dataframe table (from another table)

Tags: scala, apache-spark, apache-spark-sql

Asked by stackoverflowuser2010

I am using Spark SQL with dataframes. I have an input dataframe, and I would like to append (or insert) its rows to a larger dataframe that has more columns. How would I do that?

If this were SQL, I would use INSERT INTO OUTPUT SELECT ... FROM INPUT, but I don't know how to do that with Spark SQL.

For concreteness:

var input = sqlContext.createDataFrame(Seq(
        (10L, "Joe Doe", 34),
        (11L, "Jane Doe", 31),
        (12L, "Alice Jones", 25)
        )).toDF("id", "name", "age")

var output = sqlContext.createDataFrame(Seq(
        (0L, "Hyman Smith", 41, "yes", 1459204800L),
        (1L, "Jane Jones", 22, "no", 1459294200L),
        (2L, "Alice Smith", 31, "", 1459595700L)
        )).toDF("id", "name", "age", "init", "ts")


scala> input.show()
+---+-----------+---+
| id|       name|age|
+---+-----------+---+
| 10|    Joe Doe| 34|
| 11|   Jane Doe| 31|
| 12|Alice Jones| 25|
+---+-----------+---+

scala> input.printSchema()
root
 |-- id: long (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)


scala> output.show()
+---+-----------+---+----+----------+
| id|       name|age|init|        ts|
+---+-----------+---+----+----------+
|  0|Hyman Smith| 41| yes|1459204800|
|  1| Jane Jones| 22|  no|1459294200|
|  2|Alice Smith| 31|    |1459595700|
+---+-----------+---+----+----------+

scala> output.printSchema()
root
 |-- id: long (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
 |-- init: string (nullable = true)
 |-- ts: long (nullable = false)

I would like to append all the rows of input to the end of output. At the same time, I would like to set the output column init to the empty string '' and the ts column to the current timestamp, e.g. 1461883875L.

Any help would be appreciated.

Answered by zero323

Spark DataFrames are immutable, so it is not possible to append / insert rows. Instead you can just add the missing columns and use UNION ALL:

import org.apache.spark.sql.functions.{current_timestamp, lit}
output.unionAll(input.select($"*", lit(""), current_timestamp().cast("long")))
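
As a side note, in Spark 2.x unionAll is deprecated in favor of union, which likewise matches columns by position rather than by name. A minimal sketch of the same approach under that assumption (spark being a SparkSession, with spark.implicits._ in scope for the $ syntax):

import org.apache.spark.sql.functions.{current_timestamp, lit}
import spark.implicits._

// union matches columns by position, so the appended columns must come in
// the same order as output's schema: id, name, age, init, ts.
val appended = output.union(
  input.select(
    $"id", $"name", $"age",
    lit("").as("init"),
    current_timestamp().cast("long").as("ts")
  )
)
appended.show()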

Answered by Fabian

I had a similar problem, matching your SQL question:

I wanted to append a dataframe to an existing Hive table, which is also larger (has more columns). To keep with your example: output is my existing table and input could be the dataframe. My solution uses plain SQL, and for the sake of completeness I want to provide it:

import org.apache.spark.sql.SaveMode

var input = spark.createDataFrame(Seq(
        (10L, "Joe Doe", 34),
        (11L, "Jane Doe", 31),
        (12L, "Alice Jones", 25)
        )).toDF("id", "name", "age")

//--> just for a running example: In my case the table already exists
var output = spark.createDataFrame(Seq(
        (0L, "Hyman Smith", 41, "yes", 1459204800L),
        (1L, "Jane Jones", 22, "no", 1459294200L),
        (2L, "Alice Smith", 31, "", 1459595700L)
        )).toDF("id", "name", "age", "init", "ts")

output.write.mode(SaveMode.Overwrite).saveAsTable("appendTest");
//<--

input.createOrReplaceTempView("inputTable");

spark.sql("INSERT INTO TABLE appendTest SELECT id, name, age, null, null FROM inputTable");
val df = spark.sql("SELECT * FROM appendTest")
df.show()

which outputs:

+---+-----------+---+----+----------+
| id|       name|age|init|        ts|
+---+-----------+---+----+----------+
|  0|Hyman Smith| 41| yes|1459204800|
|  1| Jane Jones| 22|  no|1459294200|
|  2|Alice Smith| 31|    |1459595700|
| 12|Alice Jones| 25|null|      null|
| 11|   Jane Doe| 31|null|      null|
| 10|    Joe Doe| 34|null|      null|
+---+-----------+---+----+----------+

If you run into the problem that you don't know which fields are missing, you can use a diff like

val missingFields = output.schema.toSet.diff(input.schema.toSet)

and then build the query (in rough pseudocode; a runnable sketch follows below):

val sqlQuery = "INSERT INTO TABLE appendTest SELECT " + commaSeparatedColumnNames + commaSeparatedNullsForEachMissingField + " FROM inputTable"
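
For completeness, here is a minimal runnable version of that idea, assuming every missing field can simply be NULL-filled. It walks output's columns in order, keeps those that input already has, and substitutes NULL (aliased to the target name) for the rest:

// Build the SELECT list from output's schema: keep the columns that exist
// in input, substitute NULL for the missing ones.
val inputCols = input.columns.toSet
val selectList = output.columns
  .map(c => if (inputCols.contains(c)) c else s"null AS $c")
  .mkString(", ")
val sqlQuery = s"INSERT INTO TABLE appendTest SELECT $selectList FROM inputTable"
spark.sql(sqlQuery)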

Hope this helps people with similar problems in the future!

P.S.: In your special case (current timestamp + empty string for init) you could even use

spark.sql("INSERT INTO TABLE appendTest SELECT id, name, age, '' as init, current_timestamp as ts FROM inputTable");

which results in

+---+-----------+---+----+----------+
| id|       name|age|init|        ts|
+---+-----------+---+----+----------+
|  0|Hyman Smith| 41| yes|1459204800|
|  1| Jane Jones| 22|  no|1459294200|
|  2|Alice Smith| 31|    |1459595700|
| 12|Alice Jones| 25|    |1521128513|
| 11|   Jane Doe| 31|    |1521128513|
| 10|    Joe Doe| 34|    |1521128513|
+---+-----------+---+----+----------+