Apache Spark - foreach vs foreachPartitions: when to use what?
Original source: http://stackoverflow.com/questions/30484701/
Warning: these answers are provided under the CC BY-SA 4.0 license. You are free to use/share them, but you must attribute them to the original authors (not me): StackOverflow
Asked by Beniamino Del Pizzo
I would like to know if foreachPartitions will result in better performance, due to a higher level of parallelism, compared to the foreach method, considering the case in which I'm flowing through an RDD in order to perform some sums into an accumulator variable.
Answered by Justin Pihony
There is really not that much of a difference between foreach and foreachPartitions. Under the covers, all that foreach is doing is calling the iterator's foreach using the provided function. foreachPartition just gives you the opportunity to do something outside of the looping of the iterator, usually something expensive like spinning up a database connection or something along those lines. So, if you don't have anything that could be done once for each node's iterator and reused throughout, then I would suggest using foreach for improved clarity and reduced complexity.
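To make that concrete, here is a minimal sketch of the setup-reuse pattern; connect() and save() are hypothetical stand-ins for whatever expensive resource and write operation you actually use:

rdd.foreach { record =>
  val conn = connect()   // hypothetical: expensive setup repeated for every element
  save(conn, record)
  conn.close()
}

rdd.foreachPartition { records =>
  val conn = connect()   // hypothetical: expensive setup done once per partition
  records.foreach(record => save(conn, record))
  conn.close()           // reused throughout the iterator, closed once
}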
Answered by Bin Wang
foreach automatically runs the loop on many nodes.
However, sometimes you want to do some operations on each node. For example, make a connection to a database. You cannot just make a connection and pass it into the foreach function: the connection is only made on one node.
So with foreachPartition, you can make a connection to the database on each node before running the loop.
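A sketch of the failure mode just described; the JDBC call, the url it takes, and the insert helper are all illustrative, and any non-serializable resource behaves the same way:

import java.sql.DriverManager

val conn = DriverManager.getConnection(url)   // created once, on the driver
rdd.foreach { record =>
  // Fails with "Task not serializable" (or misbehaves): conn lives on the
  // driver and cannot be shipped inside the task closure to other nodes.
  insert(conn, record)                        // hypothetical insert helper
}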
Answered by deenbandhu
foreachPartition does not mean it is per-node activity; rather, it is executed for each partition, and it is possible that you have a large number of partitions compared to the number of nodes, in which case your performance may be degraded. If you intend to do an activity at node level, the solution explained here may be useful, although it has not been tested by me.
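If that concern applies, one hedged way to bound the number of simultaneous connections is to reduce the partition count before the write; coalesce is standard RDD API, and the target of 16 is an arbitrary assumption:

rdd.coalesce(16)   // cap partitions, and therefore connections, at 16
   .foreachPartition { records =>
     // ... one connection per (now larger) partition ...
   }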
Answered by Oren
foreachPartition is only helpful when you're iterating through data which you are aggregating by partition.
A good example is processing clickstreams per user. You'd want to clear your calculation cache every time you finish a user's stream of events, but keep it between records of the same user in order to calculate some user behavior insights.
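A sketch of that pattern, assuming the events were already partitioned and sorted so that each user's records are contiguous within a partition; UserEvent and emitInsights() are hypothetical:

case class UserEvent(userId: String, url: String, ts: Long)

events.foreachPartition { records =>
  val cache = scala.collection.mutable.ArrayBuffer.empty[UserEvent]
  var currentUser: Option[String] = None
  records.foreach { e =>
    if (currentUser.exists(_ != e.userId)) {
      emitInsights(cache.toSeq)   // hypothetical: compute insights for the finished user
      cache.clear()               // clear the calculation cache between users
    }
    currentUser = Some(e.userId)
    cache += e
  }
  if (cache.nonEmpty) emitInsights(cache.toSeq)   // flush the last user's events
}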
Answered by Ram Ghadiyaram
foreach and foreachPartitions are actions.
foreach(function): Unit
A generic function for invoking operations with side effects. For each element in the RDD, it invokes the passed function. This is generally used for manipulating accumulators or writing to external stores.
Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.
Example:
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Long = 10
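For contrast, a minimal sketch of the closure pitfall that note warns about; the local counter is illustrative:

var counter = 0
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => counter += x)
// In cluster mode counter is typically still 0 here: each executor increments
// its own deserialized copy of the closure's variable, not the driver's copy.
println(counter)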
foreachPartition(function): Unit
Similar to foreach(), but instead of invoking the function for each element, it calls it for each partition. The function should be able to accept an iterator. This is more efficient than foreach() because it reduces the number of function calls (just like mapPartitions()).
Usage of foreachPartition examples:
- Example 1: if you want one database connection per partition (inside the per-partition block), this is an example of how it can be done using Scala.
import java.sql.{Connection, DriverManager}

/**
 * Insert into the database using foreachPartition.
 *
 * @param sqlDatabaseConnectionString
 * @param sqlTableName
 */
def insertToTable(sqlDatabaseConnectionString: String, sqlTableName: String): Unit = {
  // numPartitions = number of simultaneous DB connections you plan to allow:
  // dataFrame.repartition(numOfPartitionsYouWant)
  val tableHeader: String = dataFrame.columns.mkString(",")
  dataFrame.foreachPartition { partition =>
    // Note: one connection per partition (a better way is to use connection pools)
    val sqlExecutorConnection: Connection = DriverManager.getConnection(sqlDatabaseConnectionString)
    // A batch size of 1000 is used since some databases can't use a batch size
    // of more than 1000, e.g. Azure SQL
    partition.grouped(1000).foreach { group =>
      val insertString: scala.collection.mutable.StringBuilder = new scala.collection.mutable.StringBuilder()
      group.foreach { record =>
        insertString.append("('" + record.mkString(",") + "'),")
      }
      sqlExecutorConnection.createStatement()
        .executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES " + insertString.stripSuffix(","))
    }
    sqlExecutorConnection.close() // close the connection so that connections won't be exhausted
  }
}
- Example 2:
Usage of foreachPartition with Spark Streaming (DStreams) and a Kafka producer:
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // Only once per partition: you can safely share a thread-safe Kafka producer instance.
    val producer = createKafkaProducer()
    partitionOfRecords.foreach { message =>
      producer.send(message)
    }
    producer.close()
  }
}
Note: if you want to avoid this way of creating the producer once per partition, the better way is to broadcast the producer using sparkContext.broadcast, since the Kafka producer is asynchronous and buffers data heavily before sending.
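A KafkaProducer itself is not serializable, so the usual way to realize that note is to broadcast a small wrapper that builds the producer lazily on each executor. A sketch under that assumption; KafkaSink, the topic name, and the broker address are illustrative, not part of the answer:

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  lazy val producer = createProducer()   // built once per executor, on first use
  def send(topic: String, value: String): Unit =
    producer.send(new ProducerRecord(topic, value))
}

val kafkaSink = sc.broadcast(new KafkaSink(() => {
  val props = new java.util.Properties()
  props.put("bootstrap.servers", "localhost:9092")   // assumption: local broker
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  new KafkaProducer[String, String](props)
}))

dstream.foreachRDD { rdd =>
  rdd.foreach { message =>
    kafkaSink.value.send("mytopic", message)   // reuses the executor-local producer
  }
}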
An accumulator sample snippet to play around with, through which you can test the performance:
test("Foreach - Spark") { import spark.implicits._ var accum = sc.longAccumulator sc.parallelize(Seq(1,2,3)).foreach(x => accum.add(x)) assert(accum.value == 6L) } test("Foreach partition - Spark") { import spark.implicits._ var accum = sc.longAccumulator sc.parallelize(Seq(1,2,3)).foreachPartition(x => x.foreach(accum.add(_))) assert(accum.value == 6L) }
Conclusion:
foreachPartition operates on whole partitions, so it obviously has an edge over foreach.
Rule of Thumb:
foreachPartition should be used when you are accessing costly resources such as database connections or a Kafka producer, which would be initialized once per partition rather than once per element (foreach). When it comes to accumulators, you can measure the performance with the above test methods, which should work faster in the case of accumulators as well.
Also... see map vs mapPartitions, which has a similar concept, but they are transformations.