Note: this page reproduces a popular Stack Overflow question and its answers under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same license and attribute it to the original authors (not me). Original Stack Overflow question: http://stackoverflow.com/questions/16304471/

Time: 2020-10-22 05:14:58  Source: igfitidea

Scala Futures - built in timeout?

scala concurrency

Asked by LaloInDublin

There is an aspect of futures that I do not fully understand from the official tutorial, ref. http://docs.scala-lang.org/overviews/core/futures.html

Do futures in Scala have a built-in timeout mechanism of some kind? Let's say the example below was a 5 gigabyte text file... does the implied scope of "Implicits.global" eventually cause onFailure to fire in a non-blocking way, or can that be defined? And without a default timeout of some kind, wouldn't that imply it's possible that neither success nor failure would ever fire?

import scala.concurrent._
import ExecutionContext.Implicits.global

val firstOccurence: Future[Int] = future {
  val source = scala.io.Source.fromFile("myText.txt")
  source.toSeq.indexOfSlice("myKeyword")
}
firstOccurence onSuccess {
  case idx => println("The keyword first appears at position: " + idx)
}
firstOccurence onFailure {
  case t => println("Could not process file: " + t.getMessage)
}

Answered by cmbaxter

You only get timeout behavior when you use blocking to get the results of the Future. If you want to use the non-blocking callbacks onComplete, onSuccess or onFailure, then you would have to roll your own timeout handling. Akka has built-in timeout handling for request/response (?) messaging between actors, but I am not sure if you want to start using Akka. FWIW, in Akka, for timeout handling, they compose two Futures together via Future.firstCompletedOf, one which represents the actual async task and one that represents the timeout. If the timeout timer (via a HashedWheelTimer) pops first, you get a failure on the async callback.

A very simplified example of rolling your own might go something like this. First, an object for scheduling timeouts:

import org.jboss.netty.util.{HashedWheelTimer, TimerTask, Timeout}
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.concurrent.Promise
import java.util.concurrent.TimeoutException

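object TimeoutScheduler {
  // A single shared timer with a 10 ms tick drives all timeouts
  val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS)

  // Fails the promise with a TimeoutException once `after` has elapsed;
  // the returned Timeout handle can be used to cancel the pending task
  def scheduleTimeout(promise: Promise[_], after: Duration): Timeout = {
    timer.newTimeout(new TimerTask {
      def run(timeout: Timeout): Unit = {
        // tryFailure does nothing if the promise has already been completed
        promise.tryFailure(new TimeoutException("Operation timed out after " + after.toMillis + " millis"))
      }
    }, after.toNanos, TimeUnit.NANOSECONDS)
  }
}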

Then a function to take a Future and add timeout behavior to it:

import scala.concurrent.{Future, ExecutionContext, Promise}
import scala.concurrent.duration.Duration

def withTimeout[T](fut:Future[T])(implicit ec:ExecutionContext, after:Duration) = {
  val prom = Promise[T]()
  val timeout = TimeoutScheduler.scheduleTimeout(prom, after)
  val combinedFut = Future.firstCompletedOf(List(fut, prom.future))
  fut onComplete{case result => timeout.cancel()}
  combinedFut
}

Note that the HashedWheelTimer I am using here is from Netty.
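For readers who would rather not pull in Netty, the same composition (one future for the work, one promise failed by a timer, joined with Future.firstCompletedOf) can be sketched with the JDK's ScheduledExecutorService swapped in for the HashedWheelTimer. This is only a sketch under that assumption, not the code from the answer: the names JdkTimeout and JdkTimeoutDemo are hypothetical, and the timeout is passed explicitly rather than implicitly.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

// Hypothetical helper: the JDK scheduler stands in for Netty's HashedWheelTimer.
object JdkTimeout {
  // One daemon thread schedules all timeouts, so it never keeps the JVM alive.
  private val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "timeout-scheduler")
      t.setDaemon(true)
      t
    }
  })

  def withTimeout[T](fut: Future[T], after: FiniteDuration)(implicit ec: ExecutionContext): Future[T] = {
    val timeoutPromise = Promise[T]()
    // Fail the promise once `after` has elapsed.
    val task = scheduler.schedule(new Runnable {
      def run(): Unit = {
        timeoutPromise.tryFailure(new TimeoutException(s"Operation timed out after ${after.toMillis} ms"))
        ()
      }
    }, after.toMillis, TimeUnit.MILLISECONDS)
    // Drop the pending timer task as soon as the real future finishes.
    fut.onComplete(_ => task.cancel(false))
    Future.firstCompletedOf(List(fut, timeoutPromise.future))
  }
}

// Hypothetical usage: a 500 ms task guarded by a 100 ms timeout.
object JdkTimeoutDemo {
  import scala.concurrent.ExecutionContext.Implicits.global

  def slowTaskTimesOut(): Boolean = {
    val slow = Future { Thread.sleep(500); "done" }
    val guarded = JdkTimeout.withTimeout(slow, 100.millis)
    Await.ready(guarded, 2.seconds).value.get.isFailure
  }

  def fastTaskSucceeds(): Int =
    Await.result(JdkTimeout.withTimeout(Future(1), 1.second), 2.seconds)
}
```

As with the Netty version, the timeout only completes the combined future early; the original task keeps running on its thread.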

Answered by Pablo Fernandez

I've just created a TimeoutFuture class for a coworker:

TimeoutFuture

package model

import scala.concurrent._
import scala.concurrent.duration._
import play.libs.Akka
import play.api.libs.concurrent.Execution.Implicits._

object TimeoutFuture {
  def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {

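    val prom = Promise[A]()

    // timeout logic: fail the promise unless it has already been completed
    Akka.system.scheduler.scheduleOnce(timeout) {
      prom tryFailure new java.util.concurrent.TimeoutException
    }

    // business logic: Try captures an exception thrown by the block, and
    // tryComplete does nothing if the timeout has already fired
    Future {
      prom tryComplete scala.util.Try(block)
    }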

    prom.future
  } 
}

Usage

val future = TimeoutFuture(10.seconds) {
  // do stuff here
}

future onComplete {
  case Success(stuff) => // use "stuff"
  case Failure(exception) => // catch exception (either TimeoutException or an exception inside the given block)
}

Notes:

  • Assumes Play! framework (but it's easy enough to adapt)
  • Every piece of code runs in the same ExecutionContext, which may not be ideal.

Answered by justinhj

All of these answers require additional dependencies. I decided to write a version using java.util.Timer which is an efficient way to run a function in the future, in this case to trigger a timeout.

Blog post with more details here

Using this with Scala's Promise, we can make a Future with timeout as follows:

package justinhj.concurrency

import java.util.concurrent.TimeoutException
import java.util.{Timer, TimerTask}

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.language.postfixOps

object FutureUtil {

  // All Future's that use futureWithTimeout will use the same Timer object
  // it is thread safe and scales to thousands of active timers
  // The true parameter ensures that timeout timers are daemon threads and do not stop
  // the program from shutting down

  val timer: Timer = new Timer(true)

  /**
    * Returns the result of the provided future within the given time or a timeout exception, whichever is first
    * This uses Java Timer which runs a single thread to handle all futureWithTimeouts and does not block like a
    * Thread.sleep would
    * @param future Caller passes a future to execute
    * @param timeout Time before we return a Timeout exception instead of future's outcome
    * @return Future[T]
    */
  def futureWithTimeout[T](future : Future[T], timeout : FiniteDuration)(implicit ec: ExecutionContext): Future[T] = {

    // Promise will be fulfilled with either the callers Future or the timer task if it times out
    val p = Promise[T]

    // and a Timer task to handle timing out

    val timerTask = new TimerTask() {
      def run(): Unit = {
        p.tryFailure(new TimeoutException())
      }
    }

    // Set the timeout to check in the future
    timer.schedule(timerTask, timeout.toMillis)

    future.map {
      a =>
        if(p.trySuccess(a)) {
          timerTask.cancel()
        }
    }
    .recover {
      case e: Exception =>
        if(p.tryFailure(e)) {
          timerTask.cancel()
        }
    }

    p.future
  }

}

Answered by Kir

The Play framework contains Promise.timeout, so you can write code like the following:

private def get(): Future[Option[Boolean]] = {
  val timeoutFuture = Promise.timeout(None, Duration("1s"))
  val mayBeHaveData = Future{
    // do something
    Some(true)
  }

  // if timeout occurred then None will be result of method
  Future.firstCompletedOf(List(mayBeHaveData, timeoutFuture))
}

Answered by Raul

I'm quite surprised this is not standard in Scala. My version is short and has no dependencies:

import scala.concurrent.Future

sealed class TimeoutException extends RuntimeException

object FutureTimeout {

  import scala.concurrent.ExecutionContext.Implicits.global

  implicit class FutureTimeoutLike[T](f: Future[T]) {
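    def withTimeout(ms: Long): Future[T] = Future.firstCompletedOf(List(f, Future {
      // note: the sleep occupies a pool thread until the timeout elapses
      Thread.sleep(ms)
      throw new TimeoutException
    }))

    // default 2s timeout; declared as a method because a val cannot share
    // its name with withTimeout(ms)
    def withTimeout: Future[T] = withTimeout(2000)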
  }

}

Usage example

import FutureTimeout._
Future { /* do smth */ }.withTimeout

Answered by akauppi

Nobody's mentioned akka-streams yet. The flows have an easy completionTimeout method, and applying that to a single-source stream works like a Future.

But akka-streams also does cancellation, so it can actually stop the source from running, i.e. it signals the timeout to the source.
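To make the contrast concrete, here is a small standard-library sketch (no akka-streams involved) suggesting that a plain Future is not cancelled when a timeout on it expires: the waiting side gives up, but the task still runs to completion. The object name NoCancellationDemo and the sleep durations are made up for illustration.

```scala
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical demo object, for illustration only.
object NoCancellationDemo {
  // Returns (the wait timed out, the task finished anyway).
  def run(): (Boolean, Boolean) = {
    val finished = new AtomicBoolean(false)
    val slow = Future { Thread.sleep(300); finished.set(true); 42 }

    // Give up waiting after 50 ms; this does not interrupt the task.
    val timedOut =
      try { Await.result(slow, 50.millis); false }
      catch { case _: TimeoutException => true }

    // The task is still running in the background and completes later.
    Await.ready(slow, 2.seconds)
    (timedOut, finished.get)
  }
}
```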

Answered by galbarm

If you want the writer (promise holder) to be the one who controls the timeout logic, use akka.pattern.after, in the following way:

val timeout = akka.pattern.after(10 seconds, system.scheduler)(Future.failed(new TimeoutException(s"timed out during...")))
Future.firstCompletedOf(Seq(promiseRef.future, timeout))

This way, if your promise completion logic never takes place, your caller's future will still be completed at some point with a failure.

Answered by gzm0

You can specify the timeout when you wait on the future:

For scala.concurrent.Future, the result method lets you specify a timeout.

For scala.actors.Future, Futures.awaitAll lets you specify a timeout.

I do not think there is a timeout built into the execution of a Future.
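As a rough illustration of the blocking route for scala.concurrent.Future, the sketch below waits with Await.result and turns an expired wait into None. The helper name resultWithin is hypothetical; Await.result itself is part of the standard library and throws a TimeoutException when the duration elapses before the future completes.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical helper, for illustration only.
object AwaitWithTimeout {
  // Blocks the calling thread for at most `limit`.
  // Some(result) if the future finished in time, None if the wait timed out.
  // A failure of the future itself is rethrown by Await.result.
  def resultWithin[T](fut: Future[T], limit: FiniteDuration): Option[T] =
    try Some(Await.result(fut, limit))
    catch { case _: TimeoutException => None }
}
```

For example, `AwaitWithTimeout.resultWithin(Future { Thread.sleep(500); 1 }, 50.millis)` gives up after roughly 50 ms, while the underlying future keeps running.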

Answered by u2130573

Monix Task has timeout support:

import monix.execution.Scheduler.Implicits.global
import monix.eval._
import scala.concurrent.duration._
import scala.concurrent.TimeoutException

val source = Task("Hello!").delayExecution(10.seconds)

// Triggers error if the source does not complete in 3 seconds after runOnComplete
val timedOut = source.timeout(3.seconds)

timedOut.runOnComplete(r => println(r))
//=> Failure(TimeoutException)

Answered by unveloper

This version works without using any external timer (just Await.result)

import scala.concurrent._
import scala.concurrent.duration.FiniteDuration

object TimeoutFuture {
    def apply[A](
        timeout: FiniteDuration
    )(block: => A)(implicit executor: ExecutionContext): Future[A] =
        Future { Await.result(Future { block }, timeout) }.recoverWith {
            // Await.result throws inside the outer Future, so the timeout is handled on the Future itself
            case _: TimeoutException => Future.failed(new TimeoutException(s"Timed out after ${timeout.toString}"))
        }
}