Scala: How to store custom objects in a Dataset?

Disclaimer: this page reproduces a popular StackOverflow question and its answers under the CC BY-SA 4.0 license. You are free to use/share it, but you must do so under the same license and attribute it to the original authors (not me): StackOverflow, original source: http://stackoverflow.com/questions/36648128/

How to store custom objects in Dataset?

scala, apache-spark, apache-spark-dataset, apache-spark-encoders

Asked by zero323

According to Introducing Spark Datasets:

As we look forward to Spark 2.0, we plan some exciting improvements to Datasets, specifically: ... Custom encoders – while we currently autogenerate encoders for a wide variety of types, we'd like to open up an API for custom objects.

and attempts to store a custom type in a Dataset lead to an error like the following:

Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._ Support for serializing other types will be added in future releases

or:

java.lang.UnsupportedOperationException: No Encoder found for ....

Are there any existing workarounds?



Note this question exists only as an entry point for a Community Wiki answer. Feel free to update / improve both question and answer.

Answered by Alec

Update

This answer is still valid and informative, although things are now better since Spark 2.2/2.3, which adds built-in encoder support for Set, Seq, Map, Date, Timestamp, and BigDecimal. If you stick to making types with only case classes and the usual Scala types, you should be fine with just the implicits in SQLImplicits.

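As a minimal sketch of that (assuming Spark 2.3+ and the usual SparkSession named spark; the Event class here is hypothetical), a case class built only from such types needs nothing beyond the standard import:

import spark.implicits._

// a case class using only field types covered by the built-in encoders
case class Event(id: Int, tags: Set[String], scores: Map[String, BigDecimal], when: java.sql.Date)

val events = Seq(
  Event(1, Set("a", "b"), Map("x" -> BigDecimal(1.5)), java.sql.Date.valueOf("2020-01-01"))
).toDS()

events.printSchema  // tags, scores and when all get proper columns, no kryo involved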



Unfortunately, virtually nothing has been added to help with this. Searching for @since 2.0.0 in Encoders.scala or SQLImplicits.scala turns up things mostly to do with primitive types (and some tweaking of case classes). So, the first thing to say: there is currently no really good support for custom class encoders. With that out of the way, what follows are some tricks that do as good a job as we can hope for, given what we currently have at our disposal. As an upfront disclaimer: this won't work perfectly, and I'll do my best to make all the limitations clear up front.

What exactly is the problem

When you want to make a dataset, Spark "requires an encoder (to convert a JVM object of type T to and from the internal Spark SQL representation) that is generally created automatically through implicits from a SparkSession, or can be created explicitly by calling static methods on Encoders" (taken from the docs on createDataset). An encoder takes the form Encoder[T], where T is the type you are encoding. The first suggestion is to add import spark.implicits._ (which gives you the implicit encoders from SQLImplicits) and the second suggestion is to explicitly pass in an encoder using the set of encoder-related functions on the Encoders object.

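As a quick sketch of those two routes (assuming the usual spark session; the values are just placeholders):

// route 1: implicit - the encoder is summoned from spark.implicits._
import spark.implicits._
val viaImplicits = spark.createDataset(Seq(1, 2, 3))

// route 2: explicit - pass an Encoder yourself in the second argument list
import org.apache.spark.sql.Encoders
val viaExplicit = spark.createDataset(Seq("a", "b", "c"))(Encoders.STRING)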

There is no encoder available for regular classes, so

import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

will give you the following implicit-related compile-time error:

Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._ Support for serializing other types will be added in future releases

However, if you wrap whatever type you just used to get the above error in some class that extends Product, the error confusingly gets delayed to runtime, so

import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

Compiles just fine, but fails at runtime with

java.lang.UnsupportedOperationException: No Encoder found for MyObj

The reason for this is that the encoders Spark creates with the implicits are actually only made at runtime (via Scala reflection). In this case, all Spark checks at compile time is that the outermost class extends Product (which all case classes do), and it only realizes at runtime that it still doesn't know what to do with MyObj (the same problem occurs if I try to make a Dataset[(Int,MyObj)] - Spark waits until runtime to barf on MyObj). These are central problems that are in dire need of being fixed:

  • some classes that extend Product compile despite always crashing at runtime, and
  • there is no way of passing in custom encoders for nested types (I have no way of feeding Spark an encoder for just MyObj such that it then knows how to encode Wrap[MyObj] or (Int,MyObj)).

Just use kryo

The solution everyone suggests is to use the kryo encoder.

import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

This gets pretty tedious fast though. Especially if your code is manipulating all sorts of datasets, joining, grouping etc. You end up racking up a bunch of extra implicits. So, why not just make an implicit that does this all automatically?

import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]): org.apache.spark.sql.Encoder[A] =
  org.apache.spark.sql.Encoders.kryo[A](ct)

And now, it seems like I can do almost anything I want (the example below won't work in the spark-shell, where spark.implicits._ is automatically imported):

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i,  d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!

Or almost. The problem is that using kryo leads to Spark just storing every row in the dataset as a flat binary object. For map, filter and foreach that is enough, but for operations like join, Spark really needs these to be separated into columns. Inspecting the schema for d2 or d3, you see there is just one binary column:

d2.printSchema
// root
//  |-- value: binary (nullable = true)
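
For the record, the element-wise operations mentioned above really do keep working with this flat encoding - a quick sketch, assuming the kryoEncoder implicit and the d1 defined earlier:

// map, filter and foreach only need to deserialize whole objects, so one binary column is enough
d1.filter(_.i > 1).foreach(o => println(o.i))
// it is the relational side (join, groupBy on fields, selecting columns) that needs a real schema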

Partial solution for tuples

So, using the magic of implicits in Scala (more in 6.26.3 Overloading Resolution), I can make myself a series of implicits that will do as good a job as possible, at least for tuples, and will work well with existing implicits:

import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._  // we can still take advantage of all the old implicits

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)

implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)

// ... you can keep making these

Then, armed with these implicits, I can make my example above work, albeit with some column renaming

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")

I haven't yet figured out how to get the expected tuple names (_1, _2, ...) by default without renaming them - if someone else wants to play around with this, the Spark source is where the name "value" gets introduced and where the tuple names are usually added. However, the key point is that I now have a nice structured schema:

d4.printSchema
// root
//  |-- _1: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
//  |-- _2: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)

So, in summary, this workaround:

  • allows us to get separate columns for tuples (so we can join on tuples again, yay!)
  • we can again just rely on the implicits (so no need to be passing in kryo all over the place)
  • is almost entirely backwards compatible with import spark.implicits._ (with some renaming involved)
  • does not let us join on the kryo-serialized binary columns, let alone on the fields those objects may have
  • has the unpleasant side-effect of renaming some of the tuple columns to "value" (if necessary, this can be undone by converting .toDF, specifying new column names, and converting back to a dataset - and the schema names seem to be preserved through joins, where they are most needed).

Partial solution for classes in general

This one is less pleasant and has no good solution. However, now that we have the tuple solution above, I have a hunch the implicit conversion solution from another answer will be a bit less painful too, since you can convert your more complex classes to tuples. Then, after creating the dataset, you'd probably rename the columns using the dataframe approach. If all goes well, this is really an improvement since I can now perform joins on the fields of my classes. If I had just used one flat binary kryo serializer, that wouldn't have been possible.

Here is an example that does a bit of everything: I have a class MyObj which has fields of types Int, java.util.UUID, and Set[String]. The first takes care of itself. The second, although I could serialize it using kryo, would be more useful if stored as a String (since UUIDs are usually something I'll want to join against). The third really just belongs in a binary column.

class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])

// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])

// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)

Now, I can create a dataset with a nice schema using this machinery:

val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]

And the schema shows me columns with the right names, and the first two are both things I can join against.

d.printSchema
// root
//  |-- i: integer (nullable = false)
//  |-- u: string (nullable = true)
//  |-- s: binary (nullable = true)
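
As a quick sanity check (a sketch only, reusing the spark.implicits._ import from earlier), a join against that string column now behaves like any ordinary column join:

// joining on u works because it is a real string column, not part of a kryo blob
d.toDF.alias("a")
  .join(d.toDF.alias("b"), $"a.u" === $"b.u")
  .show()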

Answered by zero323

  1. Using generic encoders.

    There are two generic encoders available for now, kryo and javaSerialization (a short javaSerialization sketch follows after this list), where the latter is explicitly described as:

    extremely inefficient and should only be used as the last resort.

    Assuming the following class:

    class Bar(i: Int) {
      override def toString = s"bar $i"
      def bar = i
    }
    

    you can use these encoders by adding an implicit encoder:

    object BarEncoders {
      implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = 
        org.apache.spark.sql.Encoders.kryo[Bar]
    }
    

    which can be used together as follows:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarEncoders._
    
        val ds = Seq(new Bar(1)).toDS
        ds.show
    
        sc.stop()
      }
    }
    

    It stores objects as a binary column, so when converted to a DataFrame you get the following schema:

    root
     |-- value: binary (nullable = true)
    

    It is also possible to encode tuples using the kryo encoder for a specific field:

    val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar])
    
    spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder)
    // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]
    

    Please note that we don't depend on implicit encoders here but pass the encoder explicitly, so this most likely won't work with the toDS method.

  2. Using implicit conversions:

    Provide implicit conversions between a representation which can be encoded and the custom class, for example:

    object BarConversions {
      implicit def toInt(bar: Bar): Int = bar.bar
      implicit def toBar(i: Int): Bar = new Bar(i)
    }
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SQLContext

    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarConversions._
    
        type EncodedBar = Int
    
        val bars: RDD[EncodedBar]  = sc.parallelize(Seq(new Bar(1)))
        val barsDS = bars.toDS
    
        barsDS.show
        barsDS.map(_.bar).show
    
        sc.stop()
      }
    }
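
A minimal sketch of the javaSerialization variant mentioned in point 1 (with the caveat that Java serialization also needs Bar to extend java.io.Serializable, which the definition above does not do):

implicit def barJavaEncoder: org.apache.spark.sql.Encoder[Bar] =
  org.apache.spark.sql.Encoders.javaSerialization[Bar]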
    


Answered by ChoppyTheLumberHyman

You can use UDTRegistration and then Case Classes, Tuples, etc... all work correctly with your User Defined Type!

Say you want to use a custom Enum:

trait CustomEnum { def value:String }
case object Foo extends CustomEnum  { val value = "F" }
case object Bar extends CustomEnum  { val value = "B" }
object CustomEnum {
  def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get
}

Register it like this:

// First define a UDT class for it:
class CustomEnumUDT extends UserDefinedType[CustomEnum] {
  override def sqlType: DataType = org.apache.spark.sql.types.StringType
  override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)
  // Note that this will be a UTF8String type
  override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)
  override def userClass: Class[CustomEnum] = classOf[CustomEnum]
}

// Then Register the UDT Class!
// NOTE: you have to put this file into the org.apache.spark package!
UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName)

Then USE IT!

case class UsingCustomEnum(id:Int, en:CustomEnum)

val seq = Seq(
  UsingCustomEnum(1, Foo),
  UsingCustomEnum(2, Bar),
  UsingCustomEnum(3, Foo)
).toDS()
seq.filter(_.en == Foo).show()
println(seq.collect())

Say you want to use a Polymorphic Record:

trait CustomPoly
case class FooPoly(id:Int) extends CustomPoly
case class BarPoly(value:String, secondValue:Long) extends CustomPoly

... and then use it like this:

case class UsingPoly(id:Int, poly:CustomPoly)

val polySeq = Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()

You can write a custom UDT that encodes everything to bytes (I'm using java serialization here but it's probably better to instrument Spark's Kryo context).

First define the UDT class:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.sql.types.{DataType, UserDefinedType}

class CustomPolyUDT extends UserDefinedType[CustomPoly] {
  val kryo = new Kryo()  // note: unused below - the methods use plain Java serialization

  override def sqlType: DataType = org.apache.spark.sql.types.BinaryType
  override def serialize(obj: CustomPoly): Any = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)

    bos.toByteArray
  }
  override def deserialize(datum: Any): CustomPoly = {
    val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
    val ois = new ObjectInputStream(bis)
    val obj = ois.readObject()
    obj.asInstanceOf[CustomPoly]
  }

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}

Then register it:

// NOTE: The file you do this in has to be inside of the org.apache.spark package!
UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)

Then you can use it!

// As shown above:
case class UsingPoly(id:Int, poly:CustomPoly)

val polySeq = Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()

Answered by Sarvesh Kumar Singh

Encoders work more or less the same in Spark 2.0. And Kryo is still the recommended serialization choice.

You can look at the following example in spark-shell:

scala> import spark.implicits._
import spark.implicits._

scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders

scala> case class NormalPerson(name: String, age: Int) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class NormalPerson

scala> case class ReversePerson(name: Int, age: String) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class ReversePerson

scala> val normalPersons = Seq(
 |   NormalPerson("Superman", 25),
 |   NormalPerson("Spiderman", 17),
 |   NormalPerson("Ironman", 29)
 | )
normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29))

scala> val ds1 = sc.parallelize(normalPersons).toDS
ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int]

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds1.show()
+---------+---+
|     name|age|
+---------+---+
| Superman| 25|
|Spiderman| 17|
|  Ironman| 29|
+---------+---+

scala> ds2.show()
+----+---------+
|name|      age|
+----+---------+
|  25| Superman|
|  17|Spiderman|
|  29|  Ironman|
+----+---------+

scala> ds1.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Superman. I am 25 years old.
I am Spiderman. I am 17 years old.

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds2.foreach(p => println(p.aboutMe))
I am 17. I am Spiderman years old.
I am 25. I am Superman years old.
I am 29. I am Ironman years old.

Till now there were no appropriate encoders in the present scope, so our persons were not encoded as binary values. But that will change once we provide some implicit encoders using Kryo serialization.

// Provide Encoders

scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson]
normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary]

scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson]
reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary]

// Encoders will be used since they are now present in scope

scala> val ds3 = sc.parallelize(normalPersons).toDS
ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary]

scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name))
ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary]

// now all our persons show up as binary values
scala> ds3.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

scala> ds4.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

// Our instances still work as expected    

scala> ds3.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Spiderman. I am 17 years old.
I am Superman. I am 25 years old.

scala> ds4.foreach(p => println(p.aboutMe))
I am 25. I am Superman years old.
I am 29. I am Ironman years old.
I am 17. I am Spiderman years old.

Answered by Akash Mahajan

In the case of a Java Bean class, this can be useful:

import spark.sqlContext.implicits._
import org.apache.spark.sql.Encoders
implicit val encoder = Encoders.bean[MyClass](classOf[MyClass])

Now you can simply read the DataFrame as a Dataset of the custom class:

dataFrame.as[MyClass]

This will create a custom class encoder and not a binary one.

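For this to work the class has to follow Java Bean conventions (a no-argument constructor plus getters and setters). A minimal sketch of what such a class could look like in Scala - MyClass here is hypothetical:

import scala.beans.BeanProperty

class MyClass {
  // @BeanProperty generates the getId/setId style accessors the bean encoder relies on
  @BeanProperty var id: Int = 0
  @BeanProperty var name: String = ""
}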

Answered by Jimmy Da

My examples will be in Java, but I don't imagine it to be difficult adapting to Scala.

I have been quite successful converting RDD<Fruit> to Dataset<Fruit> using spark.createDataset and Encoders.bean, as long as Fruit is a simple Java Bean.

Step 1: Create the simple Java Bean.

public class Fruit implements Serializable {
    private String name  = "default-fruit";
    private String color = "default-color";

    // AllArgsConstructor
    public Fruit(String name, String color) {
        this.name  = name;
        this.color = color;
    }

    // NoArgsConstructor
    public Fruit() {
        this("default-fruit", "default-color");
    }

    // ...create getters and setters for above fields
    // you figure it out
}

I'd stick to classes with primitive types and String as fields before the DataBricks folks beef up their Encoders. If you have a class with a nested object, create another simple Java Bean with all of its fields flattened, so you can use RDD transformations to map the complex type to the simpler one, as sketched below. Sure, it's a little extra work, but I imagine it'll help a lot on performance when working with a flat schema.

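A sketch of that flattening idea (written in Scala for brevity; Person, Address and personRDD are hypothetical):

// Address is a plain class Spark has no encoder for, so Person cannot be encoded directly
class Address(val city: String, val zip: String)
case class Person(name: String, address: Address)

// a flattened, encoder-friendly shape
case class FlatPerson(name: String, city: String, zip: String)

import spark.implicits._
val flat = personRDD.map(p => FlatPerson(p.name, p.address.city, p.address.zip))
val ds = spark.createDataset(flat)  // plain Product encoder, no kryo needed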

Step 2: Get your Dataset from the RDD

SparkSession spark = SparkSession.builder().getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

List<Fruit> fruitList = ImmutableList.of(
    new Fruit("apple", "red"),
    new Fruit("orange", "orange"),
    new Fruit("grape", "purple"));
JavaRDD<Fruit> fruitJavaRDD = jsc.parallelize(fruitList);


RDD<Fruit> fruitRDD = fruitJavaRDD.rdd();
Encoder<Fruit> fruitBean = Encoders.bean(Fruit.class);
Dataset<Fruit> fruitDataset = spark.createDataset(fruitRDD, fruitBean);

And voila! Lather, rinse, repeat.

Answered by Taeheon Kwon

For those who may be in my situation, I put my answer here, too.

To be specific,

  1. I was reading 'Set typed data' from SQLContext, so the original data format is a DataFrame.

    val sample = spark.sqlContext.sql("select 1 as a, collect_set(1) as b limit 1")
    sample.show()

    +---+---+
    |  a|  b|
    +---+---+
    |  1|[1]|
    +---+---+

  2. Then convert it into an RDD using rdd.map() with the mutable.WrappedArray type.

    sample
      .rdd.map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Int]](1).toSet))
      .collect()
      .foreach(println)

    Result:

    (1,Set(1))


Answered by Matt

In addition to the suggestions already given, another option I recently discovered is that you can declare your custom class including the trait org.apache.spark.sql.catalyst.DefinedByConstructorParams.

This works if the class has a constructor that uses types the ExpressionEncoder can understand, i.e. primitive values and standard collections. It can come in handy when you're not able to declare the class as a case class, but don't want to use Kryo to encode it every time it's included in a Dataset.

For example, I wanted to declare a case class that included a Breeze vector. The only encoder that would be able to handle that would normally be Kryo. But if I declared a subclass that extended the Breeze DenseVector and DefinedByConstructorParams, the ExpressionEncoder understood that it could be serialized as an array of Doubles.

Here's how I declared it:

class SerializableDenseVector(values: Array[Double]) extends breeze.linalg.DenseVector[Double](values) with DefinedByConstructorParams
implicit def BreezeVectorToSerializable(bv: breeze.linalg.DenseVector[Double]): SerializableDenseVector = bv.asInstanceOf[SerializableDenseVector]

Now I can use SerializableDenseVector in a Dataset (directly, or as part of a Product) using a simple ExpressionEncoder and no Kryo. It works just like a Breeze DenseVector but serializes as an Array[Double].

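For completeness, a small usage sketch building on the declarations above (the Datum case class and the exact schema are assumptions of mine, hence hypothetical):

import spark.implicits._

case class Datum(label: Double, features: SerializableDenseVector)

val ds = Seq(
  Datum(1.0, new SerializableDenseVector(Array(0.1, 0.2))),
  Datum(0.0, new SerializableDenseVector(Array(0.3, 0.4)))
).toDS()

// the features column should come out as plain doubles rather than a kryo binary blob
ds.printSchema()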