scala - Extracting `Seq[(String,String,String)]` from a spark DataFrame

Note: this page is a mirror of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/37553059/

Extracting `Seq[(String,String,String)]` from spark DataFrame

scala, apache-spark, dataframe, apache-spark-sql

Asked by Matti Lyra

I have a spark DF with rows of Seq[(String, String, String)]. I'm trying to do some kind of a flatMap with that, but anything I try ends up throwing

java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple3

I can take a single row or multiple rows from the DF just fine

df.map{ r => r.getSeq[Feature](1)}.first

returns

Seq[(String, String, String)] = WrappedArray([ancient,jj,o], [olympia_greece,nn,location] .....

and the data type of the RDD seems correct.

org.apache.spark.rdd.RDD[Seq[(String, String, String)]]

The schema of the df is

root
 |-- article_id: long (nullable = true)
 |-- content_processed: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- lemma: string (nullable = true)
 |    |    |-- pos_tag: string (nullable = true)
 |    |    |-- ne_tag: string (nullable = true)

I know this problem is related to spark sql treating the RDD rows as org.apache.spark.sql.Row, even though they idiotically say that it's a Seq[(String, String, String)]. There's a related question (link below), but the answer to that question doesn't work for me. I am also not familiar enough with spark to figure out how to turn it into a working solution.

Are the rows Row[Seq[(String, String, String)]] or Row[(String, String, String)] or Seq[Row[(String, String, String)]] or something even crazier?

I'm trying to do something like

df.map{ r => r.getSeq[Feature](1)}.map(_(1)._1)

which appears to work but doesn't actually

df.map{ r => r.getSeq[Feature](1)}.map(_(1)._1).first

throws the above error. So how am I supposed to (for instance) get the first element of the second tuple on each row?

Also, WHY has spark been designed to do this? It seems idiotic to claim that something is of one type when in fact it isn't and cannot be converted to the claimed type.



Related question: GenericRowWithSchema exception in casting ArrayBuffer to HashSet in DataFrame to RDD from Hive table

Related bug report: http://search-hadoop.com/m/q3RTt2bvwy19Dxuq1&subj=ClassCastException+when+extracting+and+collecting+DF+array+column+type

Answered by zero323

Well, it doesn't claim that it is a tuple. It claims it is a struct which maps to Row:

import org.apache.spark.sql.Row
// assuming a SparkSession is in scope as `spark`, as in spark-shell
import spark.implicits._

case class Feature(lemma: String, pos_tag: String, ne_tag: String)
case class Record(id: Long, content_processed: Seq[Feature])

val df = Seq(
  Record(1L, Seq(
    Feature("ancient", "jj", "o"),
    Feature("olympia_greece", "nn", "location")
  ))
).toDF

val content = df.select($"content_processed").rdd.map(_.getSeq[Row](0))

You'll find exact mapping rules in the Spark SQL programming guide.
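
For reference (this snippet is not part of the original answer), printing the schema of the df built above shows how each case class field was mapped; the output below is abbreviated and may differ slightly between Spark versions:

df.printSchema()
// root
//  |-- id: long (nullable = false)
//  |-- content_processed: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- lemma: string (nullable = true)
//  ...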

Since Row is not exactly a pretty structure, you'll probably want to map it to something useful:

content.map(_.map {
  case Row(lemma: String, pos_tag: String, ne_tag: String) => 
    (lemma, pos_tag, ne_tag)
})

or:

content.map(_.map ( row => (
  row.getAs[String]("lemma"),
  row.getAs[String]("pos_tag"),
  row.getAs[String]("ne_tag")
)))
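
As a usage sketch for the original question (getting the first element of the second tuple on each row), either mapping above yields an RDD of plain tuples, so ordinary collection operations work; `tuples` is a name introduced here for illustration:

val tuples = content.map(_.map {
  case Row(lemma: String, pos_tag: String, ne_tag: String) =>
    (lemma, pos_tag, ne_tag)
})
tuples.map(_(1)._1).first() // "olympia_greece" for the sample data above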

Finally a slightly more concise approach with Datasets:

df.as[Record].rdd.map(_.content_processed)

or

df.select($"content_processed").as[Seq[(String, String, String)]]

although this seems to be slightly buggy at this moment.
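
A short usage sketch for the first Dataset variant (assuming the df and Record defined above, with spark.implicits._ in scope): the encoder yields plain case classes, so Row never appears:

df.as[Record]
  .flatMap(_.content_processed)             // Dataset[Feature]
  .map(f => (f.lemma, f.pos_tag, f.ne_tag)) // Dataset[(String, String, String)]
  .first()                                  // (ancient,jj,o)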

There is an important difference between the first approach (Row.getAs) and the second one (Dataset.as). The former extracts objects as Any and applies asInstanceOf. The latter uses encoders to transform between internal types and the desired representation.
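
To illustrate that difference (a sketch, not from the original answer; `castOnly` and `converted` are names introduced here): the cast-only path compiles but is exactly the trap from the question, while the encoder path actually converts each struct into a Feature:

val castOnly = df.rdd.map(_.getAs[Seq[(String, String, String)]]("content_processed"))
// castOnly.first().head._1  // would throw the ClassCastException from the question

val converted = df.as[Record].rdd.map(_.content_processed)
converted.first().head.lemma // "ancient"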

Answered by linehrr

import org.apache.spark.sql.{Row, SparkSession}

object ListSerdeTest extends App {

  implicit val spark: SparkSession = SparkSession
    .builder
    .master("local[2]")
    .getOrCreate()


  import spark.implicits._
  val myDS = spark.createDataset(
    Seq(
      MyCaseClass(mylist = Array(("asd", "aa"), ("dd", "ee")))
    )
  )

  myDS.toDF().printSchema()

  myDS.toDF().foreach(
    row => {
      row.getSeq[Row](row.fieldIndex("mylist"))
        .foreach {
          case Row(a, b) => println(a, b)
        }
    }
  )
}

case class MyCaseClass(
  mylist: Seq[(String, String)]
)

The above code is yet another way to deal with the nested structure. Spark's default Encoder will encode TupleX as a nested struct, which is why you are seeing this strange behaviour. And as others said in the comments, you can't just do getAs[T]() since it's just a cast (x.asInstanceOf[T]) and will therefore give you runtime exceptions.
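
As a hedged alternative sketch (not part of the original answer): staying with the typed Dataset instead of converting it to a DataFrame keeps the tuples as tuples, so no Row pattern matching is needed:

myDS.foreach { record =>
  record.mylist.foreach { case (a, b) => println(a, b) }
}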
