scala 将 ArrayBuffer 转换为 DataFrame 中的 HashSet 到 Hive 表中的 RDD 时的 GenericRowWithSchema 异常
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/32727518/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
GenericRowWithSchema exception in casting ArrayBuffer to HashSet in DataFrame to RDD from Hive table
提问by Glenn Strycker
I have a Hive table in parquet format that was generated using
我有一个镶木地板格式的 Hive 表,它是使用生成的
create table myTable (var1 int, var2 string, var3 int, var4 string, var5 array<struct<a:int,b:string>>) stored as parquet;
I am able to verify that it was filled -- here is a sample value
我能够验证它是否已填充——这是一个示例值
[1, "abcdef", 2, "ghijkl", ArrayBuffer([1, "hello"])]
I wish to put this into a Spark RDD of the form
我希望将其放入表单的 Spark RDD
((1,"abcdef"), ((2,"ghijkl"), Set((1,"hello"))))
Now, using spark-shell (I get the same problem in spark-submit), I made a test RDD with these values
现在,使用 spark-shell(我在 spark-submit 中遇到了同样的问题),我用这些值做了一个测试 RDD
scala> val tempRDD = sc.parallelize(Seq(((1,"abcdef"),((2,"ghijkl"), ArrayBuffer[(Int,String)]((1,"hello"))))))
tempRDD: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[44] at parallelize at <console>:85
using an iterator, I can cast the ArrayBuffer as a HashSet in the following new RDD:
使用迭代器,我可以将 ArrayBuffer 转换为以下新 RDD 中的 HashSet:
scala> val tempRDD2 = tempRDD.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
tempRDD2: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.immutable.HashSet[(Int, String)]))] = MapPartitionsRDD[46] at map at <console>:87
scala> tempRDD2.collect.foreach(println)
((1,abcdef),((2,ghijkl),Set((1,hello))))
But when I attempt to do the EXACT SAME THING with a DataFrame with a HiveContext / SQLContext, I get the following error:
但是,当我尝试对带有 HiveContext / SQLContext 的 DataFrame 执行完全相同的操作时,出现以下错误:
scala> val hc = new HiveContext(sc)
scala> import hc._
scala> import hc.implicits._
scala> val tempHiveQL = hc.sql("""select var1, var2, var3, var4, var5 from myTable""")
scala> val tempRDDfromHive = tempHiveQL.map(a => ((a(0).toString.toInt, a(1).toString), ((a(2).toString.toInt, a(3).toString), a(4).asInstanceOf[ArrayBuffer[(Int,String)]] )))
scala> val tempRDD3 = tempRDDfromHive.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
tempRDD3: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.immutable.HashSet[(Int, String)]))] = MapPartitionsRDD[47] at map at <console>:91
scala> tempRDD3.collect.foreach(println)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 14.0 failed 1 times, most recent failure: Lost task 1.0 in stage 14.0 (TID 5211, localhost): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$$anonfun$apply.apply(<console>:91)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:91)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:91)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:813)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:813)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1503)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1503)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1191)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
Note that I get this same error "GenericRowWithSchema cannot be cast to scala.Tuple2" when I run this in a compiled program using spark-submit. The program crashes at RUN TIME when it encounters the conversion step, and I had no compiler errors.
请注意,当我使用 spark-submit 在已编译的程序中运行它时,我得到了同样的错误“GenericRowWithSchema 无法转换为 scala.Tuple2”。程序在遇到转换步骤时在运行时崩溃,我没有编译器错误。
It seems very strange to me that my artificially generated RDD "tempRDD" would work with the conversion, whereas the Hive query DataFrame->RDD did not. I checked, and both of the RDDs have the same form:
对我来说,我人工生成的 RDD“tempRDD”可以用于转换似乎很奇怪,而 Hive 查询 DataFrame->RDD 则不能。我查了一下,两个RDD的形式都一样:
scala> tempRDD
org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = MapPartitionsRDD[21] at map at DataFrame.scala:776
scala> tempRDDfromHive
org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[25] at parallelize at <console>:70
the only difference is where their last step originated. I even tried persisting, checkpointing, and materializing these RDDs before running the steps for tempRDD2 and tempRDD3. All got the same error message.
唯一的区别是他们最后一步的起源。在运行 tempRDD2 和 tempRDD3 的步骤之前,我什至尝试持久化、检查点和物化这些 RDD。所有人都收到了相同的错误消息。
I also read though related stackoverflow questions and Apache Spark Jira issues, and from those I attempted casting the ArrayBuffer as an Iterator instead, but that also failed on the second step with the same error.
我还阅读了相关的 stackoverflow 问题和 Apache Spark Jira 问题,并且从这些问题中我尝试将 ArrayBuffer 转换为迭代器,但在第二步中也失败了,并出现了相同的错误。
Does anyone know how to properly convert ArrayBuffers to HashSets for DataFrames originating from Hive tables? Since the error seems to be only for the Hive table version, I'm tempted to think that this is an issue with Spark/Hive integration in SparkQL.
有谁知道如何将 ArrayBuffers 正确转换为源自 Hive 表的 DataFrames 的 HashSets?由于错误似乎只针对 Hive 表版本,我很想认为这是 SparkQL 中 Spark/Hive 集成的问题。
Any ideas?
有任何想法吗?
Thanks in advance.
提前致谢。
[edited] BTW, my Spark version is 1.3.0 CDH.
[编辑] 顺便说一句,我的 Spark 版本是 1.3.0 CDH。
[edited: here are the printSchema results]
[编辑:这里是 printSchema 结果]
scala> tempRDDfromHive.printSchema()
root
|-- var1: integer (nullable = true)
|-- var2: string (nullable = true)
|-- var3: integer (nullable = true)
|-- var4: string (nullable = true)
|-- var5: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: integer (nullable = true)
| | |-- b: string (nullable = true)
回答by zero323
What you actually get during mapphase is not an ArrayBuffer[(Int, String)]but an ArrayBuffer[Row]hence the error. Ignoring other columns what you need is something like this:
你在map阶段中实际得到的不是一个ArrayBuffer[(Int, String)]而是一个ArrayBuffer[Row]因此错误。忽略您需要的其他列是这样的:
import org.apache.spark.sql.Row
tempHiveQL.map((a: Row) =>
a.getAs[Seq[Row]](4).map{case Row(k: Int, v: String) => (k, v)}.toSet)
It looks like this issue has been fixed in Spark 1.5.0.
看起来这个问题已在 Spark 1.5.0 中修复。

![scala setMaster`local[*]` 在火花中是什么意思?](/res/img/loading.gif)