Reduce a key-value pair into a key-list pair with Apache Spark in Python

Note: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): StackOverflow. Original question: http://stackoverflow.com/questions/27002161/


Reduce a key-value pair into a key-list pair with Apache Spark

python, apache-spark, mapreduce, pyspark, rdd

Asked by TravisJ

I am writing a Spark application and want to combine a set of Key-Value pairs (K, V1), (K, V2), ..., (K, Vn) into one Key-Multivalue pair (K, [V1, V2, ..., Vn]). I feel like I should be able to do this using the reduceByKey function with something of the flavor:


My_KMV = My_KV.reduce(lambda a, b: a.append([b]))

The error that I get when this occurs is:


'NoneType' object has no attribute 'append'.


My keys are integers and values V1,...,Vn are tuples. My goal is to create a single pair with the key and a list of the values (tuples).


Accepted answer by Christian Strempfer

Map and ReduceByKey


Input type and output type of reduce must be the same, therefore if you want to aggregate a list, you have to map the input to lists. Afterwards you combine the lists into one list.

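Outside Spark, the same constraint can be seen with plain functools.reduce; this is only an illustration of the map-then-reduce idea, not Spark code:

from functools import reduce

pairs = [("k", 1), ("k", 2), ("k", 3)]

# map: wrap each value in a one-element list so that input and output types match
values_as_lists = [[v] for _, v in pairs]

# reduce: list + list -> list, so the types line up at every step
combined = reduce(lambda a, b: a + b, values_as_lists)
# combined is [1, 2, 3]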

Combining lists


You'll need a method to combine lists into one list. Python provides some methods to combine lists.


append modifies the first list and will always return None.


x = [1, 2, 3]
x.append([4, 5])
# x is [1, 2, 3, [4, 5]]

extend does the same, but unwraps lists:


x = [1, 2, 3]
x.extend([4, 5])
# x is [1, 2, 3, 4, 5]

Both methods return None, but you'll need a method that returns the combined list, therefore just use the plus sign.


x = [1, 2, 3] + [4, 5]
# x is [1, 2, 3, 4, 5]

Spark


file = spark.textFile("hdfs://...")
counts = (file.flatMap(lambda line: line.split(" "))
              .map(lambda actor: (actor.split(",")[0], actor))
              # transform each value into a list
              .map(lambda nameTuple: (nameTuple[0], [nameTuple[1]]))
              # combine lists: ([1,2,3] + [4,5]) becomes [1,2,3,4,5]
              .reduceByKey(lambda a, b: a + b))


CombineByKey


It's also possible to solve this with combineByKey, which is used internally to implement reduceByKey, but it's more complex and "using one of the specialized per-key combiners in Spark can be much faster". Your use case is simple enough for the solution above.

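As a rough idea of what that can look like (a minimal sketch assuming an RDD of (key, value) pairs named my_kv; see the combineByKey answers further down for full examples):

grouped = my_kv.combineByKey(
    lambda v: [v],             # createCombiner: first value for a key -> one-element list
    lambda acc, v: acc + [v],  # mergeValue: add another value within a partition
    lambda a, b: a + b)        # mergeCombiners: merge lists from different partitions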

GroupByKey


It's also possible to solve this with groupByKey, but it reduces parallelization and therefore could be much slower for big data sets.

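A minimal sketch of that variant, again assuming a hypothetical pair RDD named my_kv (mapValues(list) just turns each grouped iterable into a plain list):

grouped = my_kv.groupByKey().mapValues(list)
# grouped.collect() yields pairs of the form (K, [V1, V2, ..., Vn])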

Answer by Dave J

OK, I hope I got this right. Your input is something like this:


kv_input = [("a", 1), ("a", 2), ("a", 3), ("b", 1), ("b", 5)]

and you want to get something like this:


kmv_output = [("a", [1, 2, 3]), ("b", [1, 5])]

Then this might do the job:


d = dict()
for k, v in kv_input:
    d.setdefault(k, list()).append(v)
kmv_output = list(d.items())

If I got this wrong, please tell me, so I might adjust this to your needs.


P.S.: a.append([b]) always returns None. You might want to observe either [b] or a, but not the result of append.


Answer by TravisJ

If you want to do a reduceByKey where the type in the reduced KV pairs is different than the type in the original KV pairs, then one can use the function combineByKey. What the function does is take KV pairs and combine them (by Key) into KC pairs where C is a different type than V.


One specifies 3 functions: createCombiner, mergeValue, and mergeCombiners. The first specifies how to transform a type V into a type C, the second describes how to combine a type C with a type V, and the last specifies how to combine a type C with another type C. My code creates the K-V pairs:


Define the 3 functions as follows:


def Combiner(a):    #Turns value a (a tuple) into a list of a single tuple.
    return [a]

def MergeValue(a, b): #a is the new type [(,), (,), ..., (,)] and b is the old type (,)
    a.extend([b])
    return a

def MergeCombiners(a, b): #a is the new type [(,),...,(,)] and so is b, combine them
    a.extend(b)
    return a

Then, My_KMV = My_KV.combineByKey(Combiner, MergeValue, MergeCombiners)

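For illustration, a hypothetical end-to-end run with integer keys and tuple values, matching the question (sc is an existing SparkContext and the sample data is made up; key ordering in collect() may vary):

My_KV = sc.parallelize([(1, ('a', 'b')), (1, ('c', 'd')), (2, ('e', 'f'))])
My_KMV = My_KV.combineByKey(Combiner, MergeValue, MergeCombiners)
# My_KMV.collect() -> [(1, [('a', 'b'), ('c', 'd')]), (2, [('e', 'f')])]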

The best resource I found on using this function is: http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/


As others have pointed out, a.append(b) or a.extend(b) return None. So reduceByKey(lambda a, b: a.append(b)) returns None on the first pair of KV pairs, then fails on the second pair because None.append(b) fails. You could work around this by defining a separate function:


def My_Extend(a, b):
    a.extend(b)
    return a

Then call reduceByKey(lambda a, b: My_Extend(a, b)). (The use of the lambda function here may be unnecessary, but I have not tested this case.)

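A quick sketch of that workaround (not from the original answer; My_KV is hypothetical, the values must first be wrapped in lists for extend to apply, and My_Extend can also be passed to reduceByKey directly without a lambda):

My_KMV = (My_KV.map(lambda kv: (kv[0], [kv[1]]))  # wrap each value in a list first
               .reduceByKey(My_Extend))           # My_Extend mutates and returns the left-hand list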

Answer by alreich

I'm kind of late to the conversation, but here's my suggestion:


>>> foo = sc.parallelize([(1, ('a','b')), (2, ('c','d')), (1, ('x','y'))])
>>> foo.map(lambda kv: (kv[0], [kv[1]])).reduceByKey(lambda p, q: p + q).collect()
[(1, [('a', 'b'), ('x', 'y')]), (2, [('c', 'd')])]

Answer by Thamme Gowda

I hit this page while looking for a Java example of the same problem. (If your case is similar, here is my example.)


The trick is: you need to group by keys.


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class SparkMRExample {

    public static void main(String[] args) {
        // spark context initialisation
        SparkConf conf = new SparkConf()
                .setAppName("WordCount")
                .setMaster("local");
        JavaSparkContext context = new JavaSparkContext(conf);

        //input for testing;
        List<String> input = Arrays.asList("Lorem Ipsum is simply dummy text of the printing and typesetting industry.",
                "Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, when an unknown printer took a galley of type and scrambled it to make a type specimen book.",
                "It has survived not only for centuries, but also the leap into electronic typesetting, remaining essentially unchanged.",
                "It was popularised in the 1960s with the release of Letraset sheets containing Lorem Ipsum passages, and more recently with desktop publishing");
        JavaRDD<String> inputRDD = context.parallelize(input);


        // the map phase of word count example
        JavaPairRDD<String, Integer> mappedRDD =
                inputRDD.flatMapToPair( line ->                      // for this input, each string is a line
                        Arrays.stream(line.split("\\s+"))           // splitting into words, converting into stream
                                .map(word -> new Tuple2<>(word, 1))  // each word is assigned with count 1
                                .collect(Collectors.toList()));      // stream to iterable

        // group the tuples by key
        // (String,Integer) -> (String, Iterable<Integer>)
        JavaPairRDD<String, Iterable<Integer>> groupedRDD = mappedRDD.groupByKey();

        // the reduce phase of word count example
        //(String, Iterable<Integer>) -> (String,Integer)
        JavaRDD<Tuple2<String, Integer>> resultRDD =
                groupedRDD.map(group ->                                      //input is a tuple (String, Iterable<Integer>)
                        new Tuple2<>(group._1,                              // the output key is same as input key
                        StreamSupport.stream(group._2.spliterator(), true)  // converting to stream
                                .reduce(0, (f, s) -> f + s)));              // the sum of counts
        //collecting the RRD so that we can print
        List<Tuple2<String, Integer>> result = resultRDD.collect();
        // print each tuple
        result.forEach(System.out::println);
    }
}

Answer by Marius Ion

You can use the RDD groupByKey method.


Input:


data = [(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd'), (2, 'e'), (3, 'f')]
rdd = sc.parallelize(data)
result = rdd.groupByKey().mapValues(list).collect()  # mapValues(list) turns each grouped iterable into a plain list

Output:


[(1, ['a', 'b']), (2, ['c', 'd', 'e']), (3, ['f'])]

Answer by Seung-Hwan Lim

The error message stems from the type for 'a' in your closure.


 My_KMV = My_KV.reduce(lambda a, b: a.append([b]))

Let pySpark explicitly evaluate a as a list. For instance,


My_KMV = My_KV.reduceByKey(lambda a,b:[a].extend([b]))

In many cases, reduceByKey will be preferable to groupByKey, refer to: http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html


Answer by krishna rachur

I tried with combineByKey; here are my steps:


combineddatardd=sc.parallelize([("A", 3), ("A", 9), ("A", 12),("B", 4), ("B", 10), ("B", 11)])

combineddatardd.combineByKey(lambda v: [v], lambda x, y: x + [y], lambda x, y: x + y).collect()

Output:


[('A', [3, 9, 12]), ('B', [4, 10, 11])]
  1. Define a combiner function which sets the accumulator to the first key-value pair it encounters inside the partition; convert the value to a list in this step.

  2. Define a function which merges a new value of the same key into the accumulator value captured in step 1. Note: wrap the new value in a list in this function, since the accumulator value was converted to a list in the first step.

  3. Define a function to merge the combiner outputs of the individual partitions.


Answer by zero323

tl;dr If you really require an operation like this, use groupByKey as suggested by @MariusIon. Every other solution proposed here is either bluntly inefficient or at least suboptimal compared to direct grouping.


reduceByKey with list concatenation is not an acceptable solution because:


  • Requires initialization of O(N) lists.
  • Each application of + to a pair of lists requires a full copy of both lists (O(N)), effectively increasing the overall complexity to O(N²) (see the sketch after this list).
  • Doesn't address any of the problems introduced by groupByKey. The amount of data that has to be shuffled as well as the size of the final structure are the same.
  • Unlike suggested by one of the answers, there is no difference in the level of parallelism between implementations using reduceByKey and groupByKey.
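To see where the quadratic cost in the second point comes from, here is a plain-Python illustration (not Spark code) of repeated list concatenation:

acc = []
for v in range(5):
    # `+` builds a brand-new list and copies both operands,
    # so step k copies roughly k elements; summed over N steps that is O(N^2)
    acc = acc + [v]
# acc is [0, 1, 2, 3, 4]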

combineByKey with list.extend is a suboptimal solution because:


  • Creates O(N) list objects in MergeValue (this could be optimized by using list.append directly on the new item; a sketch of that variant follows below).
  • If optimized with list.append it is exactly equivalent to an old (Spark <= 1.3) implementation of groupByKey and ignores all the optimizations introduced by SPARK-3074, which enables external (on-disk) grouping of larger-than-memory structures.
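For reference, a minimal sketch of the list.append / list.extend variant that the first bullet refers to (my_kv is a hypothetical (key, value) RDD; as the second bullet notes, this is still essentially the pre-1.4 groupByKey rather than a real improvement):

def make_combiner(v):        # V -> C: start a new list for the first value of a key in a partition
    return [v]

def merge_value(acc, v):     # C, V -> C: append in place, avoiding a per-step list copy
    acc.append(v)
    return acc

def merge_combiners(a, b):   # C, C -> C: splice lists from different partitions in place
    a.extend(b)
    return a

grouped = my_kv.combineByKey(make_combiner, merge_value, merge_combiners)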