scala Spark Dataset API - join

Disclaimer: this page is based on a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you use or share it, you must follow the same license and attribute it to the original authors (not me): StackOverflow. Original: http://stackoverflow.com/questions/36462674/

Date: 2020-10-22 08:09:06  Source: igfitidea

Spark Dataset API - join

Tags: scala, apache-spark, apache-spark-sql, apache-spark-dataset

Asked by mastro

I am trying to use the Spark Dataset API but I am having some issues doing a simple join.

Let's say I have two datasets with the fields date | value; then, in the case of DataFrame, my join would look like:

val dfA : DataFrame
val dfB : DataFrame

dfA.join(dfB, dfB("date") === dfA("date") )

However, for Dataset there is the .joinWith method, but the same approach does not work:

val dfA : Dataset
val dfB : Dataset

dfA.joinWith(dfB, ? )

What is the argument required by .joinWith?

Answered by David Griffin

To use joinWith you first have to create a Dataset, and most likely two of them. To create a Dataset, you need to create a case class that matches your schema and call DataFrame.as[T], where T is your case class. So:

case class KeyValue(key: Int, value: String)
// note: toDF / as[T] need the implicit encoders in scope (e.g. import spark.implicits._);
// they are already imported in the spark-shell
val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value")
val ds = df.as[KeyValue]
// org.apache.spark.sql.Dataset[KeyValue] = [key: int, value: string]

You could also skip the case class and use a tuple:

val tupDs = df.as[(Int,String)]
// org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]

Then if you had another case class / DataFrame, say, like this:

case class Nums(key: Int, num1: Double, num2: Long)
val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2")
val ds2 = df2.as[Nums]
// org.apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint]

Then, while the syntax of join and joinWith is similar, the results are different:

df.join(df2, df.col("key") === df2.col("key")).show
// +---+-----+---+----+----+
// |key|value|key|num1|num2|
// +---+-----+---+----+----+
// |  1| asdf|  1| 7.7| 101|
// |  2|34234|  2| 1.2|  10|
// +---+-----+---+----+----+

ds.joinWith(ds2, df.col("key") === df2.col("key")).show
// +---------+-----------+
// |       _1|         _2|
// +---------+-----------+
// | [1,asdf]|[1,7.7,101]|
// |[2,34234]| [2,1.2,10]|
// +---------+-----------+

As you can see, joinWith leaves the objects intact as parts of a tuple, while join flattens out the columns into a single namespace. (Which will cause problems in the above case because the column name "key" is repeated.)

Curiously enough, I have to use df.col("key") and df2.col("key") to create the conditions for joining ds and ds2 -- if you use just col("key") on either side it does not work, and ds.col(...) doesn't exist. Using the original df.col("key") does the trick, however.

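Because joinWith keeps each side as a typed object, its result is a Dataset of pairs that you can reshape with an ordinary map. As a minimal sketch (the Combined case class below is made up purely for illustration, reusing the ds, ds2, df and df2 values defined above):

case class Combined(key: Int, value: String, num1: Double, num2: Long)

// joinWith keeps both rows as typed objects, paired up in a tuple
val joined = ds.joinWith(ds2, df.col("key") === df2.col("key"))
// joined: org.apache.spark.sql.Dataset[(KeyValue, Nums)]

// unpack the tuple into a flat case class with a plain map
val flat = joined.map { case (kv, n) => Combined(kv.key, kv.value, n.num1, n.num2) }
// flat: org.apache.spark.sql.Dataset[Combined]

This gives you the flattened columns of join without the duplicated "key" column.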

Answered by Syntax

For the above example you can try the option below:

  • Define a case class for your output

    case class JoinOutput(key:Int, value:String, num1:Double, num2:Long)

  • Join the two Datasets with Seq("key"); this will help you avoid two duplicate key columns in the output, which makes it possible to apply the case class or fetch the data in the next step

    scala> ds.join(ds2, Seq("key")).as[JoinOutput]
    res27: org.apache.spark.sql.Dataset[JoinOutput] = [key: int, value: string ... 2 more fields]

    scala> ds.join(ds2, Seq("key")).as[JoinOutput].show
    +---+-----+----+----+
    |key|value|num1|num2|
    +---+-----+----+----+
    |  1| asdf| 7.7| 101|
    |  2|34234| 1.2|  10|
    +---+-----+----+----+
