Scala: How to define schema for custom type in Spark SQL?

Note: this content comes from StackOverflow and is provided under the CC BY-SA 4.0 license. If you reuse or share it, you must attribute the original authors (not this site). Original question: http://stackoverflow.com/questions/32440461/


How to define schema for custom type in Spark SQL?

scala, apache-spark, apache-spark-sql, case-class

Asked by Martin Senne

The following example code tries to put some case objects into a DataFrame. The code includes the definition of a case object hierarchy and a case class using this trait:


import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext

sealed trait Some
case object AType extends Some
case object BType extends Some

case class Data( name : String, t: Some)

object Example {
  def main(args: Array[String]) : Unit = {
    val conf = new SparkConf()
      .setAppName( "Example" )
      .setMaster( "local[*]")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._

    val df = sc.parallelize( Seq( Data( "a", AType), Data( "b", BType) ), 4).toDF()
    df.show()
  }
}    

When executing the code, I unfortunately encounter the following exception:


java.lang.UnsupportedOperationException: Schema for type Some is not supported

Questions


  • Is there a possibility to add or define a schema for certain types (here type Some)?
  • Does another approach exist to represent this kind of enumerations?
    • I tried to use Enumeration directly, but also without success (see below).

Code for Enumeration:


object Some extends Enumeration {
  type Some = Value
  val AType, BType = Value
}

Thanks in advance. I hope that the best approach is not to use strings instead.


Accepted answer by zero323

Spark 2.0.0+:


UserDefinedType has been made private in Spark 2.0.0 and as of now it has no Dataset-friendly replacement.


See: SPARK-14155 (Hide UserDefinedType in Spark 2.0)


Most of the time a statically typed Dataset can serve as a replacement. There is a pending JIRA, SPARK-7768, to make the UDT API public again, targeting version 2.4.


See also How to store custom objects in Dataset?

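To illustrate the Dataset route mentioned above, here is a minimal sketch, assuming Spark 2.x and reusing the Data / AType / BType definitions from the question. The Kryo-backed encoder is just one possible workaround (the names DatasetExample and dataEncoder are purely illustrative), since the built-in product encoders cannot handle the sealed trait field:

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

object DatasetExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DatasetExample")
      .master("local[*]")
      .getOrCreate()

    // Kryo serializes each Data object into a single binary column
    implicit val dataEncoder: Encoder[Data] = Encoders.kryo[Data]

    val ds = spark.createDataset(Seq(Data("a", AType), Data("b", BType)))
    ds.show()                              // one binary "value" column, no per-field columns
    ds.map(_.name)(Encoders.STRING).show() // mapping back to supported types works as usual
  }
}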

Spark < 2.0.0


Is there a possibility to add or define a schema for certain types (here type Some)?


I guess the answer depends on how badly you need this. It looks like it is possible to create a UserDefinedType, but it requires access to DeveloperApi and is not exactly straightforward or well documented.


import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[SomeUDT])
sealed trait Some
case object AType extends Some
case object BType extends Some

class SomeUDT extends UserDefinedType[Some] {
  override def sqlType: DataType = IntegerType

  override def serialize(obj: Any) = {
    obj match {
      case AType => 0
      case BType => 1
    }
  }

  override def deserialize(datum: Any): Some = {
    datum match {
      case 0 => AType
      case 1 => BType
    }
  }

  override def userClass: Class[Some] = classOf[Some]
}
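
With the UDT in place, the original example should go through instead of raising the UnsupportedOperationException. A quick sanity check might look like this, reusing the SparkContext / SQLContext setup (including import sqlContext.implicits._) from the question:

val df = sc.parallelize(Seq(Data("a", AType), Data("b", BType)), 4).toDF()
df.printSchema()  // the t column is now backed by SomeUDT rather than an unsupported type
df.show()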

You should probably override hashCode and equals as well.

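For reference, a minimal sketch of what such overrides could look like inside the SomeUDT class above; treating all instances as interchangeable is an assumption made here, justified only because SomeUDT carries no state:

  // sketch only: SomeUDT is stateless, so equality is decided by class alone
  override def equals(other: Any): Boolean = other.isInstanceOf[SomeUDT]
  override def hashCode(): Int = classOf[SomeUDT].getName.hashCode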

Its PySpark counterpart can look like this:


from enum import Enum, unique
from pyspark.sql.types import UserDefinedType, IntegerType

class SomeUDT(UserDefinedType):
    @classmethod
    def sqlType(cls):
        return IntegerType()

    @classmethod
    def module(cls):
        return cls.__module__

    @classmethod 
    def scalaUDT(cls): # Required in Spark < 1.5
        return 'net.zero323.enum.SomeUDT'

    def serialize(self, obj):
        return obj.value

    def deserialize(self, datum):
        return {x.value: x for x in Some}[datum]

@unique
class Some(Enum):
    __UDT__ = SomeUDT()
    AType = 0
    BType = 1

In Spark < 1.5 a Python UDT requires a paired Scala UDT, but it looks like this is no longer the case in 1.5.


For a simple UDT like this you can use simple types (for example IntegerType instead of a whole Struct).
