Disclaimer: this page is a translation of a popular StackOverflow question and answer, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow
Original link: http://stackoverflow.com/questions/33745964/
How to join on multiple columns in Pyspark?
Asked by user3803714
I am using Spark 1.3 and would like to join on multiple columns using the Python interface (SparkSQL).
The following works:
I first register them as temp tables.
numeric.registerTempTable("numeric")
Ref.registerTempTable("Ref")
test = numeric.join(Ref, numeric.ID == Ref.ID, joinType='inner')
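Because the frames are registered as temp tables, the same single-column join can also be expressed in SQL; a minimal sketch, assuming the sqlContext that created numeric and Ref:

test = sqlContext.sql(
    "SELECT * FROM numeric n INNER JOIN Ref r ON n.ID = r.ID")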
I would now like to join them based on multiple columns.
I get SyntaxError: invalid syntax with this:
test = numeric.join(Ref,
numeric.ID == Ref.ID AND numeric.TYPE == Ref.TYPE AND
numeric.STATUS == Ref.STATUS , joinType='inner')
Accepted answer by zero323
You should use & / | operators and be careful about operator precedence (== has lower precedence than bitwise AND and OR):
df1 = sqlContext.createDataFrame(
[(1, "a", 2.0), (2, "b", 3.0), (3, "c", 3.0)],
("x1", "x2", "x3"))
df2 = sqlContext.createDataFrame(
[(1, "f", -1.0), (2, "b", 0.0)], ("x1", "x2", "x3"))
df = df1.join(df2, (df1.x1 == df2.x1) & (df1.x2 == df2.x2))
df.show()
## +---+---+---+---+---+---+
## | x1| x2| x3| x1| x2| x3|
## +---+---+---+---+---+---+
## | 2| b|3.0| 2| b|0.0|
## +---+---+---+---+---+---+
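Applied to the dataframes from the question, the corrected multi-column join would look like the following; a sketch, assuming the numeric and Ref frames defined earlier. Each equality is wrapped in parentheses because of the precedence rule above:

test = numeric.join(
    Ref,
    (numeric.ID == Ref.ID) &
    (numeric.TYPE == Ref.TYPE) &
    (numeric.STATUS == Ref.STATUS),
    joinType='inner')  # Spark 1.3 keyword; newer releases spell it how='inner'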
Answered by Florian
An alternative approach would be:
df1 = sqlContext.createDataFrame(
[(1, "a", 2.0), (2, "b", 3.0), (3, "c", 3.0)],
("x1", "x2", "x3"))
df2 = sqlContext.createDataFrame(
[(1, "f", -1.0), (2, "b", 0.0)], ("x1", "x2", "x4"))
df = df1.join(df2, ['x1','x2'])
df.show()
which outputs:
+---+---+---+---+
| x1| x2| x3| x4|
+---+---+---+---+
| 2| b|3.0|0.0|
+---+---+---+---+
With the main advantage being that the columns on which the tables are joined are not duplicated in the output, reducing the risk of encountering errors such as org.apache.spark.sql.AnalysisException: Reference 'x1' is ambiguous, could be: x1#50L, x1#57L.
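If you do need both copies of the join columns in the result, aliasing the dataframes is one way to keep the references unambiguous; a minimal sketch, not part of the original answer:

from pyspark.sql.functions import col

df = df1.alias("a").join(
    df2.alias("b"),
    (col("a.x1") == col("b.x1")) & (col("a.x2") == col("b.x2")))
# Each side can now be addressed by its alias, e.g. df.select(col("a.x1")).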
Whenever the columns in the two tables have different names (let's say in the example above, df2 has the columns y1, y2 and y4), you could use the following syntax:
df = df1.join(df2.withColumnRenamed('y1','x1').withColumnRenamed('y2','x2'), ['x1','x2'])
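Equivalently, you could keep the differing names and spell out the condition; a sketch assuming the same hypothetical y1/y2/y4 columns. Both sides' join columns then appear in the output, but since their names differ, no ambiguity arises:

df = df1.join(df2, (df1.x1 == df2.y1) & (df1.x2 == df2.y2))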