scala 在 Spark 数据集中滚动你自己的 reduceByKey

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/38383207/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-22 08:29:27  来源:igfitidea点击:

Rolling your own reduceByKey in Spark Dataset

scalaapache-sparkmapreduce

提问by Carlos Bribiescas

I'm trying to learn to use DataFrames and DataSets more in addition to RDDs. For an RDD, I know I can do someRDD.reduceByKey((x,y) => x + y), but I don't see that function for Dataset. So I decided to write one.

除了 RDDs 之外,我正在尝试学习更多地使用 DataFrames 和 DataSets。对于 RDD,我知道我可以做someRDD.reduceByKey((x,y) => x + y),但我没有看到 Dataset 的那个功能。所以我决定写一篇。

someRdd.map(x => ((x.fromId,x.toId),1)).map(x => collection.mutable.Map(x)).reduce((x,y) => {
  val result = mutable.HashMap.empty[(Long,Long),Int]
  val keys = mutable.HashSet.empty[(Long,Long)]
  y.keys.foreach(z => keys += z)
  x.keys.foreach(z => keys += z)
  for (elem <- keys) {
    val s1 = if(x.contains(elem)) x(elem) else 0
    val s2 = if(y.contains(elem)) y(elem) else 0
    result(elem) = s1 + s2
  }
  result
})

However, this returns everything to the driver. How would you write this to return a Dataset? Maybe mapPartition and do it there?

但是,这会将所有内容返回给驱动程序。你会如何写这个来返回 a Dataset?也许 mapPartition 并在那里做?

Note this compiles but does not run because it doesn't have encoders for Mapyet

注意此编译但无法运行,因为它没有对编码器Map

回答by bluenote10

I assume your goal is to translate this idiom to Datasets:

我假设您的目标是将此习语转换为数据集:

rdd.map(x => (x.someKey, x.someField))
   .reduceByKey(_ + _)

// => returning an RDD of (KeyType, FieldType)

Currently, the closest solution I have found with the Dataset API looks like this:

目前,我使用 Dataset API 找到的最接近的解决方案如下所示:

ds.map(x => (x.someKey, x.someField))          // [1]
  .groupByKey(_._1)                            
  .reduceGroups((a, b) => (a._1, a._2 + b._2))
  .map(_._2)                                   // [2]

// => returning a Dataset of (KeyType, FieldType)

// Comments:
// [1] As far as I can see, having a map before groupByKey is required
//     to end up with the proper type in reduceGroups. After all, we do
//     not want to reduce over the original type, but the FieldType.
// [2] required since reduceGroups converts back to Dataset[(K, V)]
//     not knowing that our V's are already key-value pairs.

Doesn't look very elegant and according to a quick benchmark it is also much less performant, so maybe we are missing something here...

看起来不是很优雅,根据快速基准测试,它的性能也低得多,所以也许我们在这里遗漏了一些东西......

Note: An alternative might be to use groupByKey(_.someKey)as a first step. The problem is that using groupByKeychanges the type from a regular Datasetto a KeyValueGroupedDataset. The latter does not have a regular mapfunction. Instead it offers an mapGroups, which does not seem very convenient because it wraps the values into an Iteratorand performs a shuffle according to the docstring.

注意:另一种方法可能是groupByKey(_.someKey)用作第一步。问题是 usinggroupByKey将类型从常规Dataset更改为KeyValueGroupedDataset. 后者没有常规map功能。相反,它提供了一个mapGroups,这看起来不太方便,因为它将值包装到 an 中Iterator并根据文档字符串执行 shuffle。

回答by Justin Raymond

A more efficient solution uses mapPartitionsbefore groupByKeyto reduce the amount of shuffling (note this is not the exact same signature as reduceByKeybut I think it is more flexible to pass a function than require the dataset consist of a tuple).

一个更有效的解决方案使用mapPartitionsbeforegroupByKey来减少改组的数量(注意这与签名不完全相同,reduceByKey但我认为传递函数比要求数据集由元组组成更灵活)。

def reduceByKey[V: ClassTag, K](ds: Dataset[V], f: V => K, g: (V, V) => V)
  (implicit encK: Encoder[K], encV: Encoder[V]): Dataset[(K, V)] = {
  def h[V: ClassTag, K](f: V => K, g: (V, V) => V, iter: Iterator[V]): Iterator[V] = {
    iter.toArray.groupBy(f).mapValues(_.reduce(g)).map(_._2).toIterator
  }
  ds.mapPartitions(h(f, g, _))
    .groupByKey(f)(encK)
    .reduceGroups(g)
}

Depending on the shape/size of your data, this is within 1 second of the performance of reduceByKey, and about 2xas fast as a groupByKey(_._1).reduceGroups. There is still room for improvements, so suggestions would be welcome.

根据您的数据的形状/大小,这是表现的1秒钟之内reduceByKey,并约2x一样快groupByKey(_._1).reduceGroups。仍有改进的空间,因此欢迎提出建议。