How do you perform basic joins of two RDD tables in Spark using Python?

Disclaimer: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): StackOverflow. Original question: http://stackoverflow.com/questions/31257077/

Tags: python, join, apache-spark, pyspark, rdd

Asked by invoketheshell

How would you perform basic joins in Spark using Python? In R you could use merge() to do this. What is the syntax using Python on Spark for:

  1. Inner Join
  2. Left Outer Join
  3. Cross Join

With two tables (RDDs), each with a single column and a common key.

RDD(1):(key,U)
RDD(2):(key,V)

I think an inner join is something like this:

rdd1.join(rdd2).map(case (key, u, v) => (key, ls ++ rs));

Is that right? I have searched the internet and can't find a good example of joins. Thanks in advance.

Accepted answer by zero323

It can be done either using PairRDDFunctions or Spark Data Frames. Since data frame operations benefit from the Catalyst Optimizer, the second option is worth considering.

Assuming your data looks as follows:

rdd1 =  sc.parallelize([("foo", 1), ("bar", 2), ("baz", 3)])
rdd2 =  sc.parallelize([("foo", 4), ("bar", 5), ("bar", 6)])

With PairRDDs:

Inner join:

rdd1.join(rdd2)
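
Collecting the result should give something like this (only matching keys survive, and ordering is not guaranteed):

rdd1.join(rdd2).collect()
## [('foo', (1, 4)), ('bar', (2, 5)), ('bar', (2, 6))]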

Left outer join:

rdd1.leftOuterJoin(rdd2)
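
Keys with no match in rdd2 are kept and paired with None, roughly:

rdd1.leftOuterJoin(rdd2).collect()
## [('foo', (1, 4)), ('bar', (2, 5)), ('bar', (2, 6)), ('baz', (3, None))]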

Cartesian product (doesn't require RDD[(T, U)]):

rdd1.cartesian(rdd2)
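
This pairs every element of rdd1 with every element of rdd2, so with the sample data above it should produce 3 * 3 = 9 pairs:

rdd1.cartesian(rdd2).count()
## 9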

Broadcast join (doesn't require RDD[(T, U)]):

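A minimal sketch, assuming rdd2 is small enough to collect on the driver and broadcast (note that collectAsMap keeps only one value per key, so this only reproduces a real join for one-to-one data):

# Collect the small side as a plain dict, broadcast it, then look keys up map-side
rdd2_map = sc.broadcast(rdd2.collectAsMap())
rdd1.map(lambda kv: (kv[0], (kv[1], rdd2_map.value.get(kv[0])))).collect()
## e.g. [('foo', (1, 4)), ('bar', (2, 6)), ('baz', (3, None))] -- behaves like a left outer join here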

Finally there is cogroup, which has no direct SQL equivalent but can be useful in some situations:

cogrouped = rdd1.cogroup(rdd2)

cogrouped.mapValues(lambda x: (list(x[0]), list(x[1]))).collect()
## [('foo', ([1], [4])), ('bar', ([2], [5, 6])), ('baz', ([3], []))]

With Spark Data Frames

You can use either the SQL DSL or execute raw SQL using spark.sql.

df1 = spark.createDataFrame(rdd1, ('k', 'v1'))
df2 = spark.createDataFrame(rdd2, ('k', 'v2'))

# Register temporary tables to be able to use `sparkSession.sql`
df1.createOrReplaceTempView('df1')
df2.createOrReplaceTempView('df2')

Inner join:

# inner is a default value so it could be omitted
df1.join(df2, df1.k == df2.k, how='inner') 
spark.sql('SELECT * FROM df1 JOIN df2 ON df1.k = df2.k')
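
Passing the column name instead of an expression should give an equivalent inner join without duplicating the k column in the result:

df1.join(df2, 'k')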

Left outer join:

df1.join(df2, df1.k == df2.k, how='left_outer')
spark.sql('SELECT * FROM df1 LEFT OUTER JOIN df2 ON df1.k = df2.k')

Cross join (Spark 2.x requires either an explicit cross join or setting spark.sql.crossJoin.enabled=true):

df1.crossJoin(df2)
spark.sql('SELECT * FROM df1 CROSS JOIN df2')

# With the setting above, a plain join with no join condition is also treated as a cross join
df1.join(df2)
spark.sql('SELECT * FROM df1 JOIN df2')

Since 1.6 (1.5 in Scala) each of these can be combined with the broadcast function:

from pyspark.sql.functions import broadcast

df1.join(broadcast(df2), df1.k == df2.k)

to perform a broadcast join. See also Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark.

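To check that the hint is actually picked up, you can look at the physical plan, which should contain a BroadcastHashJoin node:

df1.join(broadcast(df2), df1.k == df2.k).explain()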