
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you reuse or share it you must do so under the same license and attribute the original authors (not the translator). Original question: http://stackoverflow.com/questions/37706420/


How to create a custom Encoder in Spark 2.X Datasets?

scala, apache-spark, apache-spark-dataset, apache-spark-encoders

Asked by javadba

Spark Datasets move away from Rows to Encoders for POJOs/primitives. The Catalyst engine uses an ExpressionEncoder to convert columns in a SQL expression. However, there do not appear to be other subclasses of Encoder available to use as a template for our own implementations.

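For reference (not part of the original question), the concrete encoder you get back in 2.x is indeed an ExpressionEncoder, whether it comes from the public org.apache.spark.sql.Encoders factory or from spark.implicits._. A minimal sketch, assuming a made-up case class Image:

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

// Hypothetical case class standing in for the Pojo/record described above
case class Image(id: String, label: String, channels: Int,
                 height: Int, width: Int, data: Array[Byte])

object EncoderInspection {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("encoders").getOrCreate()
    import spark.implicits._

    val fromFactory: Encoder[Image]   = Encoders.product[Image]     // explicit, via the public factory
    val fromImplicits: Encoder[Image] = implicitly[Encoder[Image]]  // resolved from spark.implicits._

    // Both typically resolve to ExpressionEncoder under the hood
    println(fromFactory.getClass.getName)
    println(fromImplicits.getClass.getName)

    spark.stop()
  }
}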

Here is an example of code that works happily in Spark 1.X / DataFrames but does not compile under the new regime:


//mapping each row to an RDD tuple
//(has_id, has_channels, has_height, has_width and log are defined elsewhere in the original code)
df.map(row => {
    val id: String = if (!has_id) "" else row.getAs[String]("id")
    val label: String = row.getAs[String]("label")
    val channels: Int = if (!has_channels) 0 else row.getAs[Int]("channels")
    val height: Int = if (!has_height) 0 else row.getAs[Int]("height")
    val width: Int = if (!has_width) 0 else row.getAs[Int]("width")
    val data: Array[Byte] = row.getAs[Any]("data") match {
      case str: String => str.getBytes
      case arr: Array[Byte @unchecked] => arr
      case _ =>
        log.error("Unsupported value type")
        null
    }
    (id, label, channels, height, width, data)
  }).persist(StorageLevel.DISK_ONLY)

We get a compiler error of


Error:(56, 11) Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are supported 
by importing spark.implicits._  Support for serializing other types will be added in future releases.
    df.map(row => {
          ^

So then somehow/somewhere there should be a means to


  • Define/implement our custom Encoder
  • Apply it when performing a mapping on the DataFrame (which is now a Dataset of type Row)
  • Register the Encoder for use by other custom code

I am looking for code that successfully performs these steps.


Answered by zero323

As far as I am aware nothing really changed since 1.6 and the solutions described in How to store custom objects in Dataset? are the only available options. Nevertheless your current code should work just fine with default encoders for product types.

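For illustration, a minimal sketch of the fallback described in that linked answer, using Encoders.kryo for a class that has no built-in encoder (the class and names below are made up, not from the question):

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

// Hypothetical non-product class with no default encoder
class MyPayload(val bytes: Array[Byte]) extends Serializable

object KryoEncoderSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("kryo-encoder").getOrCreate()
    import spark.implicits._

    // Fall back to a binary Kryo encoder and register it as an implicit
    implicit val payloadEncoder: Encoder[MyPayload] = Encoders.kryo[MyPayload]

    val payloads = spark.createDataset(Seq(new MyPayload(Array[Byte](1, 2, 3))))
    payloads.map(p => p.bytes.length).show()  // the Int result uses a default encoder from the implicits

    spark.stop()
  }
}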

To get some insight why your code worked in 1.x and may not work in 2.0.0 you'll have to check the signatures. In 1.x DataFrame.map is a method which takes a function Row => T and transforms RDD[Row] into RDD[T].


In 2.0.0 DataFrame.map takes a function of type Row => T as well, but transforms Dataset[Row] (a.k.a DataFrame) into Dataset[T], hence T requires an Encoder. If you want to get the "old" behavior you should use RDD explicitly:


df.rdd.map(row => ???)

For Dataset[Row] map, see Encoder error while trying to map dataframe row to updated row.

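For completeness, a self-contained sketch of the Dataset route for the map in the question: once spark.implicits._ is in scope, the returned tuple picks up a default product encoder (the sample DataFrame below is made up; the column names follow the question):

import org.apache.spark.sql.SparkSession

object DatasetMapSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("dataset-map").getOrCreate()
    import spark.implicits._

    // Hypothetical stand-in for the question's DataFrame
    val df = Seq(("1", "cat", 3, 32, 32, "raw")).toDF("id", "label", "channels", "height", "width", "data")

    val mapped = df.map { row =>
      val data: Array[Byte] = row.getAs[Any]("data") match {
        case s: String                 => s.getBytes
        case a: Array[Byte @unchecked] => a
        case _                         => Array.empty[Byte]
      }
      (row.getAs[String]("id"), row.getAs[String]("label"),
       row.getAs[Int]("channels"), row.getAs[Int]("height"),
       row.getAs[Int]("width"), data)
    }  // Dataset[(String, String, Int, Int, Int, Array[Byte])]

    mapped.show(truncate = false)
    spark.stop()
  }
}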


Answered by Valan Aravind

I imported spark.implicits._, where spark is the SparkSession; it solved the error and the custom encoders got imported.


Also, writing a custom encoder is a way out which I've not tried.


Working solution: create the SparkSession and import the following


import spark.implicits._

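A compact sketch of that working setup (object and app names are illustrative): create the SparkSession first, then import its implicits, after which Datasets of primitives, tuples and case classes get encoders automatically.

import org.apache.spark.sql.SparkSession

object ImplicitEncodersApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("implicit-encoders")
      .getOrCreate()

    // The import needs the `spark` val to exist first; it brings the default encoders into scope
    import spark.implicits._

    val ds = Seq(1, 2, 3).toDS().map(_ * 2)  // Int has a default encoder, so this compiles
    ds.show()

    spark.stop()
  }
}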