multithreading: How can I execute multiple tasks in Scala?
Disclaimer: this page reproduces a popular StackOverflow question and its answers under the CC BY-SA 4.0 license. You are free to use and share it, but you must also follow the CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/4511078/
Asked by yura
I have 50,000 tasks and want to execute them with 10 threads. In Java I would create a pool with Executors.newFixedThreadPool(10), pass Runnables to it, and then wait for all of them to be processed. As I understand it, Scala is especially useful for this kind of task, but I can't find a solution in the docs.
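Because Scala can call java.util.concurrent directly, the Java approach described in the question carries over almost unchanged. The following is a minimal sketch, not code from any answer: `runAll` and the squaring workload are hypothetical placeholders, and it assumes each task returns a Long.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors}

// Hypothetical helper: run numTasks tasks on a fixed pool of numThreads threads,
// in the style of Java's Executors.newFixedThreadPool + invokeAll.
def runAll(numTasks: Int, numThreads: Int)(work: Int => Long): Vector[Long] = {
  val pool: ExecutorService = Executors.newFixedThreadPool(numThreads)
  try {
    // Wrap each task in a Callable (a Runnable that returns a value).
    val callables = new java.util.ArrayList[Callable[Long]](numTasks)
    for (i <- 1 to numTasks) {
      callables.add(new Callable[Long] { def call(): Long = work(i) })
    }
    // invokeAll blocks until every task has completed, normally or exceptionally.
    val futures = pool.invokeAll(callables)
    // Collect results in submission order; get() rethrows any task failure.
    (0 until futures.size()).map(j => futures.get(j).get()).toVector
  } finally {
    // Release the pool's threads so they don't keep the JVM alive.
    pool.shutdown()
  }
}

val results = runAll(50000, 10)(i => i.toLong * i)
println("Sum of squares: " + results.sum)
```

Using Callable rather than Runnable lets each task hand back a result, which matches the "wait for all and collect" shape of the question.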
Answer by mpilquist
Scala 2.9.3 and later
The simplest approach is to use the scala.concurrent.Future class and its associated infrastructure. The scala.concurrent.future method asynchronously evaluates the block passed to it and immediately returns a Future[A] representing the asynchronous computation. Futures can be manipulated in a number of non-blocking ways, including mapping, flatMapping, filtering, recovering from errors, etc.
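A minimal sketch of those non-blocking combinators, assuming Scala 2.10 or later and the global execution context; the values are arbitrary and `Future { ... }` is used as the non-deprecated form of `future { ... }`:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// A future that eventually yields 7.
val base: Future[Int] = Future { 7 }

// map: transform the eventual value without blocking.
val squared: Future[Int] = base.map(n => n * n)

// flatMap: chain a second asynchronous step that depends on the first result.
val chained: Future[String] = squared.flatMap(n => Future { "square = " + n })

// filter: fails with NoSuchElementException when the predicate does not hold...
val filtered: Future[Int] = base.filter(_ > 10)

// ...and recover turns that failure back into a fallback value.
val recovered: Future[Int] = filtered.recover { case _: NoSuchElementException => -1 }

// Blocking here only so the example can print its results.
println(Await.result(chained, 5.seconds))
println(Await.result(recovered, 5.seconds))
```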
For example, here's a sample that creates 10 tasks, where task i sleeps for i seconds and then returns the square of i.
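import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// `future` (deprecated since Scala 2.11 in favor of `Future { ... }`) starts each block asynchronously
val tasks: Seq[Future[Int]] = for (i <- 1 to 10) yield future {
  println("Executing task " + i)
  // `blocking` lets the global pool add threads, so all ten sleeps can overlap
  blocking { Thread.sleep(i * 1000L) }
  i * i
}
// Seq[Future[Int]] => Future[Seq[Int]]
val aggregated: Future[Seq[Int]] = Future.sequence(tasks)
// block the current thread for at most 15 seconds
val squares: Seq[Int] = Await.result(aggregated, 15.seconds)
println("Squares: " + squares)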
In this example, we first create a sequence of individual asynchronous tasks that each provide an Int when complete. We then use Future.sequence to combine those async tasks into a single async task, swapping the positions of Future and Seq in the type (Seq[Future[Int]] becomes Future[Seq[Int]]). Finally, we block the current thread for up to 15 seconds while waiting for the result. The example uses the global execution context, which is backed by a fork/join thread pool. For non-trivial applications, you would probably want to use an application-specific ExecutionContext.
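To make the type swap concrete, here is a small sketch (assuming Scala 2.10 or later; the failing eleventh task is a hypothetical addition) that also shows what happens when one of the combined tasks fails: the aggregated future fails too.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Seq[Future[Int]]: ten independent asynchronous tasks.
val parts: Seq[Future[Int]] = (1 to 10).map(i => Future(i * i))

// Future.sequence swaps the wrappers: Future[Seq[Int]] completes once every part has.
val all: Future[Seq[Int]] = Future.sequence(parts)
val squares: Seq[Int] = Await.result(all, 5.seconds)

// If any single part fails, the aggregated future fails with that exception.
val withFailure: Seq[Future[Int]] =
  parts :+ Future.failed[Int](new IllegalStateException("task 11 failed"))
val failedAll: Try[Seq[Int]] = Try(Await.result(Future.sequence(withFailure), 5.seconds))

println("Squares: " + squares)
println("Aggregate with a failing task: " + failedAll)
```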
Generally, blocking should be avoided whenever possible. The Future class offers other methods that help you program in an asynchronous style, including onSuccess, onFailure, and onComplete.
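A minimal callback-style sketch, assuming Scala 2.10 or later. It uses onComplete, since onSuccess and onFailure were deprecated in Scala 2.12. The CountDownLatch and AtomicReference exist only so this demo can wait for and inspect the callback; real asynchronous code would simply return.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// Where the callback records what it saw (demo only).
val outcome = new AtomicReference[String]("pending")
// Lets the demo wait until the callback has run.
val done = new CountDownLatch(1)

val task: Future[Int] = Future { 6 * 7 }

// Register a callback instead of blocking; it runs on the execution context once the task finishes.
task.onComplete { result =>
  outcome.set(result match {
    case Success(value) => "succeeded with " + value
    case Failure(error) => "failed: " + error.getMessage
  })
  done.countDown()
}

done.await(5, TimeUnit.SECONDS)
println(outcome.get)
```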
Also, consider investigating the Akka library, which provides actor-based concurrency for Scala and Java and interoperates with scala.concurrent.
Scala 2.9.2 and prior
The simplest approach is to use Scala's Future class, which is a subcomponent of the Actors framework. The scala.actors.Futures.future method creates a Future for the block passed to it. You can then use scala.actors.Futures.awaitAll to wait for all tasks to complete.
For example, here's a sample that creates 10 tasks, where task i sleeps for i seconds and then returns the square of i.
import scala.actors.Futures._

val tasks = for (i <- 1 to 10) yield future {
  println("Executing task " + i)
  Thread.sleep(i * 1000L)
  i * i
}
// wait at most 20,000 ms; each entry is Some(result), or None if that task did not finish in time
val squares = awaitAll(20000L, tasks: _*)
println("Squares: " + squares)
Answer by Janx
You want to look at either the Scala actors library or Akka. Akka has cleaner syntax, but either will do the trick.
So it sounds like you need to create a pool of actors that know how to process your tasks. An Actor can basically be any class with a receive method - from the Akka tutorial (http://doc.akkasource.org/tutorial-chat-server-scala):
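// Akka 1.x API (in Akka 2, actors are created via an ActorSystem and start automatically)
import akka.actor.Actor

class MyActor extends Actor {
  def receive = {
    case "test" => println("received test")
    case _      => println("received unknown message")
  }
}

val myActor = Actor.actorOf[MyActor]
myActor.start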
You'll want to create a pool of actor instances and fire your tasks to them as messages. Here's a post on Akka actor pooling that may be helpful: http://vasilrem.com/blog/software-development/flexible-load-balancing-with-akka-in-scala/
In your case, one actor per task may be appropriate (actors are extremely lightweight compared to threads so you can have a LOT of them in a single VM), or you might need some more sophisticated load balancing between them.
EDIT: Using the example actor above, sending it a message is as easy as this:
myActor ! "test"
The actor will then output "received test" to standard output.
Messages can be of any type, and when combined with Scala's pattern matching, you have a powerful pattern for building flexible concurrent applications.
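Without pulling in an actor library, the "typed messages plus pattern matching" idea can be sketched in plain Scala. This covers only the dispatch part of an actor (no mailbox, no threading), and `Message`, `Square`, `Greet`, `Stop`, and `handle` are hypothetical names:

```scala
// Hypothetical message protocol: a sealed trait lets the compiler check matches for exhaustiveness.
sealed trait Message
final case class Square(n: Int) extends Message
final case class Greet(name: String) extends Message
case object Stop extends Message

// The body of a receive-style handler: one case per message type.
def handle(msg: Message): String = msg match {
  case Square(n)   => s"square of $n is ${n * n}"
  case Greet(name) => s"hello, $name"
  case Stop        => "stopping"
}

List(Square(4), Greet("yura"), Stop).map(handle).foreach(println)
```

Making the protocol a sealed hierarchy rather than matching on strings means a forgotten message type shows up as a compiler warning instead of falling through to a catch-all case.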
In general Akka actors will "do the right thing" in terms of thread sharing, and for the OP's needs, I imagine the defaults are fine. But if you need to, you can set the dispatcher the actor should use to one of several types:
* Thread-based
* Event-based
* Work-stealing
* HawtDispatch-based event-driven
It's trivial to set a dispatcher for an actor:
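// Akka 1.x API
import akka.actor.Actor
import akka.dispatch.Dispatchers

class MyActor extends Actor {
  // executor-based event-driven dispatcher: fixed pool of 10 threads, bounded task queue
  self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("thread-pool-dispatch")
    .withNewThreadPoolWithBoundedBlockingQueue(100)
    .setCorePoolSize(10)
    .setMaxPoolSize(10)
    .setKeepAliveTimeInMillis(10000)
    .build

  // every actor must define receive
  def receive = {
    case msg => println("received " + msg)
  }
}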
See http://doc.akkasource.org/dispatchers-scala
In this way, you could limit the thread pool size, but again, the original use case could probably be satisfied with 50K Akka actor instances using default dispatchers and it would parallelize nicely.
This really only scratches the surface of what Akka can do. It brings a lot of what Erlang offers to the Scala language. Actors can monitor other actors and restart them, creating self-healing applications. Akka also provides Software Transactional Memory and many other features. It's arguably the "killer app" or "killer framework" for Scala.
Answer by Daniel C. Sobral
If you want to "execute them with 10 threads", then use threads. Scala's actor model, which is usually what people are talking about when they say Scala is good for concurrency, hides such details, so you won't see them.
With actors, or with futures when all you have are simple computations, you would just create 50,000 of them and let them run. You might run into issues, but they are of a different nature.
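Taking "use threads" literally, a minimal sketch could start ten plain java.lang.Thread workers that pull task numbers from a shared counter until all 50,000 are done. This is an illustration, not code from the answer; the names `nextTask` and `results` and the squaring workload are hypothetical.

```scala
import java.util.concurrent.atomic.AtomicInteger

val numTasks = 50000
val numThreads = 10

// A shared work queue reduced to a counter: each call hands out the next unprocessed task index.
val nextTask = new AtomicInteger(0)
// One slot per task, each written by exactly one worker; join() below makes the writes visible.
val results = new Array[Long](numTasks)

val workers = (1 to numThreads).map { _ =>
  new Thread(new Runnable {
    def run(): Unit = {
      var i = nextTask.getAndIncrement()
      while (i < numTasks) {
        results(i) = (i + 1).toLong * (i + 1) // placeholder work: square of the 1-based task number
        i = nextTask.getAndIncrement()
      }
    }
  })
}

workers.foreach(_.start())
workers.foreach(_.join()) // wait until every thread has run out of tasks

val total = results.sum
println("Sum of squares: " + total)
```

Pulling work from a counter, rather than pre-splitting the range into ten chunks, keeps all threads busy even when individual tasks take different amounts of time.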
Answer by Holger Brandl
Here's another answer, similar to mpilquist's but without the deprecated API, that sets the number of threads through a custom ExecutionContext:
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

val numJobs = 50000
val numThreads = 10

// customize the execution context to use a fixed pool of numThreads threads
implicit val ec: ExecutionContextExecutorService =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(numThreads))

// define the tasks
val tasks = for (i <- 1 to numJobs) yield Future {
  // do something more fancy here
  i
}

// aggregate and wait for final result
val aggregated = Future.sequence(tasks)
val oneToNSum = Await.result(aggregated, 15.seconds).sum

// shut down the pool so its non-daemon threads don't keep the JVM alive
ec.shutdown()