Python PySpark reduceByKey? to add Key/Tuple

Disclaimer: this page is based on a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/29833576/

Date: 2020-08-19 05:04:23 · Source: igfitidea

PySpark reduceByKey? to add Key/Tuple

python, apache-spark, pyspark

Asked by theMadKing

I have the following data:

[(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')]

What I want to do is, for each key, count the instances of the value (a one-character string). So I first did a map:

.map(lambda x: (x[0], [x[1], 1]))

Making it now a key/tuple of:

[(13, ['D', 1]), (14, ['T', 1]), (32, ['6', 1]), (45, ['T', 1]), (47, ['2', 1]), (48, ['0', 1]), (49, ['2', 1]), (50, ['0', 1]), (51, ['T', 1]), (53, ['2', 1]), (54, ['0', 1]), (13, ['A', 1]), (14, ['T', 1]), (32, ['6', 1]), (45, ['A', 1]), (47, ['2', 1]), (48, ['0', 1]), (49, ['2', 1]), (50, ['0', 1]), (51, ['X', 1])]

For the last part, I just can't figure out how to count the instances of each letter per key. For instance, key 13 will have 1 D and 1 A, while 14 will have 2 T's, etc.

Accepted answer by ohruunuruus

I'm much more familiar with Spark in Scala, so there may be better ways than Counter to count the characters in the iterable produced by groupByKey, but here's an option:

from collections import Counter

rdd = sc.parallelize([(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')]) 
# Group all values for each key, then count occurrences with a Counter
rdd.groupByKey().mapValues(Counter).collect()

[(48, Counter({'0': 2})),
 (32, Counter({'6': 2})),
 (49, Counter({'2': 2})),
 (50, Counter({'0': 2})),
 (51, Counter({'X': 1, 'T': 1})),
 (53, Counter({'2': 1})),
 (13, Counter({'A': 1, 'D': 1})),
 (45, Counter({'A': 1, 'T': 1})),
 (14, Counter({'T': 2})),
 (54, Counter({'0': 1})),
 (47, Counter({'2': 2}))]
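
A side note on this approach: groupByKey ships every individual value across the shuffle. Since Counter objects support +, a minimal alternative sketch (assuming the same sc and rdd as above) wraps each character in a Counter first and merges them with reduceByKey, which combines partial counts on each partition before the shuffle:

from collections import Counter

# Wrap each single character in a one-element Counter, then merge Counters per key
rdd.mapValues(Counter).reduceByKey(lambda a, b: a + b).collect()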

Answer by Nikita

If I understood you right, you can do it in one operation with combineByKey:

from collections import Counter
x = sc.parallelize([(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')]) 
result = x.combineByKey(
    lambda value: {value: 1},                                  # createCombiner: first value seen for a key
    lambda acc, value: {**acc, value: acc.get(value, 0) + 1},  # mergeValue: fold one more value into the dict
    lambda a, b: dict(Counter(a) + Counter(b)))                # mergeCombiners: merge per-partition dicts
result.collect()
[(32, {'6': 2}), (48, {'0': 2}), (49, {'2': 2}), (53, {'2': 1}), (13, {'A': 1, 'D': 1}), (45, {'A': 1, 'T': 1}), (50, {'0': 2}), (54, {'0': 1}), (14, {'T': 2}), (51, {'X': 1, 'T': 1}), (47, {'2': 2})]
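
For readability, the same logic can also be written as a short sketch with named functions (the names create_combiner, merge_value, and merge_combiners are illustrative, not part of the Spark API):

from collections import Counter

def create_combiner(value):
    # Called for the first value seen for a key within a partition.
    return {value: 1}

def merge_value(acc, value):
    # Called for each further value for that key in the same partition.
    acc[value] = acc.get(value, 0) + 1
    return acc

def merge_combiners(a, b):
    # Called to merge the per-partition dicts for a key.
    return dict(Counter(a) + Counter(b))

result = x.combineByKey(create_combiner, merge_value, merge_combiners)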

Answer by srctaha

Instead of:

.map(lambda x: (x[0], [x[1], 1]))

We could do this:

.map(lambda x: ((x[0], x[1]), 1))

And in the last step, we could use reduceByKey and add. Note that add comes from the operator package.

Putting it together:

from operator import add
rdd = sc.parallelize([(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')]) 
# Key by the (key, char) pair itself, then sum the 1s per pair
rdd.map(lambda x: ((x[0], x[1]), 1)).reduceByKey(add).collect()
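
Note that this yields flat ((key, char), count) pairs rather than one entry per key. If you want the per-key dicts that the other answers produce, here is a follow-up sketch (assuming the same rdd and imports as above):

# Regroup ((key, char), count) into key -> {char: count}
counts = (rdd.map(lambda x: ((x[0], x[1]), 1))
             .reduceByKey(add)
             .map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))  # (key, (char, count))
             .groupByKey()
             .mapValues(dict)
             .collect())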