multithreading Scala 中的异步 IO 与期货

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/13097754/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-09-10 01:20:44  来源:igfitidea点击:

Asynchronous IO in Scala with futures

multithreadingscalaiofuture

提问by F.X.

Let's say I'm getting a (potentially big) list of images to download from some URLs. I'm using Scala, so what I would do is :

假设我从某些 URL 获取要下载的(可能很大)图像列表。我正在使用 Scala,所以我会做的是:

import scala.actors.Futures._

// Retrieve URLs from somewhere
val urls: List[String] = ...

// Download image (blocking operation)
val fimages: List[Future[...]] = urls.map (url => future { download url })

// Do something (display) when complete
fimages.foreach (_.foreach (display _))

I'm a bit new to Scala, so this still looks a little like magic to me :

我对 Scala 有点陌生,所以这对我来说仍然有点像魔术:

  • Is this the right way to do it? Any alternatives if it is not?
  • If I have 100 images to download, will this create 100 threads at once, or will it use a thread pool?
  • Will the last instruction (display _) be executed on the main thread, and if not, how can I make sure it is?
  • 这是正确的方法吗?如果不是,有什么替代方案吗?
  • 如果我有 100 张图片要下载,这会一次创建 100 个线程,还是会使用线程池?
  • 最后一条指令 ( display _) 是否会在主线程上执行,如果没有,我如何确定它是?

Thanks for your advice!

谢谢你的建议!

回答by Heather Miller

Use Futures in Scala 2.10. They were joint work between the Scala team, the Akka team, and Twitter to reach a more standardized future API and implementation for use across frameworks. We just published a guide at: http://docs.scala-lang.org/overviews/core/futures.html

在 Scala 2.10 中使用期货。他们是 Scala 团队、Akka 团队和 Twitter 之间的联合工作,旨在实现更标准化的未来 API 和跨框架使用的实现。我们刚刚发布了一个指南:http: //docs.scala-lang.org/overviews/core/futures.html

Beyond being completely non-blocking (by default, though we provide the ability to do managed blocking operations) and composable, Scala's 2.10 futures come with an implicit thread pool to execute your tasks on, as well as some utilities to manage time outs.

除了完全非阻塞(默认情况下,尽管我们提供了进行托管阻塞操作的能力)和可组合之外,Scala 的 2.10 期货还带有一个隐式线程池来执行您的任务,以及一些用于管理超时的实用程序。

import scala.concurrent.{future, blocking, Future, Await, ExecutionContext.Implicits.global}
import scala.concurrent.duration._

// Retrieve URLs from somewhere
val urls: List[String] = ...

// Download image (blocking operation)
val imagesFuts: List[Future[...]] = urls.map {
  url => future { blocking { download url } }
}

// Do something (display) when complete
val futImages: Future[List[...]] = Future.sequence(imagesFuts)
Await.result(futImages, 10 seconds).foreach(display)

Above, we first import a number of things:

上面,我们首先导入一些东西:

  • future: API for creating a future.
  • blocking: API for managed blocking.
  • Future: Future companion object which contains a number of useful methods for collectionsof futures.
  • Await: singleton object used for blocking on a future (transferring its result to the current thread).
  • ExecutionContext.Implicits.global: the default global thread pool, a ForkJoin pool.
  • duration._: utilities for managing durations for time outs.
  • future:用于创建未来的 API。
  • blocking:用于托管阻塞的 API。
  • Future:Future 伴生对象,其中包含许多用于期货集合的有用方法。
  • Await:用于阻塞未来的单例对象(将其结果传输到当前线程)。
  • ExecutionContext.Implicits.global:默认的全局线程池,一个 ForkJoin 池。
  • duration._:用于管理超时持续时间的实用程序。

imagesFutsremains largely the same as what you originally did- the only difference here is that we use managed blocking- blocking. It notifies the thread pool that the block of code you pass to it contains long-running or blocking operations. This allows the pool to temporarily spawn new workers to make sure that it never happens that all of the workers are blocked. This is done to prevent starvation (locking up the thread pool) in blocking applications. Note that the thread pool also knows when the code in a managed blocking block is complete- so it will remove the spare worker thread at that point, which means that the pool will shrink back down to its expected size.

imagesFuts与您最初所做的大致相同 - 唯一的区别是我们使用托管阻塞 - blocking。它通知线程池您传递给它的代码块包含长时间运行或阻塞的操作。这允许池临时产生新的工作人员,以确保不会发生所有工作人员都被阻塞的情况。这样做是为了防止阻塞应用程序中的饥饿(锁定线程池)。请注意,线程池还知道托管阻塞块中的代码何时完成 - 因此它将在此时删除空闲的工作线程,这意味着池将收缩回其预期大小。

(If you want to absolutely prevent additional threads from ever being created, then you ought to use an AsyncIO library, such as Java's NIO library.)

(如果您想绝对防止创建额外的线程,那么您应该使用 AsyncIO 库,例如 Java 的 NIO 库。)

Then we use the collection methods of the Future companion object to convert imagesFutsfrom List[Future[...]]to a Future[List[...]].

然后我们使用 Future 伴生对象的集合方法将imagesFutsfrom 转换List[Future[...]]为 a Future[List[...]]

The Awaitobject is how we can ensure that displayis executed on the calling thread-- Await.resultsimply forces the current thread to wait until the future that it is passed is completed. (This uses managed blocking internally.)

Await对象是我们如何能够确保display是在调用thread--执行Await.result简单地强制当前线程等待,直到将来,它传递完成。(这在内部使用托管阻塞。)

回答by som-snytt

val all = Future.traverse(urls){ url =>
  val f = future(download url) /*(downloadContext)*/
  f.onComplete(display)(displayContext)
  f
}
Await.result(all, ...)
  1. Use scala.concurrent.Future in 2.10, which is RC now.
  2. which uses an implicit ExecutionContext
  3. The new Future doc is explicit that onComplete (and foreach) may evaluate immediately if the value is available. The old actors Future does the same thing. Depending on what your requirement is for display, you can supply a suitable ExecutionContext (for instance, a single thread executor). If you just want the main thread to wait for loading to complete, traverse gives you a future to await on.
  1. 在 2.10 中使用 scala.concurrent.Future,现在是 RC。
  2. 它使用隐式的 ExecutionContext
  3. 新的 Future 文档明确指出 onComplete(和 foreach)可以在值可用时立即评估。老演员 Future 做同样的事情。根据您的显示要求,您可以提供合适的 ExecutionContext(例如,单线程执行器)。如果你只是想让主线程等待加载完成,traverse 会给你一个等待的未来。

回答by Alexey Romanov

  1. Yes, seems fine to me, but you may want to investigate more powerful twitter-utilor AkkaFuture APIs (Scala 2.10 will have a new Future library in this style).

  2. It uses a thread pool.

  3. No, it won't. You need to use the standard mechanism of your GUI toolkit for this (SwingUtilities.invokeLaterfor Swing or Display.asyncExecfor SWT). E.g.

    fimages.foreach (_.foreach(im => SwingUtilities.invokeLater(new Runnable { display im })))
    
  1. 是的,对我来说似乎很好,但您可能想研究更强大的twitter-utilAkkaFuture API(Scala 2.10 将有一个这种风格的新 Future 库)。

  2. 它使用线程池。

  3. 不,不会。为此,您需要使用 GUI 工具包的标准机制(SwingUtilities.invokeLater用于 Swing 或Display.asyncExecSWT)。例如

    fimages.foreach (_.foreach(im => SwingUtilities.invokeLater(new Runnable { display im })))