scala 如何在scala中顺序执行期货

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/20414500/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-22 05:55:32  来源:igfitidea点击:

how to do sequential execution of Futures in scala

scala

提问by user776635

I have this scenario where I need to use an iterator, for each of the item a function f(item) is called and returns a Future[Unit].

我有一个需要使用迭代器的场景,对于每个项目,调用函数 f(item) 并返回一个Future[Unit].

However, I need to make it that each f(item)call is executed sequentially, they can not run in parallel.

但是,我需要确保每个f(item)调用都是按顺序执行的,它们不能并行运行。

for(item <- it)
  f(item)

won't work becuase this starts all the calls in parallel.

不会工作,因为这会并行启动所有调用。

How do I do it so they follow in sequence?

我该怎么做才能让他们按顺序进行?

回答by Glen Best

If you don't mind a very localised var, you can serialise the asynchronous processing (each f(item)) as follows (flatMapdoes the serialization):

如果您不介意非常本地化var,则可以f(item)按如下方式序列化异步处理(每个)(flatMap是否进行序列化):

val fSerialized = {
  var fAccum = Future{()}
  for(item <- it) {
    println(s"Processing ${item}")
    fAccum = fAccum flatMap { _ => f(item) }
  }
  fAccum
}

fSerialized.onComplete{case resTry => println("All Done.")}

In general, avoid Awaitoperations - they block (kind of defeats the point of async, consumes resources and for sloppy designs, can deadlock)

一般来说,避免Await操作 - 它们会阻塞(有点破坏异步点,消耗资源,对于草率的设计,可能会死锁)



Cool Trick 1:

酷招1:

You can chain together Futuresvia that usual suspect, flatmap- it serializes asynchronous operations. Is there anything it can't do? ;-)

您可以Futures通过通常的嫌疑人链接在一起flatmap- 它序列化异步操作。有什么不能做的吗?;-)

def f1 = Future { // some background running logic here...}
def f2 = Future { // other background running logic here...}

val fSerialized: Future[Unit] = f1 flatMap(res1 => f2)  

fSerialized.onComplete{case resTry => println("Both Done: Success=" + resTry.isSuccess)}

None of the above blocks - the main thread runs straight through in a few dozen nanoseconds. Futures are used in all cases to execute parallel threads and keep track of asynchronous state/results and to chain logic.

以上都没有块 - 主线程在几十纳秒内直接运行。在所有情况下都使用期货来执行并行线程并跟踪异步状态/结果以及链接逻辑。

fSerializedrepresents a composite of two different asynchronous operations chained together. As soon as the val is evaluated, it immediately starts f1(running asynchonously). f1runs like any Future- when it eventually finishes, it calls it's onCompletecallback block. Here's the cool bit - flatMapinstalls it's argument as the f1onComplete callback block - so f2is initiated as soon as f1completes, with no blocking, polling or wasteful resource usage. When f2is complete, then fSerializedis complete - so it runs the fSerialized.onCompletecallback block - printing "Both Done".

fSerialized表示链接在一起的两个不同异步操作的组合。评估 val 后,它会立即启动f1(异步运行)。 f1像任何一样运行Future- 当它最终完成时,它调用它的onComplete回调块。这是很酷的一点 -flatMap将它的参数安装为f1onComplete 回调块 - 因此f2f1完成后立即启动,没有阻塞、轮询或浪费资源使用。当f2完成时,则fSerialized完成 - 所以它运行fSerialized.onComplete回调块 - 打印“Both Done”。

Not only that, but you can chain flatmaps as much as you like with neat non-spaghetti code

不仅如此,您还可以使用整洁的非意大利面代码将平面图链接到任意数量

 f1 flatmap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) ...

If you were to do that via Future.onComplete, you would have to embed the successive operations as nested onComplete layers:

如果要通过 Future.onComplete 执行此操作,则必须将连续操作嵌入为嵌套的 onComplete 层:

f1.onComplete{case res1Try => 
  f2
  f2.onComplete{case res2Try =>
    f3
    f3.onComplete{case res3Try =>
      f4
      f4.onComplete{ ...
      }
    }
  }
}

Not as nice.

没那么好。

Test to prove:

测试证明:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._

def f(item: Int): Future[Unit] = Future{
  print("Waiting " + item + " seconds ...")
  Console.flush
  blocking{Thread.sleep((item seconds).toMillis)}
  println("Done")
}

val fSerial = f(4) flatMap(res1 => f(16)) flatMap(res2 => f(2)) flatMap(res3 => f(8))

fSerial.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}

Cool Trick 2:

酷招2:

for-comprehensions like this:

像这样的理解:

for {a <- aExpr; b <- bExpr; c <- cExpr; d <- dExpr} yield eExpr

are nothing but syntactic-sugar for this:

只不过是语法糖:

aExpr.flatMap{a => bExpr.flatMap{b => cExpr.flatMap{c => dExpr.map{d => eExpr} } } }

that's a chain of flatMaps, followed by a final map.

这是一个 flatMap 链,然后是最终的地图。

That means that

这意味着

f1 flatmap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) map(res4 => "Did It!")

is identical to

等同于

for {res1 <- f1; res2 <- f2; res3 <- f3; res4 <- f4} yield "Did It!"

Test to Prove (following on from previous test):

测试证明(继之前的测试之后):

val fSerial = for {res1 <- f(4); res2 <- f(16); res3 <- f(2); res4 <- f(8)} yield "Did It!"
fSerial.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}

Not-So-Cool Trick 3:

不那么酷的技巧 3:

Unfortunately you can't mix iterators & futures in the same for-comprehension. Compile error:

不幸的是,您不能在同一个 for-comprehension 中混合使用迭代器和期货。编译错误:

val fSerial = {for {nextItem <- itemIterable; nextRes <- f(nextItem)} yield "Did It"}.last

And nesting fors creates a challenge. The following doesn't serialize, but runs async blocks in parallel (nested comprehensions don't chain subsequent Futures with flatMap/Map, but instead chains as Iterable.flatMap{item => f(item)} - not the same!)

嵌套fors 带来了挑战。以下不序列化,但并行运行异步块(嵌套理解不会将后续 Future 与 flatMap/Map 链接起来,而是将其链接为 Iterable.flatMap{item => f(item)} - 不一样!)

val fSerial = {for {nextItem <- itemIterable} yield
                 for {nextRes <- f(nextItem)} yield "Did It"}.last

Also using foldLeft/foldRight plus flatMap doesn't work as you'd expect - seems a bug/limitation; all async blocks are processed in parallel (so Iterator.foldLeft/Rightis not sociable with Future.flatMap):

同样使用 foldLeft/foldRight 和 flatMap 也不能像你期望的那样工作 - 似乎是一个错误/限制;所有异步块都是并行处理的(因此Iterator.foldLeft/Right无法与 交流Future.flatMap):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._

def f(item: Int): Future[Unit] = Future{
  print("Waiting " + item + " seconds ...")
  Console.flush
  blocking{Thread.sleep((item seconds).toMillis)}
  println("Done")
}

val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)
val empty = Future[Unit]{()}
def serialize(f1: Future[Unit], f2: Future[Unit]) = f1 flatMap(res1 => f2)

//val fSerialized = itemIterable.iterator.foldLeft(empty){(fAccum, item) => serialize(fAccum, f(item))}
val fSerialized = itemIterable.iterator.foldRight(empty){(item, fAccum) => serialize(fAccum, f(item))}

fSerialized.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}

But this works (var involved):

但这有效(涉及var):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._

def f(item: Int): Future[Unit] = Future{
  print("Waiting " + item + " seconds ...")
  Console.flush
  blocking{Thread.sleep((item seconds).toMillis)}
  println("Done")
}

val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)

var fSerial = Future{()}
for {nextItem <- itemIterable} fSerial = fSerial.flatMap(accumRes => f(nextItem)) 

回答by wingedsubmariner

def seqFutures[T, U](items: TraversableOnce[T])(yourfunction: T => Future[U]): Future[List[U]] = {
  items.foldLeft(Future.successful[List[U]](Nil)) {
    (f, item) => f.flatMap {
      x => yourfunction(item).map(_ :: x)
    }
  } map (_.reverse)
}

If you are running sequentially because resource constraints prevent running more than one Futureat a time, it may be easier to create and use a custom ExecutionContextwith only a single thread.

如果由于资源限制导致一次运行多个,而按顺序运行Future,则创建和使用ExecutionContext仅具有单个线程的自定义可能会更容易。

回答by Somatik

An other option is using Akka Streams:

另一种选择是使用 Akka Streams:

val doneFuture = Source
  .fromIterator(() => it)
  .mapAsync(parallelism = 1)(f)
  .runForeach{identity}

回答by borice

Just expanding on @wingedsubmariner's answer since the .reverseat the end was bugging me (and added import statements for completeness)

只是扩展了@wingedsubmariner 的答案,因为.reverse最后让我烦恼(并添加了导入语句以确保完整性)

import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}

def seqFutures[T, U](xs: TraversableOnce[T])(f: T => Future[U])
                    (implicit ec: ExecutionContext): Future[List[U]] = {
  val resBase = Future.successful(mutable.ListBuffer.empty[U])
  xs
    .foldLeft(resBase) { (futureRes, x) =>
      futureRes.flatMap {
        res => f(x).map(res += _)
      }
    }
    .map(_.toList)
}

Note:ListBufferhas constant time +=and .toListoperations

注意:ListBuffer具有恒定的时间+=.toList操作

回答by LuisKarlos

This code shows you how to run futures in sequence using a simple promiseto accomplish it.

这段代码向您展示了如何使用一个简单的承诺来按顺序运行期货来完成它。

The code contains two sequencers one executes the work one by one, the other allow you to specify how many to run at the same time.

代码包含两个排序器,一个一个接一个执行工作,另一个允许您指定同时运行多少个。

Exceptions are not manage to keep it simple.

例外无法保持简单。

import scala.concurrent.{Await, Future, Promise}
import scala.util.Try
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

/**
  * Simple class to encapsulate work, the important element here is the future
  * you can ignore the rest
  */
case class Work(id:String, workTime:Long = 100) {
  def doWork(): Future[String] = Future {
    println(s"Starting $id")
    Thread.sleep(workTime)
    println(s"End $id")
    s"$id ready"
  }
}

/**
  * SimpleSequencer is the one by one execution, the promise is the element
  * who allow to the sequencer to work, pay attention to it.
  *
  * Exceptions are ignore, this is not production code
  */
object SimpleSequencer {
  private def sequence(works:Seq[Work], results:Seq[String], p:Promise[Seq[String]]) : Unit = {
    works match {
      case Nil => p.tryComplete(Try(results))
      case work::tail => work.doWork() map {
        result => sequence(tail, results :+ result, p)
      }
    }
  }

  def sequence(works:Seq[Work]) : Future[Seq[String]] = {
    val p = Promise[Seq[String]]()
    sequence(works, Seq.empty, p)
    p.future
  }
}

/**
  * MultiSequencer fire N works at the same time
  */
object MultiSequencer {
  private def sequence(parallel:Int, works:Seq[Work], results:Seq[String], p:Promise[Seq[String]]) : Unit = {
    works match {
      case Nil => p.tryComplete(Try(results))
      case work =>
        val parallelWorks: Seq[Future[String]] = works.take(parallel).map(_.doWork())
        Future.sequence(parallelWorks) map {
          result => sequence(parallel, works.drop(parallel), results ++ result, p)
        }
    }
  }

  def sequence(parallel:Int, works:Seq[Work]) : Future[Seq[String]] = {
    val p = Promise[Seq[String]]()
    sequence(parallel, works, Seq.empty, p)
    p.future
  }

}


object Sequencer {

  def main(args: Array[String]): Unit = {
    val works = Seq.range(1, 10).map(id => Work(s"w$id"))
    val p = Promise[Unit]()

    val f = MultiSequencer.sequence(4, works) map {
      resultFromMulti =>
        println(s"MultiSequencer Results: $resultFromMulti")
        SimpleSequencer.sequence(works) map {
          resultsFromSimple =>
            println(s"MultiSequencer Results: $resultsFromSimple")
            p.complete(Try[Unit]())
        }
    }

    Await.ready(p.future, Duration.Inf)
  }
}

回答by Ori Cohen

Perhaps a more elegant solution would be to use recursion like detailed below.

也许更优雅的解决方案是使用如下详述的递归。

This can be used as an example for a long operation returning a Future:

这可以用作返回 Future 的长操作的示例:

def longOperation(strToReturn: String): Future[String] = Future {
  Thread.sleep(5000)
  strToReturn
}

The following is the recursive function that traverses through the items to processed, and processes them in sequence:

下面是递归函数,遍历要处理的项,依次处理:

def processItems(strToReturn: Seq[String]): Unit = strToReturn match {
  case x :: xs => longOperation(x).onComplete {
    case Success(str) =>
      println("Got: " + str)
      processItems(xs)
    case Failure(_) =>
      println("Something went wrong")
      processItems(xs)
  }
  case Nil => println("Done")
}

This is done by having the function recursively calling itself with the remaining items to process once the Future has either completed or failed.

这是通过让函数递归调用自身来完成的,一旦 Future 完成或失败,剩余的项目就会被处理。

To start this activity you call the 'processItems' function with a few items to process, like so:

要开始此活动,您可以调用“processItems”函数并处理一些要处理的项目,如下所示:

processItems(Seq("item1", "item2", "item3"))

回答by Stefan Kunze

you can use Await.result : (code untested)

你可以使用 Await.result :(代码未经测试)

"Await: singleton object used for blocking on a future (transferring its result to the current thread)."

“等待:用于阻塞未来的单例对象(将其结果传输到当前线程)。”

val result  = item map {it => Await.result(f(it), Duration.Inf) }