Scala: how to compute cumulative sum using Spark

Disclaimer: the content below is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same CC BY-SA terms and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/35154267/


How to compute cumulative sum using Spark

scala, apache-spark

Asked by Knight71

I have an RDD of (String, Int) which is sorted by key.

val data = Array(("c1",6), ("c2",3),("c3",4))
val rdd = sc.parallelize(data).sortByKey()

Now I want the value for the first key to start at zero, and each subsequent key's value to be the sum of the values of the previous keys.

E.g.: c1 = 0, c2 = c1's value, c3 = (c1's value + c2's value), c4 = (c1's + ... + c3's value). Expected output:

(c1,0), (c2,6), (c3,9)...

Is it possible to achieve this? I tried it with map, but the sum is not preserved inside the map.

var sum = 0
// note: `sum` is captured into each task's closure, so it is not shared across partitions
val t = keycount.map { x => val temp = sum; sum = sum + x._2; (x._1, temp) }

Answered by zero323

  1. Compute partial results for each partition:

    val partials = rdd.mapPartitionsWithIndex((i, iter) => {
      val (keys, values) = iter.toSeq.unzip
      val sums  = values.scanLeft(0)(_ + _)
      Iterator((keys.zip(sums.tail), sums.last))
    })
    
  2. Collect the partial sums:

    val partialSums = partials.values.collect
    
  3. Compute cumulative sum over partitions and broadcast it:

    val sumMap = sc.broadcast(
      (0 until rdd.partitions.size)
        .zip(partialSums.scanLeft(0)(_ + _))
        .toMap
    )
    
  4. Compute final results:

    val result = partials.keys.mapPartitionsWithIndex((i, iter) => {
      val offset = sumMap.value(i)
      if (iter.isEmpty) Iterator()
      else iter.next.map{case (k, v) => (k, v + offset)}.toIterator
    })
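
Putting the steps together on the question's data yields an inclusive cumulative sum, because step 1 pairs the keys with `sums.tail` (so c1 maps to its own value 6). A minimal sketch of the exclusive variant from the question's expected output, changing only that zip to `sums.init` while keeping `sums.last` as the per-partition total, might look like this (assuming `sc` is an available SparkContext; `offsets` is just an illustrative name):

val data = Array(("c1", 6), ("c2", 3), ("c3", 4))
val rdd = sc.parallelize(data).sortByKey()

val partials = rdd.mapPartitionsWithIndex((i, iter) => {
  val (keys, values) = iter.toSeq.unzip
  val sums = values.scanLeft(0)(_ + _)
  // sums.init = running total *before* each element; sums.last = total of this partition
  Iterator((keys.zip(sums.init), sums.last))
})

val offsets = sc.broadcast(
  (0 until rdd.partitions.size)
    .zip(partials.values.collect.scanLeft(0)(_ + _))
    .toMap
)

val result = partials.keys.mapPartitionsWithIndex((i, iter) => {
  if (iter.isEmpty) Iterator()
  else iter.next.map { case (k, v) => (k, v + offsets.value(i)) }.toIterator
})

result.collect.foreach(println)   // expected: (c1,0), (c2,6), (c3,9)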
    

Answered by Rahul Sharma

Spark has built-in support for Hive ANALYTICS/WINDOWING functions, and the cumulative sum can be achieved easily using ANALYTICS functions.

See the Hive wiki on ANALYTICS/WINDOWING functions.

Example:

Assuming you have a sqlContext object:

val datardd = sqlContext.sparkContext.parallelize(Seq(("a",1),("b",2), ("c",3),("d",4),("d",5),("d",6)))
import sqlContext.implicits._

//Register as test table
datardd.toDF("id","val").createOrReplaceTempView("test")

//Calculate Cumulative sum
sqlContext.sql("select id,val, " +
  "SUM(val) over (  order by id  rows between unbounded preceding and current row ) cumulative_Sum " +
  "from test").show()

This approach causes the warning below. If an executor runs out of memory, tune the job's memory parameters accordingly to handle a huge dataset.

WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation
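
For reference, the same running sum can also be written with the DataFrame window API instead of raw SQL. This is a sketch of my own (not part of the original answer), assuming the `datardd` defined above and Spark 2.1+ for the frame-boundary constants; the single-partition warning applies here as well, since the window has no partitionBy:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

val df = datardd.toDF("id", "val")
// unbounded preceding .. current row == inclusive running sum, mirroring the SQL above
val w = Window.orderBy("id").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("cumulative_Sum", sum("val").over(w)).show()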

I hope this helps.

Answered by Paul

Here is a solution in PySpark. Internally it's essentially the same as @zero323's Scala solution, but it provides a general-purpose function with a Spark-like API.

import numpy as np
def cumsum(rdd, get_summand):
    """Given an ordered rdd of items, computes cumulative sum of
    get_summand(row), where row is an item in the RDD.
    """
    def cumsum_in_partition(iter_rows):
        total = 0
        for row in iter_rows:
            total += get_summand(row)
            yield (total, row)
    rdd = rdd.mapPartitions(cumsum_in_partition)

    def last_partition_value(iter_rows):
        final = None
        for cumsum, row in iter_rows:
            final = cumsum
        return (final,)

    partition_sums = rdd.mapPartitions(last_partition_value).collect()
    partition_cumsums = list(np.cumsum(partition_sums))
    partition_cumsums = [0] + partition_cumsums
    partition_cumsums = sc.broadcast(partition_cumsums)

    def add_sums_of_previous_partitions(idx, iter_rows):
        return ((cumsum + partition_cumsums.value[idx], row)
            for cumsum, row in iter_rows)
    rdd = rdd.mapPartitionsWithIndex(add_sums_of_previous_partitions)
    return rdd

# test for correctness by summing numbers, with and without Spark
rdd = sc.range(10000,numSlices=10).sortBy(lambda x: x)
cumsums, values = zip(*cumsum(rdd,lambda x: x).collect())
assert all(cumsums == np.cumsum(values))

Answered by Melanie Chen

I came across a similar problem and implemented @Paul's solution. I wanted to compute a cumulative sum on an integer frequency table sorted by key (the integer), and there was a minor problem with np.cumsum(partition_sums): the error was unsupported operand type(s) for +=: 'int' and 'NoneType'.

If the key range is big enough, every partition is likely to contain something (no None values). However, if the range is much smaller than the count while the number of partitions stays the same, some partitions will be empty. Here is the modified solution:

import numpy as np
import pandas as pd

def cumsum(rdd, get_summand):
    """Given an ordered rdd of items, computes cumulative sum of
    get_summand(row), where row is an item in the RDD.
    """
    def cumsum_in_partition(iter_rows):
        total = 0
        for row in iter_rows:
            total += get_summand(row)
            yield (total, row)
    rdd = rdd.mapPartitions(cumsum_in_partition)
    def last_partition_value(iter_rows):
        final = None
        for cumsum, row in iter_rows:
            final = cumsum
        return (final,)
    partition_sums = rdd.mapPartitions(last_partition_value).collect()
    # partition_cumsums = list(np.cumsum(partition_sums))

    #----from here are the changed lines
    partition_sums = [x for x in partition_sums if x is not None] 
    temp = np.cumsum(partition_sums)
    partition_cumsums = list(temp)
    #----

    partition_cumsums = [0] + partition_cumsums   
    partition_cumsums = sc.broadcast(partition_cumsums)
    def add_sums_of_previous_partitions(idx, iter_rows):
        return ((cumsum + partition_cumsums.value[idx], row)
            for cumsum, row in iter_rows)
    rdd = rdd.mapPartitionsWithIndex(add_sums_of_previous_partitions)
    return rdd

#test on random integer frequency
x = np.random.randint(10, size=1000)
D = sqlCtx.createDataFrame(pd.DataFrame(x.tolist(),columns=['D']))
c = D.groupBy('D').count().orderBy('D')
c_rdd =  c.rdd.map(lambda x:x['count'])
cumsums, values = zip(*cumsum(c_rdd,lambda x: x).collect())

Answered by keepscoding

You may want to try out windows using rowsBetween. Hope it is still helpful.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._   // needed for $"..." and toDF; assumes a SparkSession named `spark`

val data = Array(("c1",6), ("c2",3),("c3",4))
val df = sc.parallelize(data).sortByKey().toDF("c", "v")
val w = Window.orderBy("c")
// sum over the two rows preceding the current row
val r = df.select($"c", sum($"v").over(w.rowsBetween(-2, -1)).alias("cs"))
display(r)   // display() is a Databricks notebook helper; use r.show() elsewhere
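
Note that rowsBetween(-2, -1) only covers the two rows immediately before the current one, which for this three-row sample gives (c1,null), (c2,6), (c3,9). For a running sum over all preceding rows (the exclusive cumulative sum from the question, with 0 for the first key), a frame like the following could be used; this is an assumption of mine, not part of the original answer, and it continues from the snippet above (reusing df, w, and the functions._ import):

// frame = everything before the current row; coalesce maps the empty frame on the
// first row to 0, giving (c1,0), (c2,6), (c3,9)
val cs = df.select(
  $"c",
  coalesce(sum($"v").over(w.rowsBetween(Window.unboundedPreceding, -1)), lit(0)).alias("cs")
)
cs.show()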