scala 如何等待多个Future?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/16256279/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-22 05:14:17  来源:igfitidea点击:

How to wait for several Futures?

scalaconcurrencyfuture

提问by Michael

Suppose I have several futures and need to wait until eitherany of them fails orall of them succeed.

假设我有几个期货和需要等到或者其中的任何失败或者它们全部成功。

For example: Let there are 3 futures: f1, f2, f3.

例如:假设有 3 个期货:f1, f2, f3

  • If f1succeeds and f2fails I do not wait for f3(and return failureto the client).

  • If f2fails while f1and f3are still running I do not wait for them (and return failure)

  • If f1succeeds and then f2succeeds I continue waiting for f3.

  • 如果f1成功和f2失败,我不会等待f3(并将失败返回给客户端)。

  • 如果f2失败f1并且f3仍在运行,我不会等待它们(并返回失败

  • 如果f1成功然后f2成功我继续等待f3

How would you implement it?

你将如何实施它?

采纳答案by cmbaxter

You could use a for-comprehension as follows instead:

您可以使用 for-comprehension,如下所示:

val fut1 = Future{...}
val fut2 = Future{...}
val fut3 = Future{...}

val aggFut = for{
  f1Result <- fut1
  f2Result <- fut2
  f3Result <- fut3
} yield (f1Result, f2Result, f3Result)

In this example, futures 1, 2 and 3 are kicked off in parallel. Then, in the for comprehension, we wait until the results 1 and then 2 and then 3 are available. If either 1 or 2 fails, we will not wait for 3 anymore. If all 3 succeed, then the aggFutval will hold a tuple with 3 slots, corresponding to the results of the 3 futures.

在此示例中,期货 1、2 和 3 并行启动。然后,在for comprehension中,我们等到结果1,然后2,然后3可用。如果 1 或 2 失败,我们将不再等待 3。如果所有 3 个都成功,则aggFutval 将保存一个具有 3 个插槽的元组,对应于 3 个期货的结果。

Now if you need the behavior where you want to stop waiting if say fut2 fails first, things get a little trickier. In the above example, you would have to wait for fut1 to complete before realizing fut2 failed. To solve that, you could try something like this:

现在,如果您需要在 say fut2 首先失败时停止等待的行为,事情会变得有点棘手。在上面的示例中,您必须等待 fut1 完成才能意识到 fut2 失败。为了解决这个问题,你可以尝试这样的事情:

  val fut1 = Future{Thread.sleep(3000);1}
  val fut2 = Promise.failed(new RuntimeException("boo")).future
  val fut3 = Future{Thread.sleep(1000);3}

  def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = {
    val fut = if (futures.size == 1) futures.head._2
    else Future.firstCompletedOf(futures.values)

    fut onComplete{
      case Success(value) if (futures.size == 1)=> 
        prom.success(value :: values)

      case Success(value) =>
        processFutures(futures - value, value :: values, prom)

      case Failure(ex) => prom.failure(ex)
    }
    prom.future
  }

  val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]())
  aggFut onComplete{
    case value => println(value)
  }

Now this works correctly, but the issue comes from knowing which Futureto remove from the Mapwhen one has been successfully completed. As long as you have some way to properly correlate a result with the Future that spawned that result, then something like this works. It just recursively keeps removing completed Futures from the Map and then calling Future.firstCompletedOfon the remaining Futuresuntil there are none left, collecting the results along the way. It's not pretty, but if you really need the behavior you are talking about, then this, or something similar could work.

现在这可以正常工作,但问题来自于知道FutureMap成功完成时删除哪个。只要您有某种方法可以将结果与产生该结果的 Future 正确关联,那么这样的事情就可以工作。它只是递归地不断从 Map 中删除已完成的 Futures,然后调用Future.firstCompletedOf剩余的FuturesFutures直到没有剩下的,沿途收集结果。这并不漂亮,但如果你真的需要你正在谈论的行为,那么这个或类似的东西可以工作。

回答by gourlaysama

You can use a promise, and send to it either the first failure, or the final completed aggregated success:

您可以使用承诺,并将第一次失败或最终完成的聚合成功发送给它:

def sequenceOrBailOut[A, M[_] <: TraversableOnce[_]](in: M[Future[A]] with TraversableOnce[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
  val p = Promise[M[A]]()

  // the first Future to fail completes the promise
  in.foreach(_.onFailure{case i => p.tryFailure(i)})

  // if the whole sequence succeeds (i.e. no failures)
  // then the promise is completed with the aggregated success
  Future.sequence(in).foreach(p trySuccess _)

  p.future
}

Then you can Awaiton that resulting Futureif you want to block, or just mapit into something else.

然后你可以Await在结果上Future进行阻止,或者只是将map其转换为其他内容。

The difference with for comprehension is that here you get the error of the first to fail, whereas with for comprehension you get the first error in traversal order of the input collection (even if another one failed first). For example:

与 for comprehension 的区别在于,在这里您会得到第一个失败的错误,而对于 comprehension,您会得到输入集合遍历顺序中的第一个错误(即使另一个先失败)。例如:

val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
val f2 = Future { 5 }
val f3 = Future { None.get }

Future.sequence(List(f1,f2,f3)).onFailure{case i => println(i)}
// this waits one second, then prints "java.lang.ArithmeticException: / by zero"
// the first to fail in traversal order

And:

和:

val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
val f2 = Future { 5 }
val f3 = Future { None.get }

sequenceOrBailOut(List(f1,f2,f3)).onFailure{case i => println(i)}
// this immediately prints "java.util.NoSuchElementException: None.get"
// the 'actual' first to fail (usually...)
// and it returns early (it does not wait 1 sec)

回答by FranklinChen

Here is a solution without using actors.

这是一个不使用演员的解决方案。

import scala.util._
import scala.concurrent._
import java.util.concurrent.atomic.AtomicInteger

// Nondeterministic.
// If any failure, return it immediately, else return the final success.
def allSucceed[T](fs: Future[T]*): Future[T] = {
  val remaining = new AtomicInteger(fs.length)

  val p = promise[T]

  fs foreach {
    _ onComplete {
      case s @ Success(_) => {
        if (remaining.decrementAndGet() == 0) {
          // Arbitrarily return the final success
          p tryComplete s
        }
      }
      case f @ Failure(_) => {
        p tryComplete f
      }
    }
  }

  p.future
}

回答by Rex Kerr

You can do this with futures alone. Here's one implementation. Note that it won't terminate execution early! In that case you need to do something more sophisticated (and probably implement the interruption yourself). But if you just don't want to keep waiting for something that isn't going to work, the key is to keep waiting for the first thing to finish, and stop when either nothing is left or you hit an exception:

你可以单独使用期货来做到这一点。这是一个实现。请注意,它不会提前终止执行!在这种情况下,您需要做一些更复杂的事情(并且可能自己实现中断)。但是,如果您只是不想继续等待一些不起作用的东西,那么关键是继续等待第一件事完成,并在没有任何东西或遇到异常时停止:

import scala.annotation.tailrec
import scala.util.{Try, Success, Failure}
import scala.concurrent._
import scala.concurrent.duration.Duration
import ExecutionContext.Implicits.global

@tailrec def awaitSuccess[A](fs: Seq[Future[A]], done: Seq[A] = Seq()): 
Either[Throwable, Seq[A]] = {
  val first = Future.firstCompletedOf(fs)
  Await.ready(first, Duration.Inf).value match {
    case None => awaitSuccess(fs, done)  // Shouldn't happen!
    case Some(Failure(e)) => Left(e)
    case Some(Success(_)) =>
      val (complete, running) = fs.partition(_.isCompleted)
      val answers = complete.flatMap(_.value)
      answers.find(_.isFailure) match {
        case Some(Failure(e)) => Left(e)
        case _ =>
          if (running.length > 0) awaitSuccess(running, answers.map(_.get) ++: done)
          else Right( answers.map(_.get) ++: done )
      }
  }
}

Here's an example of it in action when everything works okay:

这是当一切正常时它的一个例子:

scala> awaitSuccess(Seq(Future{ println("Hi!") }, 
  Future{ Thread.sleep(1000); println("Fancy meeting you here!") },
  Future{ Thread.sleep(2000); println("Bye!") }
))
Hi!
Fancy meeting you here!
Bye!
res1: Either[Throwable,Seq[Unit]] = Right(List((), (), ()))

But when something goes wrong:

但是当出现问题时:

scala> awaitSuccess(Seq(Future{ println("Hi!") }, 
  Future{ Thread.sleep(1000); throw new Exception("boo"); () }, 
  Future{ Thread.sleep(2000); println("Bye!") }
))
Hi!
res2: Either[Throwable,Seq[Unit]] = Left(java.lang.Exception: boo)

scala> Bye!

回答by Robin Green

For this purpose I would use an Akka actor. Unlike the for-comprehension, it fails as soon as any of the futures fail, so it's a bit more efficient in that sense.

为此,我将使用 Akka 演员。与 for-comprehension 不同,它会在任何期货失败后立即失败,因此从这个意义上说,它的效率更高一些。

class ResultCombiner(futs: Future[_]*) extends Actor {

  var origSender: ActorRef = null
  var futsRemaining: Set[Future[_]] = futs.toSet

  override def receive = {
    case () =>
      origSender = sender
      for(f <- futs)
        f.onComplete(result => self ! if(result.isSuccess) f else false)
    case false =>
      origSender ! SomethingFailed
    case f: Future[_] =>
      futsRemaining -= f
      if(futsRemaining.isEmpty) origSender ! EverythingSucceeded
  }

}

sealed trait Result
case object SomethingFailed extends Result
case object EverythingSucceeded extends Result

Then, create the actor, send a message to it (so that it will know where to send its reply to) and wait for a reply.

然后,创建actor,向它发送一条消息(以便它知道将其回复发送到哪里)并等待回复。

val actor = actorSystem.actorOf(Props(new ResultCombiner(f1, f2, f3)))
try {
  val f4: Future[Result] = actor ? ()
  implicit val timeout = new Timeout(30 seconds) // or whatever
  Await.result(f4, timeout.duration).asInstanceOf[Result] match {
    case SomethingFailed => println("Oh noes!")
    case EverythingSucceeded => println("It all worked!")
  }
} finally {
  // Avoid memory leaks: destroy the actor
  actor ! PoisonPill
}

回答by lancegatlin

This question has been answered but I am posting my value class solution (value classes were added in 2.10) since there isn't one here. Please feel free to criticize.

这个问题已经得到解答,但我正在发布我的价值类解决方案(价值类是在 2.10 中添加的),因为这里没有。请随意批评。

  implicit class Sugar_PimpMyFuture[T](val self: Future[T]) extends AnyVal {
    def concurrently = ConcurrentFuture(self)
  }
  case class ConcurrentFuture[A](future: Future[A]) extends AnyVal {
    def map[B](f: Future[A] => Future[B]) : ConcurrentFuture[B] = ConcurrentFuture(f(future))
    def flatMap[B](f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = concurrentFutureFlatMap(this, f) // work around no nested class in value class
  }
  def concurrentFutureFlatMap[A,B](outer: ConcurrentFuture[A], f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = {
    val p = Promise[B]()
    val inner = f(outer.future)
    inner.future onFailure { case t => p.tryFailure(t) }
    outer.future onFailure { case t => p.tryFailure(t) }
    inner.future onSuccess { case b => p.trySuccess(b) }
    ConcurrentFuture(p.future)
  }

ConcurrentFuture is a no overhead Future wrapper that changes the default Future map/flatMap from do-this-then-that to combine-all-and-fail-if-any-fail. Usage:

ConcurrentFuture 是一个没有开销的 Future 包装器,它将默认的 Future map/flatMap 从 do-this-then-that 更改为 combine-all-and-fail-if-any-fail。用法:

def func1 : Future[Int] = Future { println("f1!");throw new RuntimeException; 1 }
def func2 : Future[String] = Future { Thread.sleep(2000);println("f2!");"f2" }
def func3 : Future[Double] = Future { Thread.sleep(2000);println("f3!");42.0 }

val f : Future[(Int,String,Double)] = {
  for {
    f1 <- func1.concurrently
    f2 <- func2.concurrently
    f3 <- func3.concurrently
  } yield for {
   v1 <- f1
   v2 <- f2
   v3 <- f3
  } yield (v1,v2,v3)
}.future
f.onFailure { case t => println("future failed $t") }

In the example above, f1,f2 and f3 will run concurrently and if any fail in any order the future of the tuple will fail immediately.

在上面的例子中,f1、f2 和 f3 将同时运行,如果任何顺序失败,元组的未来将立即失败。

回答by JBakouny

You might want to checkout Twitter's Future API. Notably the Future.collect method. It does exactly what you want: https://twitter.github.io/scala_school/finagle.html

您可能想查看 Twitter 的 Future API。特别是 Future.collect 方法。它完全符合您的要求:https: //twitter.github.io/scala_school/finagle.html

The source code Future.scala is available here: https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala

源代码 Future.scala 可在此处获得:https: //github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala

回答by igreenfield

You can use this:

你可以使用这个:

val l = List(1, 6, 8)

val f = l.map{
  i => future {
    println("future " +i)
    Thread.sleep(i* 1000)
    if (i == 12)
      throw new Exception("6 is not legal.")
    i
  }
}

val f1 = Future.sequence(f)

f1 onSuccess{
  case l => {
    logInfo("onSuccess")
    l.foreach(i => {

      logInfo("h : " + i)

    })
  }
}

f1 onFailure{
  case l => {
    logInfo("onFailure")
  }