Scala 等待期货序列

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/29344430/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-22 07:01:25  来源:igfitidea点击:

Scala waiting for sequence of futures

scalafuture

提问by matanster

I was hoping code like follows would wait for both futures, but it does not.

我希望像下面这样的代码会等待两个期货,但事实并非如此。

object Fiddle {
  val f1 = Future {
    throw new Throwable("baaa") // emulating a future that bumped into an exception
  }

  val f2 = Future {
    Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
    2
  }

  val lf = List(f1, f2) // in the general case, this would be a dynamically sized list

  val seq = Future.sequence(lf) 

  seq.onComplete {
    _ => lf.foreach(f => println(f.isCompleted))
  }
}

val a = FuturesSequence

I assumed seq.onCompletewould wait for them all to complete before completing itself, but not so; it results in:

我认为seq.onComplete在完成之前会等待它们全部完成,但事实并非如此;结果是:

true
false

.sequencewas a bit hard to follow in the source of scala.concurrent.Future, I wonder how I would implement a parallel that waits for all original futures of a (dynamically sized) sequence, or what might be the problem here.

.sequence在 scala.concurrent.Future 的源代码中有点难以理解,我想知道我将如何实现一个等待(动态大小的)序列的所有原始期货的并行,或者这里可能存在什么问题。

Edit:A related question: https://worldbuilding.stackexchange.com/questions/12348/how-do-you-prove-youre-from-the-future:)

编辑:一个相关问题:https: //worldbuilding.stackexchange.com/questions/12348/how-do-you-prove-youre-from-the-future:)

回答by Travis Brown

One common approach to waiting for all results (failed or not) is to "lift" failures into a new representation inside the future, so that all futures complete with some result (although they may complete with a result that represents failure). One natural way to get that is lifting to a Try.

等待所有结果(失败与否)的一种常见方法是将失败“提升”到未来内部的新表示中,以便所有期货以某个结果完成(尽管它们可能以表示失败的结果完成)。获得它的一种自然方法是提升到Try.

Twitter's implementation of futuresprovides a liftToTrymethod that makes this trivial, but you can do something similar with the standard library's implementation:

Twitter 的 futures 实现提供了liftToTry一种使这变得微不足道的方法,但您可以对标准库的实现做类似的事情:

import scala.util.{ Failure, Success, Try }

val lifted: List[Future[Try[Int]]] = List(f1, f2).map(
  _.map(Success(_)).recover { case t => Failure(t) }
)

Now Future.sequence(lifted)will be completed when every future is completed, and will represent successes and failures using Try.

现在Future.sequence(lifted)将在每个未来完成时完成,并使用Try.

And so, a generic solution for waiting on all original futures of a sequence of futures may look as follows, assuming an execution context is of course implicitly available.

因此,假设执行上下文当然是隐式可用的,用于等待期货序列的所有原始期货的通用解决方案可能如下所示。

  import scala.util.{ Failure, Success, Try }

  private def lift[T](futures: Seq[Future[T]]) = 
    futures.map(_.map { Success(_) }.recover { case t => Failure(t) })

  def waitAll[T](futures: Seq[Future[T]]) =
    Future.sequence(lift(futures)) // having neutralized exception completions through the lifting, .sequence can now be used

  waitAll(SeqOfFutures).map { 
    // do whatever with the completed futures
  }

回答by Ionu? G. Stan

A Futureproduced by Future.sequencecompletes when either:

在以下任一情况下Future生成的AFuture.sequence完成:

  • all the futures have completed successfully, or
  • one of the futures has failed
  • 所有期货均已成功完成,或
  • 其中一个期货失败了

The second point is what's happening in your case, and it makes sense to complete as soon as one of the wrapped Futurehas failed, because the wrapping Futurecan only hold a single Throwablein the failure case. There's no point in waiting for the other futures because the result will be the same failure.

第二点是您的案例中发生的情况,一旦包装Future中的一个失败就完成是有意义的,因为在失败的情况下包装Future只能容纳一个Throwable。等待其他期货是没有意义的,因为结果将是同样的失败。

回答by Jason Smith

This is an example that supports the previous answer. There is an easy way to do this using just the standard Scala APIs.

这是一个支持先前答案的示例。有一种简单的方法可以使用标准的 Scala API 来做到这一点。

In the example, I am creating 3 futures. These will complete at 5, 7, and 9 seconds respectively. The call to Await.resultwill block until all futures have resolved. Once all 3 futures have completed, awill be set to List(5,7,9)and execution will continue.

在示例中,我创建了 3 个期货。这些将分别在 5、7 和 9 秒完成。调用Await.result将阻塞,直到所有期货都已解决。一旦所有 3 个期货都完成,a将设置为List(5,7,9)并且执行将继续。

Additionally, if an exception is thrown in any of the futures, Await.resultwill immediately unblock and throw the exception. Uncomment the Exception(...)line to see this in action.

此外,如果在任何期货中Await.result抛出异常,将立即解除阻塞并抛出异常。取消注释该Exception(...)行以查看此操作。

  try {
    val a = Await.result(Future.sequence(Seq(
      Future({
        blocking {
          Thread.sleep(5000)
        }
        System.err.println("A")
        5
      }),
      Future({
        blocking {
          Thread.sleep(7000)
        }
        System.err.println("B")
        7
        //throw new Exception("Ha!")
      }),
      Future({
        blocking {
          Thread.sleep(9000)
        }
        System.err.println("C")
        9
      }))),
      Duration("100 sec"))

    System.err.println(a)
  } catch {
    case e: Exception ?
      e.printStackTrace()
  }

回答by Bruno

We can enrich Seq[Future[T]]with its own onCompletemethod through an implicit class:

我们可以通过一个隐式类来丰富Seq[Future[T]]它自己的onComplete方法:

  def lift[T](f: Future[T])(implicit ec: ExecutionContext): Future[Try[T]] =
    f map { Success(_) } recover { case e => Failure(e) }

  def lift[T](fs: Seq[Future[T]])(implicit ec: ExecutionContext): Seq[Future[Try[T]]] =
    fs map { lift(_) }

  implicit class RichSeqFuture[+T](val fs: Seq[Future[T]]) extends AnyVal {
    def onComplete[U](f: Seq[Try[T]] => U)(implicit ec: ExecutionContext) = {
      Future.sequence(lift(fs)) onComplete {
        case Success(s) => f(s)
        case Failure(e) => throw e // will never happen, because of the Try lifting
      }
    }
  }

Then, in your particular MWE, you can do:

然后,在您的特定 MWE 中,您可以执行以下操作:

  val f1 = Future {
    throw new Throwable("baaa") // emulating a future that bumped into an exception
  }

  val f2 = Future {
    Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
    2
  }

  val lf = List(f1, f2)

  lf onComplete { _ map {
    case Success(v) => ???
    case Failure(e) => ???
  }}

This solution has the advantage of allowing you to call an onCompleteon a sequence of futures as you would on a single future.

此解决方案的优点是允许您像调用onComplete单个期货一样对一系列期货进行调用。

回答by Nikunj Kakadiya

    Even though it is quite old question But this is how I got it running in recent time.

    object Fiddle {
      val f1 = Future {
        throw new Throwable("baaa") // emulating a future that bumped into an exception
      }

      val f2 = Future {
        Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
        2
      }

      val lf = List(f1, f2) // in the general case, this would be a dynamically sized list

      val seq = Future.sequence(lf) 
      import scala.concurrent.duration._
      Await.result(seq, Duration.Inf) 
    }

This won't get completed and will wait till all the future gets completed. You can change the waiting time as per your use case. I have kept it to infinite and that was required in my case.

这不会完成,将等到所有未来完成。您可以根据您的用例更改等待时间。我将它保持为无限,这在我的情况下是必需的。

回答by Narcoleptic Snowman

Create the Future with a Try to avoid extra hoops.

创造未来,尽量避免额外的箍。

implicit val ec = ExecutionContext.global

val f1 = Future {
  Try {
    throw new Throwable("kaboom")
  }
}

val f2 = Future {
  Try {
    Thread.sleep(1000L)
    2
  }
}

Await.result(
  Future.sequence(Seq(f1, f2)), Duration("2 sec")
) foreach {
  case Success(res) => println(s"Success. $res")
  case Failure(e)   => println(s"Failure. ${e.getMessage}")
}