Note: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same license and attribute it to the original authors (not me). StackOverflow original: http://stackoverflow.com/questions/20663988/

Time: 2020-09-08 06:25:33  Source: igfitidea

What kind of "EventBus" to use in Spring? Built-in, Reactor, Akka?

Tags: multithreading, spring, akka, event-driven-design, project-reactor

Asked by Benjamin M

We're going to start a new Spring 4 application in a few weeks. And we'd like to use some event-driven architecture. This year I read here and there about "Reactor" and while looking for it on the web, I stumbled upon "Akka".

So for now we have 3 choices: Spring's built-in ApplicationEvent, Reactor, and Akka.

I couldn't find a real comparison of those.



For now we just need something like:

  • X registers to listen for Event E
  • Y registers to listen for Event E
  • Z sends an Event E

And then X and Y will receive and handle the event.
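
For illustration only, here is a minimal plain-Java sketch of the X/Y/Z scenario above. It does not use Spring, Reactor, or Akka. The names SimpleEventBus and EventE are made up for this example, listeners are matched on the exact event class, and delivery is synchronous on the publisher's thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class SimpleEventBusSketch {

    /** Hypothetical event class standing in for "Event E". */
    static final class EventE {
        final String payload;
        EventE(String payload) { this.payload = payload; }
    }

    /** Minimal synchronous bus: listeners are keyed by the exact event class. */
    static final class SimpleEventBus {
        private final Map<Class<?>, List<Consumer<Object>>> listeners = new ConcurrentHashMap<>();

        <T> void register(Class<T> type, Consumer<? super T> listener) {
            listeners.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>())
                     .add(event -> listener.accept(type.cast(event)));
        }

        void publish(Object event) {
            // Delivers on the caller's thread, in registration order.
            for (Consumer<Object> l : listeners.getOrDefault(event.getClass(), List.of())) {
                l.accept(event);
            }
        }
    }

    /** X and Y register for EventE, then Z publishes one; returns what was received. */
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        SimpleEventBus bus = new SimpleEventBus();
        bus.register(EventE.class, e -> received.add("X got " + e.payload));
        bus.register(EventE.class, e -> received.add("Y got " + e.payload));
        bus.publish(new EventE("hello"));   // Z sends an Event E
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

An asynchronous variant would hand each `l.accept(event)` call to an executor instead of running it inline.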

We will most likely use this in an async way, but there will certainly be some synchronous scenarios as well. And we will most likely always send a class as the event. (The Reactor samples mostly make use of Strings and String patterns, but it also supports Objects.)



As far as I understand, ApplicationEvent works synchronously by default and Reactor works the async way. Reactor also allows using the await() method to make it kind of synchronous. Akka provides more or less the same as Reactor, but also supports remoting.

Concerning Reactor's await() method: Can it wait for multiple threads to complete? Or maybe even a partial set of those threads? If we take the example from above:

  • X registers to listen for Event E
  • Y registers to listen for Event E
  • Z sends an Event E

Is it possible to make it synchronous by saying: wait for X and Y to complete? And is it possible to make it wait just for X, but not for Y?



Maybe there are also some alternatives? What about for example JMS?

Lots of questions, but hopefully you can provide some answers!

Thank you!



EDIT: Example use cases

  1. When a specific event gets fired, I'd like to create 10000 emails. Every email has to be generated with user-specific content. So I'd create a lot of threads (max = system CPU cores) which create the mails and do not block the caller thread, 'cause this can take some minutes.

  2. When a specific event gets fired, I'd like to collect information from an unknown number of services. Each fetch takes about 100ms. Here I could imagine using Reactor's await, 'cause I need that information to continue my work in the main thread.

  3. When a specific event gets fired, I'd like to perform some operations based on application configuration. So the application must be able to dynamically (un)register consumers/event handlers. They'll do their own stuff with the Event and I don't care. So I would create a thread for each of those handlers and just continue doing my work in the main thread.

  4. Simple decoupling: I basically know all receivers, but I just don't want to call every receiver in my code. This should mostly get done synchronously.

Sounds like I need a ThreadPool or a RingBuffer. Do those frameworks have dynamic RingBuffers, which grow in size if needed?
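
As a rough illustration of the ThreadPool idea for the email use case, here is a plain-JDK sketch with one worker per CPU core. It is not taken from any of the frameworks discussed; renderEmail is a hypothetical stand-in for the real template logic, and only the demo method waits for the result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class EmailPoolSketch {

    /** Hypothetical stand-in for rendering one user-specific email. */
    static String renderEmail(int userId) {
        return "Hello user " + userId;
    }

    /** Starts rendering on the pool and returns immediately with a future for all results. */
    static CompletableFuture<List<String>> renderAll(int count, ExecutorService pool) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            final int userId = i;
            futures.add(CompletableFuture.supplyAsync(() -> renderEmail(userId), pool));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    // Every future is complete here, so join() does not block.
                    List<String> emails = new ArrayList<>();
                    for (CompletableFuture<String> f : futures) emails.add(f.join());
                    return emails;
                });
    }

    /** Runs the sketch with one worker per CPU core and waits only at the very end. */
    static int demo(int count) {
        ExecutorService pool =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            return renderAll(count, pool).join().size();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(10_000) + " emails rendered");
    }
}
```

The caller of renderAll is never blocked; it can attach further callbacks to the returned future instead of calling join().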

Accepted answer by Jon Brisbin

I'm not sure I can adequately answer your question in this small space. But I'll give it a shot! :)

Spring's ApplicationEvent system and Reactor are really quite distinct as far as functionality goes. ApplicationEvent routing is based on the type handled by the ApplicationListener. For anything more complicated than that, you'll have to implement the logic yourself (that's not necessarily a bad thing, though). Reactor, however, provides a comprehensive routing layer that is also very lightweight and completely extensible. Any similarity in function between the two ends at their ability to subscribe to and publish events, which is really a feature of any event-driven system. Also don't forget the new spring-messaging module that came out with Spring 4. It's a subset of the tools available in Spring Integration and also provides abstractions for building around an event-driven architecture.

Reactor will help you solve a couple key problems that you would otherwise have to manage yourself:

Selector matching: Reactor does Selector matching, which encompasses a range of matches, from a simple .equals(Object other) call to a more complex URI templating match which allows for placeholder extraction. You can also extend the built-in selectors with your own custom logic so you can use rich objects as notification keys (like domain objects, for instance).
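
To make the idea of selector matching concrete, here is a small plain-Java sketch. It is not Reactor's API: Router, on, and dispatch are hypothetical names, and a selector is modelled simply as a Predicate over the notification key, with one equality-style selector and one prefix-style selector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Predicate;

class SelectorRoutingSketch {

    /** Pairs a selector (a predicate on the notification key) with a handler. */
    static final class Registration {
        final Predicate<Object> selector;
        final Consumer<Object> handler;
        Registration(Predicate<Object> selector, Consumer<Object> handler) {
            this.selector = selector;
            this.handler = handler;
        }
    }

    /** Hypothetical router: every registration whose selector matches the key gets the event. */
    static final class Router {
        private final List<Registration> registrations = new CopyOnWriteArrayList<>();

        void on(Predicate<Object> selector, Consumer<Object> handler) {
            registrations.add(new Registration(selector, handler));
        }

        void dispatch(Object key, Object event) {
            for (Registration r : registrations) {
                if (r.selector.test(key)) r.handler.accept(event);
            }
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Router router = new Router();
        // Equality-style selector.
        router.on(key -> "user.created".equals(key), e -> log.add("exact:" + e));
        // Custom selector logic: any String key starting with "user.".
        router.on(key -> key instanceof String && ((String) key).startsWith("user."),
                  e -> log.add("prefix:" + e));
        router.dispatch("user.created", "alice");
        router.dispatch("user.deleted", "bob");
        router.dispatch("order.created", "o-1");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the key is an Object, a domain object can serve as the key as long as the selector knows how to test it.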

Stream and Promise APIs: You already mentioned the Promise API with reference to the .await() method, which is really meant for existing code that expects blocking behavior. When writing new code using Reactor, it can't be stressed enough that you should use compositions and callbacks to utilize system resources effectively by not blocking threads. Blocking the caller is almost never a good idea in an architecture that depends on a small number of threads to execute a large volume of tasks. Futures are simply not cloud-scalable, which is why modern applications leverage alternative solutions.

Your application could be architected with either Streams or Promises, though honestly, I think you'll find the Stream more flexible. The key benefit is the composability of the API, which allows you to wire actions together in a dependency chain without blocking. As a completely off-the-cuff example based on the email use case you describe:

@Autowired
Environment env;
@Autowired
SmtpClient client;

// Using a ThreadPoolDispatcher
Deferred<DomainObject, Stream<DomainObject>> input = Streams.defer(env, THREAD_POOL);

input.compose()
  .map(new Function<DomainObject, EmailTemplate>() {
    public EmailTemplate apply(DomainObject in) {
      // generate the email
      return new EmailTemplate(in);
    }
  })
  .consume(new Consumer<EmailTemplate>() {
    public void accept(EmailTemplate email) {
      // send the email
      client.send(email);
    }
  });

// Publish input into Deferred
// (reader is not defined in this snippet; it stands for whatever source supplies DomainObjects)
DomainObject obj = reader.readNext();
if(null != obj) {
  input.accept(obj);
}

Reactor also provides the Boundary, which is basically a CountDownLatch for blocking on arbitrary consumers (so you don't have to construct a Promise if all you want to do is block until a Consumer completes). You could use a raw Reactor in that case and use the on() and notify() methods to trigger the service status checking.
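
Since the Boundary is described as basically a CountDownLatch, the blocking pattern can be sketched with the JDK class itself. This is only an analogy, not Reactor code: the two lambdas stand in for consumers X and Y, and the latch count covers just the consumer the caller wants to wait for (X).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class LatchBoundarySketch {

    /** Runs consumers X and Y on a pool, but blocks the caller only until X is done. */
    static boolean waitForXOnly() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch xDone = new CountDownLatch(1);   // count = number of consumers to wait for
        AtomicBoolean xHandled = new AtomicBoolean(false);
        try {
            pool.execute(() -> {                        // consumer X
                xHandled.set(true);
                xDone.countDown();
            });
            pool.execute(() -> {                        // consumer Y: nobody waits for it
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            boolean completed = xDone.await(5, TimeUnit.SECONDS);
            return completed && xHandled.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();                            // Y is still allowed to finish
        }
    }

    public static void main(String[] args) {
        System.out.println("X handled before caller continued: " + waitForXOnly());
    }
}
```

Waiting for both consumers would mean a latch with a count of 2 and a countDown() call in Y as well.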

For some things, however, it seems like what you want is a Future returned from an ExecutorService, no? Why not just keep things simple? Reactor will only be of real benefit in situations where your throughput performance and overhead efficiency are important. If you're blocking the calling thread, then you're likely going to be wiping away the efficiency gains that Reactor will give you anyway, so you might be better off in that case using a more traditional toolset.
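
A minimal sketch of that simpler route, assuming the "collect information from several services" use case: plain ExecutorService.invokeAll and Future.get. The fetch method is a hypothetical placeholder for a real service call, and the pool size of 4 is arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureFetchSketch {

    /** Hypothetical service call; a real one would take around 100 ms. */
    static String fetch(String serviceName) {
        return serviceName + ":ok";
    }

    /** Calls every service in parallel and blocks the caller until all answers are in. */
    static List<String> fetchAll(List<String> serviceNames) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Callable<String>> calls = new ArrayList<>();
            for (String name : serviceNames) calls.add(() -> fetch(name));
            List<String> results = new ArrayList<>();
            // invokeAll waits for all calls and returns futures in the same order as the input.
            for (Future<String> f : pool.invokeAll(calls)) results.add(f.get());
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(List.of("inventory", "pricing")));
    }
}
```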

The nice thing about the openness of Reactor is that there's nothing stopping the two from interacting. You can freely mix Futures with Consumers without any friction. In that case, just keep in mind that you're only ever going to be as fast as your slowest component.

Answer by Adam Gent

Let's ignore Spring's ApplicationEvent, as it really is not designed for what you're asking (it's more about bean lifecycle management).

What you need to figure out is whether you want to do it

  1. the object-oriented way (i.e. actors, dynamic consumers, registered on the fly) OR
  2. the service way (static consumers, registered on startup).

Using your example of X and Y, are they:

  1. ephemeral instances (1) or are they
  2. long-lived singletons/service objects (2)?

If you need to register consumers on the fly, then Akka is a good choice (I'm not sure about Reactor, as I have never used it). If you don't want to do your consuming in ephemeral objects, then you can use JMS or AMQP.

You also need to understand that these kinds of libraries are trying to solve two problems:

  1. Concurrency (i.e. doing things in parallel on the same machine)
  2. Distribution (i.e. doing things in parallel on multiple machines)

Reactor and Akka are mainly focused on #1. Akka just recently added cluster support and the actor abstraction makes it easier to do #2. Message Queues (JMS, AMQP) are focused on #2.

For my own work I take the service route and use a heavily modified Guava EventBus together with RabbitMQ. I use annotations similar to those of the Guava EventBus, but I also have annotations for the objects sent on the bus. However, you can just use Guava's EventBus in async mode as a proof of concept and then build your own like I did.

You might think that you need to have dynamic consumers (1), but most problems can be solved with a simple pub/sub. Also, managing dynamic consumers can be tricky (hence Akka is a good choice, because the actor model has all sorts of management for this).

Answer by Alexei Kaigorodov

Carefully define what you want from the framework. If a framework has more features than you need, it is not always good. More features means more bugs, more code to learn, and less performance.

Some features to consider are:

  • the nature of actors (threads or lightweight objects)
  • ability to work on a machine cluster (Akka)
  • persistent message queues (JMS)
  • specific features like signals (events without information), transitions (objects that combine messages from different ports into a complex event, see Petri nets), etc.

Be careful with synchronous features like await - it blocks the whole thread and is dangerous when actors are executed on a thread pool (thread starvation).

More frameworks to look at:

  • Fork-Join Pool: in some cases, allows await without thread starvation
  • Scientific workflow systems
  • Dataflow framework for Java: signals, transitions
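
As a small illustration of the Fork-Join Pool point, the sketch below runs a divide-and-conquer sum on a pool with a single worker thread. The join() calls act as the "await", and the computation still completes because a worker that joins a task it forked can run that task itself. SumTask and the threshold of 1000 are choices made for this example only.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ForkJoinAwaitSketch {

    /** Sums the integers in [from, to) by splitting the range and joining the halves. */
    static final class SumTask extends RecursiveTask<Long> {
        private final int from;
        private final int to;

        SumTask(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= 1_000) {
                long sum = 0;
                for (int i = from; i < to; i++) sum += i;
                return sum;
            }
            int mid = from + (to - from) / 2;
            SumTask left = new SumTask(from, mid);
            left.fork();                                 // schedule the left half
            long right = new SumTask(mid, to).compute(); // do the right half ourselves
            return left.join() + right;                  // "await" the left half
        }
    }

    static long sum(int n) {
        ForkJoinPool pool = new ForkJoinPool(1);         // a single worker, on purpose
        try {
            return pool.invoke(new SumTask(0, n));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sum(100_000));
    }
}
```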

ADD-ON: Two kinds of actors.

Generally, a parallel system can be represented as a graph in which active nodes send messages to each other. In Java, as in most other mainstream languages, active nodes (actors) can be implemented either as threads or as tasks (Runnable or Callable) executed by a thread pool. Normally, some of the actors are threads and some are tasks. Both approaches have their advantages and disadvantages, so it's vital to choose the most appropriate implementation for each actor in the system. Briefly, threads can block (and wait for events) but consume a lot of memory for their stacks. Tasks must not block, but they share the stacks of the threads in a pool.

If a task calls a blocking operation, it takes a pooled thread out of service. If many tasks block, they can take all threads out of service, causing a deadlock: the tasks that could unblock the blocked tasks cannot run. This kind of deadlock is called thread starvation. If, in an attempt to prevent thread starvation, we configure the thread pool as unlimited, we simply convert tasks into threads, losing the advantages of tasks.
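
The starvation scenario can be reproduced in a few lines with a single-thread pool. This is a deliberately tiny sketch: the outer task blocks on a result that only another task in the same pool can produce, and a 200 ms timeout (an arbitrary value chosen here) is used so that the demo reports the problem instead of hanging.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class StarvationSketch {

    /** Returns "starved" if the inner task could not run while the outer task was blocking. */
    static String demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> outer = pool.submit(() -> {
                Future<String> inner = pool.submit(() -> "inner done");
                try {
                    // Blocks the only pool thread, so the inner task has nowhere to run.
                    return inner.get(200, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    return "starved";
                }
            });
            return outer.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Without the timeout, the outer task would wait forever, which is the deadlock described above.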

To eliminate calls to blocking operations in tasks, the task should be split in two (or more): the first task starts the operation through its asynchronous interface and exits, and the rest is written as an asynchronous task that starts when the operation finishes. Of course, the blocking operation has to have an alternative asynchronous interface. So, for example, instead of reading a socket synchronously, the NIO or NIO2 libraries should be used.
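
One way to picture the split, using only the JDK: the work is written as two stages chained with CompletableFuture, so neither stage ever blocks a pool thread, and even a single-thread pool is enough. The stage contents are placeholders; only the demo's caller, which is outside the pool, waits for the final value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ContinuationSketch {

    /** Runs a two-stage computation on a single pool thread without blocking that thread. */
    static String demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> result =
                    // First part: start the operation and return.
                    CompletableFuture.supplyAsync(() -> "inner done", pool)
                            // Second part: scheduled only once the first part has finished.
                            .thenApplyAsync(value -> value + ", then outer done", pool);
            // Only this caller (not a pool thread) waits for the final value.
            return result.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```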

Unfortunately, the standard Java library lacks asynchronous counterparts for popular synchronization facilities like queues and semaphores. Fortunately, they are easy to implement from scratch (see Dataflow framework for Java for examples).
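
To give a feel for what such an asynchronous counterpart might look like, here is a from-scratch sketch of an unbounded queue whose take() returns a CompletableFuture instead of blocking. It is not taken from the Dataflow framework mentioned above; AsyncQueue and its methods are hypothetical names, and null items are not supported.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AsyncQueueSketch {

    /** Unbounded queue whose take() returns a future instead of blocking the caller. */
    static final class AsyncQueue<T> {
        private final Deque<T> items = new ArrayDeque<>();
        private final Deque<CompletableFuture<T>> waiters = new ArrayDeque<>();

        void put(T item) {
            CompletableFuture<T> waiter;
            synchronized (this) {
                waiter = waiters.poll();
                if (waiter == null) {
                    items.add(item);        // nobody is waiting: store the item
                    return;
                }
            }
            waiter.complete(item);          // complete outside the lock; callbacks run here
        }

        synchronized CompletableFuture<T> take() {
            T item = items.poll();
            if (item != null) return CompletableFuture.completedFuture(item);
            CompletableFuture<T> waiter = new CompletableFuture<>();
            waiters.add(waiter);            // completed later by put()
            return waiter;
        }
    }

    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        AsyncQueue<String> queue = new AsyncQueue<>();
        queue.take().thenAccept(v -> seen.add("early consumer got " + v)); // no item yet
        queue.put("a");
        queue.put("b");
        queue.take().thenAccept(v -> seen.add("late consumer got " + v));  // item already queued
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```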

So, doing computations purely with non-blocking tasks is possible but increases the size of the code. The obvious advice is to use threads where possible and tasks only for simple, massive computations.