Scala 中的并发映射/foreach

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/1751953/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-22 01:44:41  来源:igfitidea点击:

Concurrent map/foreach in scala

scalaconcurrencyfunctional-programming

提问by David Crawshaw

I have an iteration vals: Iterable[T]and a long-running function without any relevant side effects: f: (T => Unit). Right now this is applied to valsin the obvious way:

我有一个迭代vals: Iterable[T]和一个长时间运行的函数,没有任何相关的副作用:f: (T => Unit). 现在这以vals明显的方式应用于:

vals.foreach(f)

I would like the calls to fto be done concurrently (within reasonable limits). Is there an obvious function somewhere in the Scala base library? Something like:

我希望同时进行调用f(在合理范围内)。Scala 基础库中某处是否有明显的函数?就像是:

Concurrent.foreach(8 /* Number of threads. */)(vals, f)

While fis reasonably long running, it is short enough that I don't want the overhead of invoking a thread for each call, so I am looking for something based on a thread pool.

虽然f运行时间相当长,但它足够短,我不希望为每次调用调用一个线程的开销,所以我正在寻找基于线程池的东西。

采纳答案by Daniel Spiewak

I like the Futuresanswer. However, while it will execute concurrently, it will also return asynchronously, which is probably not what you want. The correct approach would be as follows:

我喜欢这个Futures答案。但是,虽然它会并发执行,但也会异步返回,这可能不是您想要的。正确的方法如下:

import scala.actors.Futures._

vals map { x => future { f(x) } } foreach { _() }

回答by Kei-ven

Many of the answers from 2009 still use the old scala.actors.Futures._, which are no longer in the newer Scala. While Akka is the preferred way, a much more readable way is to just use parallel (.par) collections:

2009 年的许多答案仍然使用旧的 scala.actors.Futures._,它们不再出现在较新的 Scala 中。虽然 Akka 是首选方式,但更易读的方式是仅使用并行 ( .par) 集合:

vals.foreach { v => f(v) }

vals.foreach { v => f(v) }

becomes

变成

vals.par.foreach { v => f(v) }

vals.par.foreach { v => f(v) }

Alternatively, using parMap can appear more succinct though with the caveat that you need to remember to import the usual Scalaz*. As usual, there's more than one way to do the same thing in Scala!

或者,使用 parMap 可能看起来更简洁,但需要注意的是,您需要记住导入通常的 Scalaz*。像往常一样,在 Scala 中有不止一种方法可以做同样的事情!

回答by Apocalisp

Scalazhas parMap. You would use it as follows:

斯卡拉兹parMap。您可以按如下方式使用它:

import scalaz.Scalaz._
import scalaz.concurrent.Strategy.Naive

This will equip every functor (including Iterable) with a parMapmethod, so you can just do:

这将为每个函子(包括Iterable)配备一个parMap方法,因此您可以这样做:

vals.parMap(f)

You also get parFlatMap, parZipWith, etc.

你也得到parFlatMap, parZipWith, 等等。

回答by Jonathan Graehl

I had some issues using scala.actors.Futures in Scala 2.8 (it was buggy when I checked). Using java libs directly worked for me, though:

我在 Scala 2.8 中使用 scala.actors.Futures 时遇到了一些问题(我检查时有问题)。不过,直接使用 java libs 对我有用:

final object Parallel {
  val cpus=java.lang.Runtime.getRuntime().availableProcessors
  import java.util.{Timer,TimerTask}
  def afterDelay(ms: Long)(op: =>Unit) = new Timer().schedule(new TimerTask {override def run = op},ms)
  def repeat(n: Int,f: Int=>Unit) = {
    import java.util.concurrent._
    val e=Executors.newCachedThreadPool //newFixedThreadPool(cpus+1)
    (0 until n).foreach(i=>e.execute(new Runnable {def run = f(i)}))
    e.shutdown
    e.awaitTermination(Math.MAX_LONG, TimeUnit.SECONDS)
  }
}

回答by Daniel C. Sobral

I'd use scala.actors.Futures:

我会用scala.actors.Futures

vals.foreach(t => scala.actors.Futures.future(f(t)))

回答by Apocalisp

The latest release of Functional Javahas some higher-order concurrency features that you can use.

Functional Java的最新版本具有一些您可以使用的高阶并发特性。

import fjs.F._
import fj.control.parallel.Strategy._
import fj.control.parallel.ParModule._
import java.util.concurrent.Executors._

val pool = newCachedThreadPool
val par = parModule(executorStrategy[Unit](pool))

And then...

接着...

par.parMap(vals, f)

Remember to shutdownthe pool.

记住shutdownpool

回答by Roland

You can use the Parallel Collectionsfrom the Scala standard library. They're just like ordinary collections, but their operations run in parallel. You just need to put a parcall before you invoke some collections operation.

您可以使用Scala 标准库中的并行集合。它们就像普通的集合,但它们的操作是并行运行的。您只需要par在调用某些集合操作之前进行调用。

import scala.collection._

val array = new Array[String](10000)
for (i <- (0 until 10000).par) array(i) = i.toString