你如何使用 Python 在 Spark 中执行两个 RDD 表的基本连接?
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/31257077/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How do you perform basic joins of two RDD tables in Spark using Python?
提问者:invoketheshell
How would you perform basic joins in Spark using Python? In R you could use merge() to do this. What is the syntax using Python on Spark for:
您将如何使用 Python 在 Spark 中执行基本连接?在 R 中,您可以使用 merge() 来执行此操作。在 Spark 上使用 Python 的语法是什么:
- Inner Join
- Left Outer Join
- Cross Join
- 内部联接
- 左外连接
- 交叉连接
With two tables (RDD) with a single column in each that has a common key.
有两个表 (RDD),每个表中都有一个具有公共键的列。
RDD(1):(key,U)
RDD(2):(key,V)
I think an inner join is something like this:
我认为内部联接是这样的:
rdd1.join(rdd2).map(case (key, u, v) => (key, ls ++ rs));
Is that right? I have searched the internet and can't find a good example of joins. Thanks in advance.
那正确吗?我在互联网上搜索过,但找不到一个很好的连接示例。提前致谢。
采纳答案:zero323
It can be done either using PairRDDFunctions or Spark Data Frames. Since data frame operations benefit from the Catalyst Optimizer, the second option is worth considering.
它可以使用 PairRDDFunctions 或 Spark 数据帧来完成。由于数据帧操作受益于 Catalyst Optimizer,因此第二个选项值得考虑。
Assuming your data looks as follows:
假设您的数据如下所示:
rdd1 = sc.parallelize([("foo", 1), ("bar", 2), ("baz", 3)])
rdd2 = sc.parallelize([("foo", 4), ("bar", 5), ("bar", 6)])
With PairRDDs:
使用 PairRDD:
Inner join:
内部联接:
rdd1.join(rdd2)
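For the sample rdd1 and rdd2 defined above, collecting the joined RDD should produce pairs like the following (record order is not guaranteed):
rdd1.join(rdd2).collect()
## [('foo', (1, 4)), ('bar', (2, 5)), ('bar', (2, 6))]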
Left outer join:
左外连接:
rdd1.leftOuterJoin(rdd2)
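With a left outer join, keys from rdd1 that have no match in rdd2 are kept and paired with None, so for the sample data the collected result should look roughly like:
rdd1.leftOuterJoin(rdd2).collect()
## [('foo', (1, 4)), ('bar', (2, 5)), ('bar', (2, 6)), ('baz', (3, None))]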
Cartesian product (doesn't require RDD[(T, U)]):
笛卡尔积(不需要 RDD[(T, U)]):
rdd1.cartesian(rdd2)
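The Cartesian product pairs every element of rdd1 with every element of rdd2, so for the 3-element sample RDDs it yields 3 × 3 = 9 tuples of the form ((k1, v1), (k2, v2)):
rdd1.cartesian(rdd2).count()
## 9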
Broadcast join (doesn't require RDD[(T, U)]):
广播连接(不需要 RDD[(T, U)]):
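As a rough sketch (assuming rdd2 is small enough to be collected to the driver and held in memory), a map-side broadcast join can be hand-rolled by broadcasting the smaller RDD as a lookup map and flat-mapping over the larger one:
# Hand-rolled map-side join sketch: broadcast the small side as a dict of value lists
small = sc.broadcast(rdd2.groupByKey().mapValues(list).collectAsMap())
# For each record of rdd1, emit one pair per matching value in the broadcast map
rdd1.flatMap(lambda kv: [(kv[0], (kv[1], v))
                         for v in small.value.get(kv[0], [])]).collect()
## [('foo', (1, 4)), ('bar', (2, 5)), ('bar', (2, 6))]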
Finally there is cogroup, which has no direct SQL equivalent but can be useful in some situations:
最后还有 cogroup,它没有直接的 SQL 等价物,但在某些情况下很有用:
cogrouped = rdd1.cogroup(rdd2)
cogrouped.mapValues(lambda x: (list(x[0]), list(x[1]))).collect()
## [('foo', ([1], [4])), ('bar', ([2], [5, 6])), ('baz', ([3], []))]
With Spark Data Frames
使用 Spark 数据帧
You can use either the SQL DSL or execute raw SQL using sqlContext.sql.
您可以使用 SQL DSL,或使用 sqlContext.sql 执行原始 SQL。
df1 = spark.createDataFrame(rdd1, ('k', 'v1'))
df2 = spark.createDataFrame(rdd2, ('k', 'v2'))
# Register temporary tables to be able to use `sparkSession.sql`
df1.createOrReplaceTempView('df1')
df2.createOrReplaceTempView('df2')
Inner join:
内部联接:
# inner is a default value so it could be omitted
df1.join(df2, df1.k == df2.k, how='inner')
spark.sql('SELECT * FROM df1 JOIN df2 ON df1.k = df2.k')
Left outer join:
左外连接:
df1.join(df2, df1.k == df2.k, how='left_outer')
spark.sql('SELECT * FROM df1 LEFT OUTER JOIN df2 ON df1.k = df2.k')
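For the sample data, the row with key 'baz' has no match in df2, so its v2 column comes back as NULL; this is easy to check with show() (row order may vary):
# The unmatched key 'baz' appears with NULL in df2's columns
df1.join(df2, df1.k == df2.k, how='left_outer').show()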
Cross join (Spark 2.x requires an explicit cross join, or setting the spark.sql.crossJoin.enabled configuration, for the implicit form):
交叉连接(Spark 2.x 需要显式交叉连接,若使用隐式形式则需设置 spark.sql.crossJoin.enabled 配置项):
df1.crossJoin(df2)
spark.sql('SELECT * FROM df1 CROSS JOIN df2')
# Implicit form (needs spark.sql.crossJoin.enabled on Spark 2.x)
df1.join(df2)
sqlContext.sql('SELECT * FROM df1 JOIN df2')
Since 1.6 (1.5 in Scala) each of these can be combined with the broadcast function:
从 1.6(Scala 中为 1.5)开始,上述每种连接都可以与 broadcast 函数结合使用:
from pyspark.sql.functions import broadcast
df1.join(broadcast(df2), df1.k == df2.k)
to perform a broadcast join. See also Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark.
以执行广播连接。另请参阅:为什么我的 BroadcastHashJoin 比 Spark 中的 ShuffledHashJoin 慢
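One way to confirm that the broadcast hint is actually applied (on a reasonably recent Spark version) is to inspect the physical plan, which should contain a BroadcastHashJoin operator:
# The physical plan should include a BroadcastHashJoin node when the hint is honored
df1.join(broadcast(df2), df1.k == df2.k).explain()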