scala - How to insert Spark DataFrame to Hive Internal table?

Note: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must likewise follow the CC BY-SA license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/42219210/


How to insert Spark DataFrame to Hive Internal table?

Tags: scala, hive, apache-spark-sql, spark-dataframe

Asked by Shankar

What's the right way to insert a DF into a Hive internal table in append mode? It seems we can either write the DF directly to Hive using the "saveAsTable" method, or store the DF in a temp table and then use a query.

df.write.mode("append").saveAsTable("tableName")

OR

df.registerTempTable("temptable") 
sqlContext.sql("CREATE TABLE IF NOT EXISTS mytable as select * from temptable")

Will the second approach append the records or overwrite them?

Is there any other way to write the DF to a Hive internal table effectively?

Accepted answer by Sandeep Singh

df.saveAsTable("tableName", "append") is deprecated. Instead, you should use the second approach.

sqlContext.sql("CREATE TABLE IF NOT EXISTS mytable as select * from temptable")

It will create the table if it does not exist. When you run your code a second time, you need to drop the existing table; otherwise your code will exit with an exception.
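
For example, a minimal sketch of that drop-and-recreate pattern, reusing the sqlContext and temptable registered above:

sqlContext.sql("DROP TABLE IF EXISTS mytable")
sqlContext.sql("CREATE TABLE IF NOT EXISTS mytable as select * from temptable")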

Another approach, if you don't want to drop the table: create the table separately, then insert your data into it.
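
For example, a sketch of creating the table up front (the column names and types here are illustrative placeholders, not from the original answer):

sqlContext.sql("CREATE TABLE IF NOT EXISTS mytable (id INT, name STRING)")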

The code below will append data to the existing table:

sqlContext.sql("insert into table mytable select * from temptable")

And the code below will overwrite the data in the existing table:

sqlContext.sql("insert overwrite table mytable select * from temptable")

This answer is based on Spark 1.6.2. If you are using another version of Spark, I suggest checking the appropriate documentation.

Answer by Spcogg the second

Neither of the options here worked for me; they have probably been deprecated since this answer was written.

According to the latest Spark API docs (for Spark 2.1), it uses the insertInto() method from the DataFrameWriter class.

I'm using the Python PySpark API, but it would be the same in Scala:

df.write.insertInto("target_db.target_table", overwrite=False)

The above worked for me.
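
For the Scala API, a rough equivalent (a sketch assuming Spark 2.x; in Scala, insertInto takes only the table name, and overwrite behavior is controlled through the writer's mode):

df.write.mode("append").insertInto("target_db.target_table")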

Answer by uh_big_mike_boi

You can also overwrite just the partitions you are inserting into, and you can do it with dynamic partitioning.

# Allow all partition columns to be resolved dynamically
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

# Register the DataFrame as a temporary view so it can be queried in SQL
temp_table = "tmp_{}".format(table)
df.createOrReplaceTempView(temp_table)

# With dynamic partitioning, "insert overwrite" replaces only the partitions
# that receive new rows; the partition columns must come last in the select list
spark.sql("""
    insert overwrite table `{schema}`.`{table}`
    partition (partCol1, partCol2)
      select col1
           , col2
           , col3
           , col4
           , partCol1
           , partCol2
    from {temp_table}
""".format(schema=schema, table=table, temp_table=temp_table))