Scala: how to join Datasets on multiple columns?

Disclaimer: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/37851606/


How to join Datasets on multiple columns?

scala, apache-spark, apache-spark-sql

Asked by d80tb7

Given two Spark Datasets, A and B, I can do a join on a single column as follows:


a.joinWith(b, $"a.col" === $"b.col", "left")

My question is whether you can do a join using multiple columns. Essentially the equivalent of the following DataFrames API code:


a.join(b, a("col") === b("col") && a("col2") === b("col2"), "left")

Accepted answer by zero323

You can do it exactly the same way as with DataFrame:


// spark.implicits._ provides the toDS conversion (already in scope in spark-shell)
import spark.implicits._

val xs = Seq(("a", "foo", 2.0), ("x", "bar", -1.0)).toDS
val ys = Seq(("a", "foo", 2.0), ("y", "bar", 1.0)).toDS

xs.joinWith(ys, xs("_1") === ys("_1") && xs("_2") === ys("_2"), "left").show
// +------------+-----------+
// |          _1|         _2|
// +------------+-----------+
// | [a,foo,2.0]|[a,foo,2.0]|
// |[x,bar,-1.0]|       null|
// +------------+-----------+
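Note that joinWith returns a Dataset of pairs rather than a flat DataFrame, so both sides of the join stay strongly typed. Below is a minimal sketch of consuming that result, assuming the xs and ys datasets above and an inner join (so neither side of a pair is null):

// joinWith yields Dataset[(L, R)]; each element pairs the matched left and right rows.
val pairs = xs.joinWith(ys, xs("_1") === ys("_1") && xs("_2") === ys("_2"), "inner")

// The pairs can be mapped over as ordinary Scala tuples.
pairs.map { case (left, right) => (left._1, left._3 + right._3) }.show()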

In Spark < 2.0.0 you can use something like this:


xs.as("xs").joinWith(
  ys.as("ys"), ($"xs._1" === $"ys._1") && ($"xs._2" === $"ys._2"), "left")

Answer by Jacek Laskowski

There's another way of joining: chaining one where after another. You first specify a join (and optionally its type) followed by where operator(s), i.e.


scala> case class A(id: Long, name: String)
defined class A

scala> case class B(id: Long, name: String)
defined class B

scala> val as = Seq(A(0, "zero"), A(1, "one")).toDS
as: org.apache.spark.sql.Dataset[A] = [id: bigint, name: string]

scala> val bs = Seq(B(0, "zero"), B(1, "jeden")).toDS
bs: org.apache.spark.sql.Dataset[B] = [id: bigint, name: string]

scala> as.join(bs).where(as("id") === bs("id")).show
+---+----+---+-----+
| id|name| id| name|
+---+----+---+-----+
|  0|zero|  0| zero|
|  1| one|  1|jeden|
+---+----+---+-----+


scala> as.join(bs).where(as("id") === bs("id")).where(as("name") === bs("name")).show
+---+----+---+----+
| id|name| id|name|
+---+----+---+----+
|  0|zero|  0|zero|
+---+----+---+----+

The reason this is such a goodie is that the Spark optimizer will join (no pun intended) consecutive wheres into a single join condition. Use the explain operator to see the underlying logical and physical plans.


scala> as.join(bs).where(as("id") === bs("id")).where(as("name") === bs("name")).explain(extended = true)
== Parsed Logical Plan ==
Filter (name#31 = name#36)
+- Filter (id#30L = id#35L)
   +- Join Inner
      :- LocalRelation [id#30L, name#31]
      +- LocalRelation [id#35L, name#36]

== Analyzed Logical Plan ==
id: bigint, name: string, id: bigint, name: string
Filter (name#31 = name#36)
+- Filter (id#30L = id#35L)
   +- Join Inner
      :- LocalRelation [id#30L, name#31]
      +- LocalRelation [id#35L, name#36]

== Optimized Logical Plan ==
Join Inner, ((name#31 = name#36) && (id#30L = id#35L))
:- Filter isnotnull(name#31)
:  +- LocalRelation [id#30L, name#31]
+- Filter isnotnull(name#36)
   +- LocalRelation [id#35L, name#36]

== Physical Plan ==
*BroadcastHashJoin [name#31, id#30L], [name#36, id#35L], Inner, BuildRight
:- *Filter isnotnull(name#31)
:  +- LocalTableScan [id#30L, name#31]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false], input[0, bigint, false]))
   +- *Filter isnotnull(name#36)
      +- LocalTableScan [id#35L, name#36]

Answer by ForeverLearner

In Java, the && operator does not work. The correct way to join on multiple columns in Spark's Java API is as follows:


Dataset<Row> datasetRf1 = joinedWithDays.join(
        datasetFreq,
        datasetFreq.col("userId").equalTo(joinedWithDays.col("userId"))
                .and(datasetFreq.col("artistId").equalTo(joinedWithDays.col("artistId"))),
        "inner");

The and function works like the && operator.

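For comparison, here is the same condition written in Scala, where && on a Column is simply sugar for the and method. This is a minimal sketch reusing the dataset and column names from the Java example above, which are assumed to be in scope:

// === builds an equality Column; .and (or &&) combines the two conditions.
val cond = (datasetFreq("userId") === joinedWithDays("userId"))
  .and(datasetFreq("artistId") === joinedWithDays("artistId"))
val datasetRf1 = joinedWithDays.join(datasetFreq, cond, "inner")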