scala - Apache Spark: map vs mapPartitions?
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must follow the same CC BY-SA license and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/21185092/
Apache Spark: map vs mapPartitions?
Asked by Nicholas White
What's the difference between an RDD's map and mapPartitions method? And does flatMap behave like map or like mapPartitions? Thanks.
(edit) i.e. what is the difference (either semantically or in terms of execution) between
def map[A, B](rdd: RDD[A], fn: (A => B))
             (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
  rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
    preservesPartitioning = true)
}
And:
def map[A, B](rdd: RDD[A], fn: (A => B))
             (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
  rdd.map(fn)
}
Accepted answer by Alexey Romanov
What's the difference between an RDD's map and mapPartitions method?
The method map converts each element of the source RDD into a single element of the result RDD by applying a function. mapPartitions converts each partition of the source RDD into multiple elements of the result (possibly none).
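To make this concrete, here is a minimal sketch (assuming a SparkContext named sc, as in the examples further down): map emits exactly one output element per input, while the function given to mapPartitions receives a whole partition as an Iterator and may emit any number of elements, including none.

val rdd = sc.parallelize(1 to 4, 2)

// map: exactly one output element per input element
val perElement = rdd.map(i => s"value=$i")

// mapPartitions: called once per partition, may emit any number of elements
val perPartition = rdd.mapPartitions { iter =>
  if (iter.isEmpty) Iterator.empty
  else Iterator(s"partition sum = ${iter.sum}")
}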
And does flatMap behave like map or like mapPartitions?
Neither: flatMap works on a single element (like map) and produces multiple elements of the result (like mapPartitions).
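A one-liner sketch of that middle ground (again assuming sc is in scope):

val lines = sc.parallelize(Seq("a b", "", "c"))
lines.flatMap(_.split("\\s+").filter(_.nonEmpty)).collect()
// Array(a, b, c): one call per element, zero or more outputs each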
Answer by Ram Ghadiyaram
Important tip:
Whenever you have heavyweight initialization that should be done once for many RDD elements rather than once per RDD element, and if this initialization (such as creating objects from a third-party library) cannot be serialized (so that Spark can transmit it across the cluster to the worker nodes), use mapPartitions() instead of map(). mapPartitions() lets the initialization be done once per worker task/thread/partition instead of once per RDD data element. For example, see below:
val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection /* creates a db connection per partition */
  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  }).toList // consumes the iterator, thus calls readMatchingFromDB
  connection.close() // close the db connection here
  newPartition.iterator // return a new iterator
})
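(A note on the .toList above: it forces the lazy iterator, so readMatchingFromDB runs for every record while the connection is still open. Without it, the returned iterator would be evaluated only after connection.close() had already run; the trade-off is that the whole partition is materialized in memory at once.)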
Q2. Does flatMap behave like map or like mapPartitions?

Yes. Please see example 2 below (the flatMap version); it is self-explanatory.
Q1. What's the difference between an RDD's map and mapPartitions?

map runs the supplied function at the per-element level while mapPartitions exercises the function at the partition level.
Example scenario: if we have 100K elements in a particular RDD partition, then when we use map we will fire off the function used by the mapping transformation 100K times.
Conversely, if we use mapPartitions then we will call the particular function only one time, but we will pass in all 100K records and get back all responses in one function call.
There will be a performance gain, since map invokes the function so many times, especially if the function is doing something expensive each time that it wouldn't need to do if we passed in all the elements at once (as with mapPartitions).
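A hypothetical sketch of that difference (buildParser is an illustrative stand-in for any costly per-call setup; it is not part of the original answer):

// Stand-in for expensive initialization (compiling a regex, building a client, ...)
def buildParser(): String => Int = {
  println("expensive init")
  s => s.length
}

val strings = sc.parallelize(Seq("a", "bb", "ccc"), 1)

// map: buildParser() runs once per element (three times here)
strings.map(s => buildParser()(s)).collect()

// mapPartitions: buildParser() runs once per partition (once here)
strings.mapPartitions { iter =>
  val parse = buildParser()
  iter.map(parse)
}.collect()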
map
Applies a transformation function on each item of the RDD and returns the result as a new RDD.
Listing Variants
def map[U: ClassTag](f: T => U): RDD[U]
Example:
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.map(_.length)
val c = a.zip(b)
c.collect
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
mapPartitions
This is a specialized map that is called only once for each partition. The entire content of the respective partition is available as a sequential stream of values via the input argument (Iterator[T]). The custom function must return yet another Iterator[U]. The combined result iterators are automatically converted into a new RDD. Please note that the tuples (3,4) and (6,7) are missing from the following result due to the partitioning we chose.
preservesPartitioning indicates whether the input function preserves the partitioner, which should be false unless this is a pair RDD and the input function doesn't modify the keys.
Listing Variants
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
Example 1
val a = sc.parallelize(1 to 9, 3)

def myfunc[T](iter: Iterator[T]): Iterator[(T, T)] = {
  var res = List[(T, T)]()
  var pre = iter.next
  while (iter.hasNext) {
    val cur = iter.next
    res = (pre, cur) :: res
    pre = cur
  }
  res.iterator
}
a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
Example 2
val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3)

def myfunc(iter: Iterator[Int]): Iterator[Int] = {
  var res = List[Int]()
  while (iter.hasNext) {
    val cur = iter.next
    res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
  }
  res.iterator
}
x.mapPartitions(myfunc).collect
// Some of the numbers are not output at all, because the random count generated for them was zero.
res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10)
The above program can also be written using flatMap as follows.
Example 2 using flatMap
val x = sc.parallelize(1 to 10, 3)
x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect
res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)
Conclusion:
The mapPartitions transformation is faster than map since it calls your function once per partition, not once per element.
Further reading: foreach vs foreachPartitions: when to use what?
Answer by KrazyGautam
Map:
- It processes one row at a time, very similar to the map() method of MapReduce.
- You return from the transformation after every row.
MapPartitions:
- It processes the complete partition in one go.
- You can return from the function only once, after processing the whole partition.
- All intermediate results need to be held in memory until you have processed the whole partition.
- Provides the equivalent of MapReduce's setup(), map() and cleanup() functions, as sketched below.
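A sketch of that setup()/map()/cleanup() shape (openResource and process are hypothetical placeholders, not Spark API; compare the DbConnection example above):

rdd.mapPartitions { iter =>
  val resource = openResource()                                 // setup(): once per partition
  val results  = iter.map(rec => process(resource, rec)).toList // map(): once per record
  resource.close()                                              // cleanup(): once per partition
  results.iterator
}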
Map vs mapPartitions: http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/
Spark Map: http://bytepadding.com/big-data/spark/spark-map/
Spark mapPartitions: http://bytepadding.com/big-data/spark/spark-mappartitions/

