Scala: read from a Hive table and write back to it using Spark SQL
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/38746773/
Read from a Hive table and write back to it using Spark SQL
Asked by Avi
I am reading a Hive table using Spark SQL and assigning it to a Scala val:
val x = sqlContext.sql("select * from some_table")
Then I am doing some processing with the dataframe x and finally coming up with a dataframe y, which has exactly the same schema as the table some_table.
Finally, I am trying to insert-overwrite the dataframe y into the same Hive table some_table:
y.write.mode(SaveMode.Overwrite).insertInto("some_table")
Then I am getting the error
org.apache.spark.sql.AnalysisException: Cannot insert overwrite into table that is also being read from
I tried creating an INSERT SQL statement and firing it using sqlContext.sql(), but it too gave me the same error.
Is there any way I can bypass this error? I need to insert the records back to the same table.
Hi, I tried doing as suggested, but I am still getting the same error:
val x = sqlContext.sql("select * from incremental.test2")
val y = x.limit(5)
y.registerTempTable("temp_table")
val dy = sqlContext.table("temp_table")
dy.write.mode("overwrite").insertInto("incremental.test2")
scala> dy.write.mode("overwrite").insertInto("incremental.test2")
org.apache.spark.sql.AnalysisException: Cannot insert overwrite into table that is also being read from.;
Answered by cheseaux
You should first save your DataFrame y in a temporary table:
y.write.mode("overwrite").saveAsTable("temp_table")
Then you can overwrite rows in your target table
val dy = sqlContext.table("temp_table")
dy.write.mode("overwrite").insertInto("some_table")
Answered by nsanglar
Actually you can also use checkpointing to achieve this. Since it breaks data lineage, Spark is not able to detect that you are reading and overwriting in the same table:
sqlContext.sparkContext.setCheckpointDir(checkpointDir)
val ds = sqlContext.sql("select * from some_table").checkpoint()
ds.write.mode("overwrite").saveAsTable("some_table")
Answered by matteus silva
You should first save your DataFrame y as a parquet file:
y.write.parquet("temp_table")
After that, load it back like this:
val parquetFile = sqlContext.read.parquet("temp_table")
And finally, insert your data into your table:
parquetFile.write.insertInto("some_table")
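
Note that "temp_table" in this answer is a filesystem path for the parquet files, not a Hive table name. A hedged sketch of the same idea with an explicit staging path (the path is a made-up example):

val stagingPath = "/tmp/some_table_staging"   // hypothetical HDFS or local path
y.write.mode("overwrite").parquet(stagingPath)
val staged = sqlContext.read.parquet(stagingPath)
staged.write.insertInto("some_table")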
Answered by Dinesh Sachdev 108
In the context of Spark 2.2:
- This error means that our process is reading from the same table it is writing to.
- Normally, this should work, as the process writes to a .hiveStaging... directory.
- This error occurs with the saveAsTable method, as it overwrites the entire table instead of individual partitions.
- This error should not occur with the insertInto method, as it overwrites partitions, not the table.
- A reason why this happens is that the Hive table has the following Spark TBLProperties in its definition. The problem is resolved for the insertInto method if you remove the following Spark TBLProperties (see the sketch after the property list below):
'spark.sql.partitionProvider' 'spark.sql.sources.provider' 'spark.sql.sources.schema.numPartCols' 'spark.sql.sources.schema.numParts' 'spark.sql.sources.schema.part.0' 'spark.sql.sources.schema.part.1' 'spark.sql.sources.schema.part.2' 'spark.sql.sources.schema.partCol.0' 'spark.sql.sources.schema.partCol.1'
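As a rough sketch of that suggestion (the table name incremental.test2, the exact property keys, and a Spark 2.2 SparkSession named spark are all assumptions here; the same statements can also be run from the Hive CLI or beeline):

// Sketch only: inspect the current table properties, then remove the Spark-specific ones
spark.sql("SHOW TBLPROPERTIES incremental.test2").show(100, false)
spark.sql("ALTER TABLE incremental.test2 UNSET TBLPROPERTIES IF EXISTS ('spark.sql.partitionProvider', 'spark.sql.sources.provider')")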
https://querydb.blogspot.com/2019/07/read-from-hive-table-and-write-back-to.html
Answered by Sai Kranthi
Read the data from the Hive table in Spark:
import org.apache.hadoop.io.WritableComparable
import org.apache.hadoop.mapreduce.InputFormat
import org.apache.hive.hcatalog.data.HCatRecord
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat
val hconfig = new org.apache.hadoop.conf.Configuration()
org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(hconfig, "dbname", "tablename")
val inputFormat = (new HCatInputFormat).asInstanceOf[InputFormat[WritableComparable[_],HCatRecord]].getClass
val data = sc.newAPIHadoopRDD(hconfig, inputFormat, classOf[WritableComparable[_]], classOf[HCatRecord])
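
As a small usage sketch for the RDD built above (positional field access via get(0) is an assumption about the table's layout):

// Pull the first field out of each HCatRecord and look at a few rows
val firstField = data.map { case (_, record) => record.get(0) }
firstField.take(5).foreach(println)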
