Scala 超时的未来

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/16615750/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-22 05:17:57  来源:igfitidea点击:

Future with Timeout in Scala

scalaconcurrencyfuture

提问by Michael

Suppose I have a function, which invokes a blocking interruptibleoperation. I would like to run it asynchronously with a timeout. That is, I would like to interrupt the function when the timeout is expired.

假设我有一个函数,它调用一个阻塞的可中断操作。我想在超时的情况下异步运行它。也就是说,我想在超时到期时中断该功能。

So I am trying to do something like that:

所以我正在尝试做这样的事情:

import scala.util.Try
import scala.concurrent.Future

def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = {

  val aref = new java.util.concurrent.atomic.AtomicReference[Thread]() 

  import ExecutionContext.Implicits.global
  Future {Thread.sleep(timeout); aref.get().interrupt} // 1
  Future {aref.set(Thread.currentThread); Try(f())}    // 2
}

The problem is that arefin (1) can be null because (2) has not set it to the current thread yet. In this case I would like to wait until arefis set. What is the best way to do that ?

问题是aref在 (1) 中可以为空,因为 (2) 尚未将其设置为当前线程。在这种情况下,我想等到aref设置。最好的方法是什么?

采纳答案by Rex Kerr

If you add a CountDownLatchyou can achieve the behavior you want. (Note that blocking (i.e. getting stuck at await) in lots and lots of Futures may lead to starvation of thread pools.)

如果你添加一个CountDownLatch你可以实现你想要的行为。(请注意,await大量Futures中的阻塞(即卡在)可能会导致线程池饥饿。)

import scala.util.Try
import scala.concurrent.Future

def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = {

  val aref = new java.util.concurrent.atomic.AtomicReference[Thread]()
  val cdl = new java.util.concurrent.CountDownLatch(1)

  import ExecutionContext.Implicits.global
  Future {Thread.sleep(timeout); cdl.await(); aref.get().interrupt}   // 1
  Future {aref.set(Thread.currentThread); cdl.countDown(); Try(f())}  // 2
}

回答by flavian

You can go for a slightly easier approach using Await. The Await.resultmethod takes timeout duration as a second parameter and throws a TimeoutExceptionon timeout.

您可以使用Await寻求更简单的方法。该Await.result方法将超时持续时间作为第二个参数,并TimeoutException在超时时抛出一个。

try {
  import scala.concurrent.duration._
  Await.result(aref, 10 seconds);
} catch {
    case e: TimeoutException => // whatever you want to do.
}

回答by anshumans

I needed the same behavior as well, so this is how I solved it. I basically created an object that creates a timer and fails the promise with a TimeoutException if the future hasn't completed in the specified duration.

我也需要相同的行为,所以这就是我解决它的方法。我基本上创建了一个创建计时器的对象,如果未来没有在指定的持续时间内完成,则使用 TimeoutException 使承诺失败。

package mypackage

import scala.concurrent.{Promise, Future}
import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorSystem
import scala.concurrent.ExecutionContext.Implicits.global

object TimeoutFuture {

  val actorSystem = ActorSystem("myActorSystem")
  def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {
    val promise = Promise[A]()
    actorSystem.scheduler.scheduleOnce(timeout) {
      promise tryFailure new java.util.concurrent.TimeoutException
    }

    Future {
      try {
        promise success block
      }
      catch {
        case e:Throwable => promise failure e
      } 
    }

    promise.future
  }
}

回答by drexin

Although you already got some answers on how to achieve it with blocking the additional thread to handle the timeout, I would suggest you to try a different way, for the reason Rex Kerr already gave. I don't exactly know, what you are doing in f(), but if it is I/O bound, I would suggest you to just use an asynchronous I/O library instead. If it is some kind of loop, you could pass the timeout value directly into that function and throw a TimeoutExceptionthere, if it exceeds the timeout. Example:

尽管您已经得到了一些关于如何通过阻塞附加线程来处理超时来实现它的答案,但我建议您尝试不同的方法,原因 Rex Kerr 已经给出了。我不完全知道你在做什么f(),但如果它是 I/O 绑定的,我建议你只使用异步 I/O 库。如果它是某种循环,您可以将超时值直接传递给该函数并在TimeoutException那里抛出一个,如果它超过超时。例子:

import scala.concurrent.duration._
import java.util.concurrent.TimeoutException

def doSth(timeout: Deadline) = {
  for {
    i <- 0 to 10
  } yield {
    Thread.sleep(1000)
    if (timeout.isOverdue)
      throw new TimeoutException("Operation timed out.")

    i
  }
}

scala> future { doSth(12.seconds.fromNow) }
res3: scala.concurrent.Future[scala.collection.immutable.IndexedSeq[Int]] = 
  scala.concurrent.impl.Promise$DefaultPromise@3d104456

scala> Await.result(res3, Duration.Inf)
res6: scala.collection.immutable.IndexedSeq[Int] =
  Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> future { doSth(2.seconds.fromNow) }
res7: scala.concurrent.Future[scala.collection.immutable.IndexedSeq[Int]] = 
  scala.concurrent.impl.Promise$DefaultPromise@f7dd680

scala> Await.result(res7, Duration.Inf)
java.util.concurrent.TimeoutException: Operation timed out.
    at $anonfun$doSth.apply$mcII$sp(<console>:17)
    at $anonfun$doSth.apply(<console>:13)
    at $anonfun$doSth.apply(<console>:13)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
    ...

scala> res7.value
res10: Option[scala.util.Try[scala.collection.immutable.IndexedSeq[Int]]] =
  Some(Failure(java.util.concurrent.TimeoutException: Operation timed out.))

This will only use only 1 thread, that will be terminated after timeout + execution time of a single step.

这将只使用 1 个线程,它将在超时 + 单步执行时间后终止。

回答by cmbaxter

You could also try using a CountDownLatchlike this:

你也可以尝试使用CountDownLatch这样的:

def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = {

  val aref = new java.util.concurrent.atomic.AtomicReference[Thread]() 

  import ExecutionContext.Implicits.global
  val latch = new CountDownLatch(1)
  Future {
    latch.await()
    aref.get().interrupt
  }

  Future {
    aref.set(Thread.currentThread) 
    latch.countDown()
    Try(f())
  }
}

Now I'm waiting forever with my call to latch.await(), but you could certainly change that to:

现在我一直在等待我的电话latch.await(),但您当然可以将其更改为:

latch.await(1, TimeUnit.SECONDS)

and then wrap it with a Tryto handle if when/if it times out.

然后用 a 包裹它Try以处理是否超时。