Python: Spark union of multiple RDDs

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/33743978/

Spark union of multiple RDDs

python apache-spark pyspark rdd

Asked by user3803714

In my Pig code I do this:

all_combined = UNION relation1, relation2,
    relation3, relation4, relation5, relation6;

I want to do the same with Spark. Unfortunately, it seems I have to keep doing it pairwise:

first = rdd1.union(rdd2)
second = first.union(rdd3)
third = second.union(rdd4)
# .... and so on

Is there a union operator that will let me operate on multiple RDDs at a time?

e.g. union(rdd1, rdd2, rdd3, rdd4, rdd5, rdd6)

It is a matter of convenience.

Accepted answer by zero323

If these are RDDs you can use the SparkContext.union method:

rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([4, 5, 6])
rdd3 = sc.parallelize([7, 8, 9])

rdd = sc.union([rdd1, rdd2, rdd3])
rdd.collect()

## [1, 2, 3, 4, 5, 6, 7, 8, 9]

There is no DataFrame equivalent, but it is just a matter of a simple one-liner:

from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

def unionAll(*dfs):
    # Fold DataFrame.unionAll pairwise across all of the passed DataFrames
    return reduce(DataFrame.unionAll, dfs)

df1 = sqlContext.createDataFrame([(1, "foo1"), (2, "bar1")], ("k", "v"))
df2 = sqlContext.createDataFrame([(3, "foo2"), (4, "bar2")], ("k", "v"))
df3 = sqlContext.createDataFrame([(5, "foo3"), (6, "bar3")], ("k", "v"))

unionAll(df1, df2, df3).show()

## +---+----+
## |  k|   v|
## +---+----+
## |  1|foo1|
## |  2|bar1|
## |  3|foo2|
## |  4|bar2|
## |  5|foo3|
## |  6|bar3|
## +---+----+
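
On newer Spark releases (2.0+), DataFrame.unionAll was deprecated in favor of DataFrame.union, so the same one-liner can be written as below. This is a sketch under that assumption and is not part of the original answer:

from functools import reduce
from pyspark.sql import DataFrame

def union_all(*dfs):
    # Spark 2.x+: DataFrame.union is the replacement for the deprecated unionAll
    return reduce(DataFrame.union, dfs)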

If the number of DataFrames is large, using SparkContext.union on the underlying RDDs and recreating the DataFrame may be a better choice, to avoid issues related to the cost of preparing an execution plan:

def unionAll(*dfs):
    first, *_ = dfs  # Python 3.x, for 2.x you'll have to unpack manually
    return first.sql_ctx.createDataFrame(
        first.sql_ctx._sc.union([df.rdd for df in dfs]),
        first.schema
    )
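
A minimal usage sketch (not from the original answer), reusing df1, df2 and df3 defined above, which are assumed to share the same schema:

unionAll(df1, df2, df3).show()  # prints the same six rows as the unionAll example above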

Answer by Nhor

Unfortunately it's the only way to UNION tables in Spark. However, instead of

first = rdd1.union(rdd2)
second = first.union(rdd3)
third = second.union(rdd4)
...

you can perform it in a slightly cleaner way like this:

result = rdd1.union(rdd2).union(rdd3).union(rdd4)

Answer by Ryuuk

You can also use addition to perform a UNION between RDDs:

rdd = sc.parallelize([1, 1, 2, 3])
(rdd + rdd).collect()
## [1, 1, 2, 3, 1, 1, 2, 3]
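
Building on that, if you have a whole list of RDDs you can simply fold + over it. This is a small sketch, not part of the original answer; the rdds list here is made up for illustration:

from functools import reduce

rdds = [sc.parallelize(range(i, i + 3)) for i in range(0, 9, 3)]  # hypothetical RDDs

# '+' delegates to RDD.union, so folding it combines the whole list
combined = reduce(lambda a, b: a + b, rdds)
combined.collect()

## [0, 1, 2, 3, 4, 5, 6, 7, 8]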