scala 在 Akka Actors 中阻止调用
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/19944034/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Blocking calls in Akka Actors
提问by Aysu Dogma
As a newbie, I am trying to understand how actors work. And, from the documentation, I think I understand that actors are objects which gets executed in sync mode and also that actor execution can contain blocking/sync method calls, e.g. db requests
作为一个新手,我试图了解演员是如何工作的。而且,从文档中,我想我明白演员是在同步模式下执行的对象,而且演员执行可以包含阻塞/同步方法调用,例如数据库请求
But, what I don't understand is that if you write an actor that has some blocking invocations inside (like a blocking query execution), it will mess up the whole thread pool (in the sense that cpu utilization will go down, etc.), right ? I mean, from my understanding, there is no way for JVM to understand whether it can switch that thread to someone else, if/when the actor makes a blocking call.
但是,我不明白的是,如果你编写一个内部有一些阻塞调用的actor(比如阻塞查询执行),它会弄乱整个线程池(从某种意义上说,cpu 利用率会下降,等等。 ), 对 ?我的意思是,根据我的理解,JVM 无法理解它是否可以将该线程切换到其他人,如果/何时演员进行阻塞调用。
So, given the nature of concurrency, shouldn't it be obvious that Actors should not be doing any blocking calls, ever?
因此,考虑到并发的性质,Actor 不应该进行任何阻塞调用,这难道不是很明显吗?
If that is the case, what is the recommended way of doing a non-blocking/async call, let's say a web service call that fetches something and sends a message to another actor when that request is completed? Should we simply use something like within the actor:
如果是这种情况,那么进行非阻塞/异步调用的推荐方法是什么,假设 Web 服务调用在请求完成时获取某些内容并发送消息给另一个参与者?我们是否应该简单地在 actor 中使用类似的东西:
future map { response => x ! response.body }
未来地图 { 响应 => x !响应体 }
Is this the proper way of handling this?
这是处理这个问题的正确方法吗?
Would appreciate it if you can clarify this for me.
如果您能为我澄清这一点,我将不胜感激。
回答by drexin
It really depends on the use-case. If the queries do not need to be serialized, then you can execute the query in a future and send the results back to the sender as follows:
这真的取决于用例。如果查询不需要序列化,那么您可以在将来执行查询并将结果发送回发送方,如下所示:
import scala.concurrent.{ future, blocking}
import akka.pattern.pipe
val resFut = future {
blocking {
executeQuery()
}
}
resFut pipeTo sender
You could also create a dedicated dispatcher exclusively for the DB calls and use a router for actor creation. This way you can also easily limit the number of concurrent DB requests.
您还可以专门为数据库调用创建一个专用的调度程序,并使用路由器来创建角色。通过这种方式,您还可以轻松限制并发数据库请求的数量。
回答by ya_pulser
Really great intro "The Neophyte's Guide to Scala Part 14: The Actor Approach to Concurrency" http://danielwestheide.com/blog/2013/02/27/the-neophytes-guide-to-scala-part-14-the-actor-approach-to-concurrency.html.
非常棒的介绍“Scala 新手指南第 14 部分:Actor 并发方法” http://danielwestheide.com/blog/2013/02/27/the-neophytes-guide-to-scala-part-14-the- actor-approach-to-concurrency.html。
Actor receives message, wraps blocking code to future, in it's Future.onSuccess method - sends out results using other async messages. But beware that sender variable could change, so close it (make a local reference in the future object).
Actor 接收消息,将阻塞代码包装到未来,在它的 Future.onSuccess 方法中 - 使用其他异步消息发送结果。但是要注意 sender 变量可能会改变,所以关闭它(在未来的对象中做一个本地引用)。
p.s.: The Neophyte's Guide to Scala - really great book.
ps: Scala 新手指南 - 非常棒的书。
Updated: (added sample code)
更新:(添加示例代码)
We have worker and manager. Manager sets work to be done, worker reports "got it" and starts long process ( sleep 1000 ). Meanwhile system pings manager with messages "alive" and manager pings worker with them. When work done - worker notifies manager on it.
我们有工人和经理。经理设置要完成的工作,工人报告“得到它”并开始漫长的过程( sleep 1000 )。与此同时,系统用“活着”的消息向管理器发送消息,而管理器用它们向工作人员发送消息。工作完成后 - 工人通知经理。
NB: execution of sleep 1000 done in imported "default/global" thread pool executor - you can get thread starvation. NB: val commander = sender is needed to "close" a reference to original sender, cause when onSuccess will be executed - current sender within actor could be already set to some other 'sender' ...
注意:在导入的“默认/全局”线程池执行程序中执行 sleep 1000 - 您可能会遇到线程饥饿。注意:需要 valcommander = sender 来“关闭”对原始发件人的引用,因为当 onSuccess 将被执行时 - 演员中的当前发件人可能已经设置为其他“发件人”......
Log:
日志:
01:35:12:632 Humming ...
01:35:12:633 manager: flush sent
01:35:12:633 worker: got command
01:35:12:633 manager alive
01:35:12:633 manager alive
01:35:12:633 manager alive
01:35:12:660 worker: started
01:35:12:662 worker: alive
01:35:12:662 manager: resource allocated
01:35:12:662 worker: alive
01:35:12:662 worker: alive
01:35:13:661 worker: done
01:35:13:663 manager: work is done
01:35:17:633 Shutdown!
Code:
代码:
import akka.actor.{Props, ActorSystem, ActorRef, Actor}
import com.typesafe.config.ConfigFactory
import java.text.SimpleDateFormat
import java.util.Date
import scala.concurrent._
import ExecutionContext.Implicits.global
object Sample {
private val fmt = new SimpleDateFormat("HH:mm:ss:SSS")
def printWithTime(msg: String) = {
println(fmt.format(new Date()) + " " + msg)
}
class WorkerActor extends Actor {
protected def receive = {
case "now" =>
val commander = sender
printWithTime("worker: got command")
future {
printWithTime("worker: started")
Thread.sleep(1000)
printWithTime("worker: done")
}(ExecutionContext.Implicits.global) onSuccess {
// here commander = original sender who requested the start of the future
case _ => commander ! "done"
}
commander ! "working"
case "alive?" =>
printWithTime("worker: alive")
}
}
class ManagerActor(worker: ActorRef) extends Actor {
protected def receive = {
case "do" =>
worker ! "now"
printWithTime("manager: flush sent")
case "working" =>
printWithTime("manager: resource allocated")
case "done" =>
printWithTime("manager: work is done")
case "alive?" =>
printWithTime("manager alive")
worker ! "alive?"
}
}
def main(args: Array[String]) {
val config = ConfigFactory.parseString("" +
"akka.loglevel=DEBUG\n" +
"akka.debug.lifecycle=on\n" +
"akka.debug.receive=on\n" +
"akka.debug.event-stream=on\n" +
"akka.debug.unhandled=on\n" +
""
)
val system = ActorSystem("mine", config)
val actor1 = system.actorOf(Props[WorkerActor], "worker")
val actor2 = system.actorOf(Props(new ManagerActor(actor1)), "manager")
actor2 ! "do"
actor2 ! "alive?"
actor2 ! "alive?"
actor2 ! "alive?"
printWithTime("Humming ...")
Thread.sleep(5000)
printWithTime("Shutdown!")
system.shutdown()
}
}
回答by theon
You are right to be thinking about the Thread Pool if you are considering doing blocking calls in Akka. The more blocking you do, the larger the Thread Pool you will need. A completely Non-Blocking system only really needs a pool of threads equal to the number of CPU cores of your machine. The reference configuration uses a pool of 3 times the number of CPU cores on the machine to allow for some blocking:
如果您正在考虑在 Akka 中进行阻塞调用,那么考虑线程池是正确的。您执行的阻塞越多,您需要的线程池就越大。一个完全非阻塞的系统只需要一个线程池,它的数量等于你机器的 CPU 内核数。参考配置使用 3 倍于机器上 CPU 内核数量的池来允许一些阻塞:
# The core pool size factor is used to determine thread pool core size
# using the following formula: ceil(available processors * factor).
# Resulting size is then bounded by the core-pool-size-min and
# core-pool-size-max values.
core-pool-size-factor = 3.0
But you might want to increase akka.default-dispatcher.fork-join-executor.core-pool-size-factorto a higher number if you do more blocking, or make a dispatcher other than the default specifically for blocking calls with a higher fork-join-executor.core-pool-size-factor
但是,akka.default-dispatcher.fork-join-executor.core-pool-size-factor如果您执行更多阻塞操作,您可能希望增加到更高的数字,或者使用默认值以外的调度程序专门用于阻塞更高级别的呼叫fork-join-executor.core-pool-size-factor
WRT what is the best way to do blocking calls in Akka. I would recommend scaling out by making multiple instances of the actors that do blocking calls and putting a routerinfront of them to make them look like a single actor to the rest of your application.
WRT 在 Akka 中阻止调用的最佳方法是什么。我建议通过创建多个执行阻塞调用的actor 实例并在它们前面放置一个路由器来进行扩展,使它们看起来像应用程序其余部分的单个actor。

