Calculating the average for each KEY in a Pairwise (K,V) RDD in Spark using Python
Disclaimer: This page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/29930110/
Calculating the averages for each KEY in a Pairwise (K,V) RDD in Spark with Python
Asked by NYCeyes
I want to share this particular Apache Spark with Python solution because the documentation for it is quite poor.
I wanted to calculate the average value of K/V pairs (stored in a Pairwise RDD), by KEY. Here is what the sample data looks like:
>>> rdd1.take(10) # Show a small sample.
[(u'2013-10-09', 7.60117302052786),
(u'2013-10-10', 9.322709163346612),
(u'2013-10-10', 28.264462809917358),
(u'2013-10-07', 9.664429530201343),
(u'2013-10-07', 12.461538461538463),
(u'2013-10-09', 20.76923076923077),
(u'2013-10-08', 11.842105263157894),
(u'2013-10-13', 32.32514177693762),
(u'2013-10-13', 26.249999999999996),
(u'2013-10-13', 10.693069306930692)]
Now, the following code sequence is a less-than-optimal way to do it, but it does work. It is what I was doing before I figured out a better solution. It's not terrible, but, as you'll see in the answer section, there is a more concise and efficient way.
>>> import operator
>>> countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
>>> rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerators (i.e. the SUMs).
>>> rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide each SUM by its denominator (i.e. the COUNT)
>>> print(rdd1.collect())
[(u'2013-10-09', 11.235365503035176),
(u'2013-10-07', 23.39500642456595),
... snip ...
]
Accepted answer by NYCeyes
Now a much better way to do this is to use the rdd.aggregateByKey() method. Because that method is so poorly documented in the Apache Spark with Python documentation (which is why I wrote this Q&A), until recently I had been using the above code sequence. But again, it's less efficient, so avoid doing it that way unless necessary.
Here's how to do the same using the rdd.aggregateByKey() method (recommended) ...
By KEY, simultaneously calculate the SUM (the numerator for the average that we want to compute) and the COUNT (the denominator for the average that we want to compute):
>>> aTuple = (0,0) # The zero-value accumulator: (runningSum, runningCount). As of Python 3, a lambda can no longer unpack a tuple parameter, hence the indexing below.
>>> rdd1 = rdd1.aggregateByKey(aTuple, lambda a,b: (a[0] + b, a[1] + 1),
lambda a,b: (a[0] + b[0], a[1] + b[1]))
Where the following is true about the meaning of each a and b pair above (so you can visualize what's happening):
First lambda expression for the Within-Partition Reduction Step:
    a: is a TUPLE that holds (runningSum, runningCount).
    b: is a SCALAR that holds the next Value.

Second lambda expression for the Cross-Partition Reduction Step:
    a: is a TUPLE that holds (runningSum, runningCount).
    b: is a TUPLE that holds (nextPartitionsSum, nextPartitionsCount).
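If the positional indexing makes those lambdas hard to read, here is a minimal sketch of the same aggregation written with named functions (the names seq_op and comb_op are my own, not part of the Spark API, and this assumes rdd1 still holds the original (date, value) pairs):

def seq_op(acc, value):
    # Within-partition step: acc is the (runningSum, runningCount) tuple, value is the next scalar.
    return (acc[0] + value, acc[1] + 1)

def comb_op(acc1, acc2):
    # Cross-partition step: both arguments are (sum, count) tuples, one per partition being merged.
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])

sum_count_by_key = rdd1.aggregateByKey((0, 0), seq_op, comb_op)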
Finally, calculate the average for each KEY, and collect the results.
>>> finalResult = rdd1.mapValues(lambda v: v[0]/v[1]).collect()
>>> print(finalResult)
[(u'2013-09-09', 11.235365503035176),
(u'2013-09-01', 23.39500642456595),
(u'2013-09-03', 13.53240060820617),
(u'2013-09-05', 13.141148418977687),
... snip ...
]
I hope this question and answer with aggregateByKey() will help.
Answer by pat
To my mind, a more readable equivalent to aggregateByKey with two lambdas is:
rdd1 = rdd1 \
.mapValues(lambda v: (v, 1)) \
.reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1]))
In this way the whole average calculation would be:
avg_by_key = rdd1 \
.mapValues(lambda v: (v, 1)) \
.reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])) \
.mapValues(lambda v: v[0]/v[1]) \
.collectAsMap()
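As a usage note (my own sketch, not part of the original answer): collectAsMap() returns an ordinary Python dict on the driver, so per-key lookups afterwards are plain dict accesses:

print(avg_by_key[u'2013-10-09'])  # Look up the computed average for a single date key.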
Answer by arun
Just adding a note about an intuitive and shorter (but bad) solution to this problem. The book Sams Teach Yourself Apache Spark in 24 Hours explains this problem well in its last chapter.
Using groupByKey one can solve the problem easily like this:
rdd = sc.parallelize([
(u'2013-10-09', 10),
(u'2013-10-09', 10),
(u'2013-10-09', 13),
(u'2013-10-10', 40),
(u'2013-10-10', 45),
(u'2013-10-10', 50)
])
rdd \
.groupByKey() \
.mapValues(lambda x: sum(x) / len(x)) \
.collect()
Output:
[('2013-10-10', 45.0), ('2013-10-09', 11.0)]
This is intuitive and appealing, but don't use it! groupByKey does not do any combining on the mappers and brings every individual key-value pair over to the reducers.
Avoid groupByKey as much as possible. Go with a reduceByKey solution like @pat's.
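For reference, here is a minimal sketch (my own, reusing the sample rdd defined just above) of that reduceByKey approach; it yields the same result while only shuffling one small (sum, count) tuple per key per partition:

# Map-side combining: each value becomes a (sum, count) pair that reduceByKey merges per key.
avg_by_key = rdd \
    .mapValues(lambda v: (v, 1)) \
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
    .mapValues(lambda v: v[0] / v[1]) \
    .collect()
# e.g. [('2013-10-10', 45.0), ('2013-10-09', 11.0)] under Python 3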
Answer by Sandeep Giri
A slight enhancement to the answer of prismalytics.io.
There could be a case where computing the sum overflows, because we are summing a huge number of values. Instead, we can keep running averages and keep computing the combined average from the averages and counts of the two parts being reduced.
If you have two parts with averages and counts (a1, c1) and (a2, c2), the overall average is: total/count = (total1 + total2) / (count1 + count2) = (a1*c1 + a2*c2) / (c1 + c2).
If we let R = c2/c1, this can be rewritten as a1/(1+R) + a2*R/(1+R). If we further let Ri = 1/(1+R), we can write it as a1*Ri + a2*R*Ri.
myrdd = sc.parallelize([1.1, 2.4, 5, 6.0, 2, 3, 7, 9, 11, 13, 10])
sumcount_rdd = myrdd.map(lambda n: (n, 1))  # Each element starts as an (average, count) pair of (value, 1).

def avg(A, B):
    # Merge two (average, count) pairs without ever materializing the full sum.
    R = 1.0 * B[1] / A[1]
    Ri = 1.0 / (1 + R)
    av = A[0] * Ri + B[0] * R * Ri
    return (av, B[1] + A[1])

(av, counts) = sumcount_rdd.reduce(avg)
print(av)
This approach can be converted to a per-key computation simply by using mapValues instead of map and reduceByKey instead of reduce, as sketched below.
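A minimal sketch of that per-key conversion (my own, assuming a (date, value) pair RDD like rdd1 from the question and the avg function defined above):

# Per-key version of the overflow-resistant average: each value becomes an
# (average, count) pair and avg() merges those pairs key by key.
avg_by_key = rdd1 \
    .mapValues(lambda v: (v, 1.0)) \
    .reduceByKey(avg) \
    .mapValues(lambda p: p[0]) \
    .collectAsMap()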
This is from: https://www.knowbigdata.com/blog/interview-questions-apache-spark-part-2