如何在 Scala 的 Apache Spark 中将数据帧转换为数据集?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/44516627/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-22 09:18:12  来源:igfitidea点击:

How to convert a dataframe to dataset in Apache Spark in Scala?

scalaapache-sparkapache-spark-sqlapache-spark-encoders

提问by stefanobaghino

I need to convert my dataframe to a dataset and I used the following code:

我需要将我的数据框转换为数据集,我使用了以下代码:

    val final_df = Dataframe.withColumn(
      "features",
      toVec4(
        // casting into Timestamp to parse the string, and then into Int
        $"time_stamp_0".cast(TimestampType).cast(IntegerType),
        $"count",
        $"sender_ip_1",
        $"receiver_ip_2"
      )
    ).withColumn("label", (Dataframe("count"))).select("features", "label")

    final_df.show()

    val trainingTest = final_df.randomSplit(Array(0.3, 0.7))
    val TrainingDF = trainingTest(0)
    val TestingDF=trainingTest(1)
    TrainingDF.show()
    TestingDF.show()

    ///lets create our liner regression
    val lir= new LinearRegression()
    .setRegParam(0.3)
    .setElasticNetParam(0.8)
    .setMaxIter(100)
    .setTol(1E-6)

    case class df_ds(features:Vector, label:Integer)
    org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

    val Training_ds = TrainingDF.as[df_ds]

My problem is that, I got the following error:

我的问题是,我收到以下错误:

Error:(96, 36) Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
    val Training_ds = TrainingDF.as[df_ds]

It seems that the number of values in dataframe is different with the number of value in my class. However I am using case class df_ds(features:Vector, label:Integer)on my TrainingDF dataframe since, It has a vector of features and an integer label. Here is TrainingDF dataframe:

似乎数据框中的值数量与我班级中的值数量不同。但是我case class df_ds(features:Vector, label:Integer)在我的 TrainingDF 数据帧上使用,因为它有一个特征向量和一个整数标签。这是 TrainingDF 数据框:

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,10...|   10|
+--------------------+-----+

Also here is my original final_dfdataframe:

这里也是我原来的final_df数据

+------------+-----------+-------------+-----+
|time_stamp_0|sender_ip_1|receiver_ip_2|count|
+------------+-----------+-------------+-----+
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.3|     10.0.0.2|   10|
+------------+-----------+-------------+-----+

However I got the mentioned error! Can anybody help me? Thanks in advance.

但是我得到了提到的错误!有谁能够帮我?提前致谢。

回答by stefanobaghino

The error message you are reading is a pretty good pointer.

您正在阅读的错误消息是一个很好的指针。

When you convert a DataFrameto a Datasetyou have to have a proper Encoderfor whatever is stored in the DataFramerows.

当您将 a 转换DataFrame为 a 时,Dataset您必须对Encoder存储DataFrame在行中的任何内容进行适当的处理。

Encoders for primitive-like types (Ints, Strings, and so on) and case classesare provided by just importing the implicits for your SparkSessionlike follows:

原始类型(Ints、Strings 等)的编码器,case classes只需为您的SparkSession类导入隐式即可提供,如下所示:

case class MyData(intField: Int, boolField: Boolean) // e.g.

val spark: SparkSession = ???
val df: DataFrame = ???

import spark.implicits._

val ds: Dataset[MyData] = df.as[MyData]

If that doesn't work either is because the type you are trying to castthe DataFrameto isn't supported. In that case, you would have to write your own Encoder: you may find more information about it hereand see an example (the Encoderfor java.time.LocalDateTime) here.

如果这也不行,因为类型您要DataFrame,以不被支持。在这种情况下,你会写自己Encoder:你可能会发现更多关于它的信息在这里看到一个例子(Encoderjava.time.LocalDateTime在这里

回答by Shang Gao

Spark 1.6.0

火花 1.6.0

case class MyCase(id: Int, name: String)

val encoder = org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[MyCase]

val dataframe = …

val dataset = dataframe.as(encoder)

Spark 2.0 or above

Spark 2.0 或以上

case class MyCase(id: Int, name: String)

val encoder = org.apache.spark.sql.Encoders.product[MyCase]

val dataframe = …

val dataset = dataframe.as(encoder)