Scala: Encoder error while trying to map dataframe row to updated row

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must follow the same CC BY-SA license and attribute it to the original authors (not me): StackOverflow. Original source: http://stackoverflow.com/questions/39433419/


Tags: scala, apache-spark, apache-spark-sql, apache-spark-dataset, apache-spark-encoders

Asked by Advika

When I try to do the same thing in my code, as shown below:

dataframe.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0),make,row(2))
})

I have taken the above reference from here: Scala: How can I replace value in Dataframes using scala. But I am getting an encoder error:

Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

Note: I am using Spark 2.0!

Answered by zero323

There is nothing unexpected here. You're trying to use code which has been written with Spark 1.x and is no longer supported in Spark 2.0:


  • in 1.x DataFrame.map is ((Row) ⇒ T)(ClassTag[T]) ⇒ RDD[T]
  • in 2.x Dataset[Row].map is ((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]
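
Concretely, the missing piece in the question's snippet is the implicit Encoder[T]. A minimal illustration of the difference (assuming dataframe is the DataFrame from the question and a SparkSession named spark):

import spark.implicits._

// Compiles: Encoder[String] is provided by spark.implicits._
dataframe.map(row => row.getAs[String](1))

// Does not compile: no implicit Encoder[Row] is in scope
// dataframe.map(row => Row(row(0), "S", row(2)))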

To be honest it didn't make much sense in 1.x either. Independent of version, you can simply use the DataFrame API:

import org.apache.spark.sql.functions.{when, lower}
import spark.implicits._  // needed for toDF and the $"..." column syntax

val df = Seq(
  (2012, "Tesla", "S"), (1997, "Ford", "E350"),
  (2015, "Chevy", "Volt")
).toDF("year", "make", "model")

df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))
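
The replacement can be verified with show(); assuming the sample data above, the output should look roughly like this:

df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make")).show()
// +----+-----+-----+
// |year| make|model|
// +----+-----+-----+
// |2012|    S|    S|
// |1997| Ford| E350|
// |2015|Chevy| Volt|
// +----+-----+-----+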

If you really want to use map you should use a statically typed Dataset:

import spark.implicits._

case class Record(year: Int, make: String, model: String)

df.as[Record].map {
  case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S")
  case rec => rec
}

or at least return an object which will have an implicit encoder:

df.map {
  case Row(year: Int, make: String, model: String) => 
    (year, if(make.toLowerCase == "tesla") "S" else make, model)
}
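
Note that mapping to a tuple yields a Dataset[(Int, String, String)] with default column names _1, _2, _3; if needed, the original names can be restored with toDF (a small sketch building on the snippet above):

df.map {
  case Row(year: Int, make: String, model: String) =>
    (year, if (make.toLowerCase == "tesla") "S" else make, model)
}.toDF("year", "make", "model")  // restore the original column names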

Finally, if for some completely crazy reason you really want to map over Dataset[Row], you have to provide the required encoder:

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Yup, it would be possible to reuse df.schema here
val schema = StructType(Seq(
  StructField("year", IntegerType),
  StructField("make", StringType),
  StructField("model", StringType)
))

val encoder = RowEncoder(schema)

df.map {
  case Row(year, make: String, model) if make.toLowerCase == "tesla" => 
    Row(year, "S", model)
  case row => row
} (encoder)
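
As the comment in the snippet suggests, the schema doesn't have to be spelled out by hand; a minimal variant derives the encoder from the existing DataFrame:

// Equivalent, reusing the schema already attached to df
val encoder = RowEncoder(df.schema)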

Answered by PoojanKothari

For scenarios where the dataframe schema is known in advance, the answer given by @zero323 is the solution.

But for scenarios with a dynamic schema, or when passing multiple dataframes to a generic function, the following code worked for us while migrating from 1.6.1 to 2.2.0:

import org.apache.spark.sql.Row
import spark.implicits._  // needed for toDF

val df = Seq(
  (2012, "Tesla", "S"), (1997, "Ford", "E350"),
  (2015, "Chevy", "Volt")
).toDF("year", "make", "model")

// Dropping to the RDD API sidesteps the encoder requirement entirely
val data = df.rdd.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0), make, row(2))
})
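
To get a DataFrame back from the resulting RDD[Row], the original schema can be reused (a sketch, assuming the same spark session is available):

// Rebuild a DataFrame from the mapped rows, reusing the original schema
val result = spark.createDataFrame(data, df.schema)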

This code executes on both versions of Spark.

Disadvantage: the optimizations Spark provides through the DataFrame/Dataset API won't be applied.

Answered by user 3317704

In my case, on Spark 2.4.4, I had to import implicits. This is a general answer:

val spark2 = spark       // stable identifier whose implicits can be imported
import spark2.implicits._

val data = df.rdd.map(row => my_func(row))

where my_func performs some operation on each row.
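
For illustration, my_func can be any Row-to-Row transformation; a hypothetical sketch (the name and logic here are placeholders, not from the original answer):

import org.apache.spark.sql.Row

// Placeholder example of a Row-level transformation
def my_func(row: Row): Row = {
  val make = row.getAs[String](1)
  Row(row(0), if (make.toLowerCase == "tesla") "S" else make, row(2))
}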