How to join on multiple columns in Pyspark?

Disclaimer: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/33745964/

Date: 2020-08-19 13:53:48 | Source: igfitidea

Tags: python, apache-spark, join, pyspark, apache-spark-sql

Asked by user3803714

I am using Spark 1.3 and would like to join on multiple columns using the Python interface (SparkSQL).

The following works:

I first register them as temp tables.

numeric.registerTempTable("numeric")
Ref.registerTempTable("Ref")

test = numeric.join(Ref, numeric.ID == Ref.ID, joinType='inner')

I would now like to join them based on multiple columns.

I get SyntaxError: invalid syntax with this:

test  = numeric.join(Ref,
   numeric.ID == Ref.ID AND numeric.TYPE == Ref.TYPE AND
   numeric.STATUS == Ref.STATUS ,  joinType='inner')
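
For reference (an editor's note, not part of the original question): the equivalent query does accept AND when written in SQL against the temp tables registered above. A minimal sketch, assuming the same sqlContext and table names:

test = sqlContext.sql("""
    SELECT *
    FROM numeric n
    JOIN Ref r ON n.ID = r.ID
              AND n.TYPE = r.TYPE
              AND n.STATUS = r.STATUS
""")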

Accepted answer by zero323

You should use the & / | operators and be careful about operator precedence (== has lower precedence than bitwise AND and OR):

df1 = sqlContext.createDataFrame(
    [(1, "a", 2.0), (2, "b", 3.0), (3, "c", 3.0)],
    ("x1", "x2", "x3"))

df2 = sqlContext.createDataFrame(
    [(1, "f", -1.0), (2, "b", 0.0)], ("x1", "x2", "x3"))

df = df1.join(df2, (df1.x1 == df2.x1) & (df1.x2 == df2.x2))
df.show()

## +---+---+---+---+---+---+
## | x1| x2| x3| x1| x2| x3|
## +---+---+---+---+---+---+
## |  2|  b|3.0|  2|  b|0.0|
## +---+---+---+---+---+---+
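
As a side note (an editor's addition, not part of the accepted answer): in later Spark versions the on parameter also accepts a list of Column expressions, which are combined with AND, so the explicit parentheses and & can be avoided:

# Sketch assuming a later Spark version in which `on` accepts a list of Columns;
# the listed conditions are ANDed together.
df = df1.join(df2, [df1.x1 == df2.x1, df1.x2 == df2.x2])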

Answered by Florian

An alternative approach would be:

df1 = sqlContext.createDataFrame(
    [(1, "a", 2.0), (2, "b", 3.0), (3, "c", 3.0)],
    ("x1", "x2", "x3"))

df2 = sqlContext.createDataFrame(
    [(1, "f", -1.0), (2, "b", 0.0)], ("x1", "x2", "x4"))

df = df1.join(df2, ['x1','x2'])
df.show()

which outputs:

+---+---+---+---+
| x1| x2| x3| x4|
+---+---+---+---+
|  2|  b|3.0|0.0|
+---+---+---+---+

The main advantage is that the columns on which the tables are joined are not duplicated in the output, reducing the risk of encountering errors such as org.apache.spark.sql.AnalysisException: Reference 'x1' is ambiguous, could be: x1#50L, x1#57L.
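
For illustration (an editor's sketch, not from the original answer): with the expression-style join from the accepted answer, both inputs keep their own x1 and x2, so an unqualified column reference is ambiguous and has to be qualified through the parent DataFrame:

joined = df1.join(df2, (df1.x1 == df2.x1) & (df1.x2 == df2.x2))
# joined.select("x1")  # raises the AnalysisException quoted above
joined.select(df1.x1, df2.x4).show()  # qualify via the parent DataFrames instead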


Whenever the columns of the two tables have different names (let's say that in the example above, df2 has the columns y1, y2 and y4), you could use the following syntax:

df = df1.join(df2.withColumnRenamed('y1','x1').withColumnRenamed('y2','x2'), ['x1','x2'])
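
Alternatively (an editor's sketch, not part of the original answer), the rename step can be skipped by joining on explicit equality between the differently named columns and dropping the redundant ones afterwards:

# Sketch assuming df2 has the columns y1, y2 and y4 as above; passing a
# Column to drop() requires a reasonably recent Spark version.
df = (df1.join(df2, (df1.x1 == df2.y1) & (df1.x2 == df2.y2))
         .drop(df2.y1)
         .drop(df2.y2))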