如何使用 java 和 spring 3.0 同时处理来自 JMS 主题(不是队列)的多条消息?
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/3088814/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How can I handle multiple messages concurrently from a JMS topic (not queue) with java and spring 3.0?
提问by Edgar Styles
Note that I'd like multiple message listeners to handle successive messages from the topic concurrently. In addition I'd like each message listener to operate transactionally so that a processing failure in a given message listener would result in that listener's message remaining on the topic.
请注意,我希望多个消息侦听器同时处理来自主题的连续消息。此外,我希望每个消息侦听器都以事务方式进行操作,以便给定消息侦听器中的处理失败将导致该侦听器的消息保留在该主题上。
The spring DefaultMessageListenerContainer seems to support concurrency for JMS queues only.
spring DefaultMessageListenerContainer 似乎只支持 JMS 队列的并发。
Do I need to instantiate multiple DefaultMessageListenerContainers?
我需要实例化多个 DefaultMessageListenerContainers 吗?
If time flows down the vertical axis:
如果时间沿纵轴流动:
ListenerA reads msg 1 ListenerB reads msg 2 ListenerC reads msg 3
ListenerA reads msg 4 ListenerB reads msg 5 ListenerC reads msg 6
ListenerA reads msg 7 ListenerB reads msg 8 ListenerC reads msg 9
ListenerA reads msg 10 ListenerB reads msg 11 ListenerC reads msg 12
...
UPDATE:
Thanks for your feedback @T.Rob and @skaffman.
更新:
感谢@T.Rob 和@skaffman 的反馈。
What I ended up doing is creating multiple DefaultMessageListenerContainers
with concurrency=1
and then putting logic in the message listener so that only one thread would process a given message id.
我最终做的是创建多个DefaultMessageListenerContainers
withconcurrency=1
然后将逻辑放入消息侦听器中,以便只有一个线程可以处理给定的消息 ID。
回答by skaffman
You don't want multiple DefaultMessageListenerContainer
instances, no, but you do need to configure the DefaultMessageListenerContainer
to be concurrent, using the concurrentConsumers
property:
您不想要多个DefaultMessageListenerContainer
实例,不,但您确实需要DefaultMessageListenerContainer
使用concurrentConsumers
属性将 配置为并发:
Specify the number of concurrent consumers to create. Default is 1.
Specifying a higher value for this setting will increase the standard level of scheduled concurrent consumers at runtime: This is effectively the minimum number of concurrent consumers which will be scheduled at any given time. This is a static setting; for dynamic scaling, consider specifying the "maxConcurrentConsumers" setting instead.
Raising the number of concurrent consumers is recommendable in order to scale the consumption of messages coming in from a queue. However, note that any ordering guarantees are lost once multiple consumers are registered. In general, stick with 1 consumer for low-volume queues.
指定要创建的并发消费者的数量。默认值为 1。
为此设置指定更高的值将增加运行时调度并发使用者的标准级别:这实际上是将在任何给定时间调度的并发使用者的最小数量。这是一个静态设置;对于动态缩放,请考虑指定“maxConcurrentConsumers”设置。
建议增加并发消费者的数量,以扩展来自队列的消息的消耗。但是,请注意,一旦注册了多个消费者,任何排序保证都会丢失。一般来说,对于低容量队列,坚持使用 1 个消费者。
However, there's big warning at the bottom:
但是,底部有一个很大的警告:
Do not raise the number of concurrent consumers for a topic. This would lead to concurrent consumption of the same message, which is hardly ever desirable.
不要增加一个主题的并发消费者数量。这将导致同一消息的并发消费,这几乎是不可取的。
This is interesting, and makes sense when you think about it. The same would occur if you had multiple DefaultMessageListenerContainer
instances.
这很有趣,当你考虑它时,它是有道理的。如果您有多个DefaultMessageListenerContainer
实例,也会发生同样的情况。
I think perhaps you need to rethink your design, although I'm not sure what I'd suggest. Concurrent consumption of pub/sub messages seems like a perfectly reasonable thing to do, but how to avoid getting the same message delivered to all of your consumers at the same time?
我想也许你需要重新考虑你的设计,虽然我不确定我会建议什么。发布/订阅消息的并发消费似乎是一件非常合理的事情,但如何避免将相同的消息同时发送给所有消费者?
回答by T.Rob
This is one of those occasions where the differences in transport providers bubble up through the abstraction of JMS. JMS wants to provide a copy of the message for each subscriber on a topic. But the behavior that you want is really that of a queue. I suspect that there are other requirements driving this to a pub/sub solution which were not described - for example other things need to subscribe to the same topic independent of your app.
这是传输提供程序的差异通过 JMS 的抽象冒泡的场合之一。JMS 想要为主题的每个订阅者提供消息的副本。但是您想要的行为实际上是队列的行为。我怀疑还有其他要求将其驱动到未描述的发布/订阅解决方案 - 例如,其他事情需要订阅独立于您的应用程序的同一主题。
If I were to do this in WebSphere MQ the solution would be to create an administrative subscription which would result in a single copy of each message on the given topic to be placed onto a queue. Then your multiple subscribers could compete for messages on that queue. This way your app could have multiple threads among which the messages are distributed, and at the same time other subscribers independent of this application could dynamically (un)subscribe to the same topic.
如果我要在 WebSphere MQ 中执行此操作,解决方案将是创建一个管理订阅,这将导致将给定主题上的每条消息的单个副本放置到队列中。然后您的多个订阅者可能会竞争该队列上的消息。这样,您的应用程序可以有多个线程,消息在其中分发,同时独立于该应用程序的其他订阅者可以动态(取消)订阅同一主题。
Unfortunately, there's no generic JMS-portable way of doing this. You are dependent on the transport provider's implementation to a great degree. The only one of these I can speak to is WebSphere MQ but I'm sure other transports support this in one way or another and to varying degrees if you are creative.
不幸的是,没有通用的 JMS 可移植方式来执行此操作。您在很大程度上依赖于传输提供商的实现。我可以谈论的唯一一个是 WebSphere MQ,但我相信其他传输会以一种或另一种方式支持这一点,并且如果您有创造力,则可以在不同程度上支持这一点。
回答by cobbzilla
I've run into the same problem. I'm currently investigating RabbitMQ, which seems to offer a perfect solution in a design pattern they call "work queues." More info here: http://www.rabbitmq.com/tutorials/tutorial-two-java.html
我遇到了同样的问题。我目前正在研究 RabbitMQ,它似乎在他们称为“工作队列”的设计模式中提供了完美的解决方案。更多信息在这里:http: //www.rabbitmq.com/tutorials/tutorial-two-java.html
If you're not totally tied to JMS you might look into this. There might also be a JMS to AMQP bridge, but that might start to look hacky.
如果您不完全依赖于 JMS,您可能会研究这个。可能还有一个 JMS 到 AMQP 的桥接器,但这可能开始看起来很麻烦。
I'm having some fun (read: difficulties) getting RabbitMQ installed and running on my Mac but think I'm close to having it working, I will post back if I'm able to solve this.
我在我的 Mac 上安装和运行 RabbitMQ 很有趣(阅读:困难),但我认为我已经接近让它工作了,如果我能够解决这个问题,我会回帖。
回答by Krishna Sankar
Came across this question. My configuration is :
遇到了这个问题。我的配置是:
Create a bean with id="DefaultListenerContainer"
, add property name="concurrentConsumers" value="10"
and property name="maxConcurrentConsumers" value ="50"
.
创建一个 bean id="DefaultListenerContainer"
,添加属性name="concurrentConsumers" value="10"
和属性name="maxConcurrentConsumers" value ="50"
。
Works fine, so far. I printed the thread id and verified that multiple threads do get created and also reused.
工作正常,到目前为止。我打印了线程 id 并验证了多个线程确实被创建和重用。
回答by shrini1000
Here's a possibility:
这是一种可能性:
1) create only one DMLC configured with the bean and method to handle the incoming message. Set its concurrency to 1.
1) 只创建一个配置了 bean 和方法的 DMLC 来处理传入的消息。将其并发设置为 1。
2) Configure a task executor with its #threads equal to the concurrency you desire. Create an object pool for objects which are actually supposed to process a message. Give a reference of task executor and object pool to the bean you configured in #1. Object pool is useful if the actual message processing bean is not thread-safe.
2)配置一个任务执行器,它的#threads 等于你想要的并发。为实际应该处理消息的对象创建一个对象池。为您在 #1 中配置的 bean 提供任务执行器和对象池的引用。如果实际的消息处理 bean 不是线程安全的,则对象池很有用。
3) For an incoming message, the bean in DMLC creates a custom Runnable, points it to the message and the object pool, and gives it to task executor.
3)对于传入的消息,DMLC中的bean创建一个自定义的Runnable,将其指向消息和对象池,并交给任务执行器。
4) The run method of Runnable gets a bean from the object pool and calls its 'process' method with the message given.
4) Runnable 的 run 方法从对象池中获取一个 bean,并使用给定的消息调用它的 'process' 方法。
#4 can be managed with a proxy and the object pool to make it easier.
#4 可以使用代理和对象池进行管理,以使其更容易。
I haven't tried this solution yet, but it seems to fit the bill. Note that this solution is not as robust as EJB MDB. Spring e.g. will not discard an object from the pool if it throws a RuntimeException.
我还没有尝试过这个解决方案,但它似乎符合要求。请注意,此解决方案不如 EJB MDB 健壮。如果 Spring 抛出 RuntimeException,则它不会从池中丢弃对象。
回答by deFreitas
At least in ActiveMQ what you want is totally supported, his name is VirtualTopic
至少在ActiveMQ中你想要的完全支持,他的名字是VirtualTopic
The concept is:
概念是:
- You create a VirtualTopic(Simply creating a Topic using the prefix
VirtualTopic.
) eg.VirtualTopic.Color
- Create a consumer subscribing to this VirtualTopicmatching this pattern
Consumer.<clientName>.VirtualTopic.<topicName>
eg.Consumer.client1.VirtualTopic.Color
, doing it, Activemq will create a queuewith that name and that queue will subscribe toVirtualTopic.Color
then every message published to this Virtual Topic will be delivered to client1queue, note that it works like rabbitmq exchanges. - You are done, now you can consume client1queue like every queue, with many consumers, DLQ, customized redelivery policy, etc.
- At this point I think you understood that you can create client2, client3and how many subscribersyou want, all of them will receive a copy of the message published to
VirtualTopic.Color
- 您创建一个VirtualTopic(只需使用前缀创建一个主题
VirtualTopic.
),例如。VirtualTopic.Color
- 创建订阅此VirtualTopic匹配此模式的消费者,
Consumer.<clientName>.VirtualTopic.<topicName>
例如。Consumer.client1.VirtualTopic.Color
,这样做,Activemq将创建一个具有该名称的队列,该队列将订阅,VirtualTopic.Color
然后发布到此虚拟主题的每条消息都将传递到client1队列,注意它的工作方式类似于 rabbitmq 交换。 - 大功告成,现在你可以像每个队列一样消费client1队列,有很多消费者,DLQ,定制的重新投递策略等。
- 在这一点上,我想你明白你可以创建client2,client3以及 你想要多少订阅者,他们都会收到发布到的消息的副本
VirtualTopic.Color
Here the code
这里的代码
@Component
public class ColorReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(MailReceiver.class);
@Autowired
private JmsTemplate jmsTemplate;
// simply generating data to the topic
long id=0;
@Scheduled(fixedDelay = 500)
public void postMail() throws JMSException, IOException {
final Color colorName = new Color[]{Color.BLUE, Color.RED, Color.WHITE}[new Random().nextInt(3)];
final Color color = new Color(++id, colorName.getName());
final ActiveMQObjectMessage message = new ActiveMQObjectMessage();
message.setObject(color);
message.setProperty("color", color.getName());
LOGGER.info("status=color-post, color={}", color);
jmsTemplate.convertAndSend(new ActiveMQTopic("VirtualTopic.color"), message);
}
/**
* Listen all colors messages
*/
@JmsListener(
destination = "Consumer.client1.VirtualTopic.color", containerFactory = "colorContainer"
selector = "color <> 'RED'"
)
public void genericReceiveMessage(Color color) throws InterruptedException {
LOGGER.info("status=GEN-color-receiver, color={}", color);
}
/**
* Listen only red colors messages
*
* the destination ClientId have not necessary exists (it means that his name can be a fancy name), the unique requirement is that
* the containers clientId need to be different between each other
*/
@JmsListener(
// destination = "Consumer.redColorContainer.VirtualTopic.color",
destination = "Consumer.client1.VirtualTopic.color",
containerFactory = "redColorContainer", selector = "color='RED'"
)
public void receiveMessage(ObjectMessage message) throws InterruptedException, JMSException {
LOGGER.info("status=RED-color-receiver, color={}", message.getObject());
}
/**
* Listen all colors messages
*/
@JmsListener(
destination = "Consumer.client2.VirtualTopic.color", containerFactory = "colorContainer"
)
public void genericReceiveMessage2(Color color) throws InterruptedException {
LOGGER.info("status=GEN-color-receiver-2, color={}", color);
}
}
@SpringBootApplication
@EnableJms
@EnableScheduling
@Configuration
public class Config {
/**
* Each @JmsListener declaration need a different containerFactory because ActiveMQ requires different
* clientIds per consumer pool (as two @JmsListener above, or two application instances)
*
*/
@Bean
public JmsListenerContainerFactory<?> colorContainer(ActiveMQConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrency("1-5");
configurer.configure(factory, connectionFactory);
// container.setClientId("aId..."); lets spring generate a random ID
return factory;
}
@Bean
public JmsListenerContainerFactory<?> redColorContainer(ActiveMQConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
// necessary when post serializable objects (you can set it at application.properties)
connectionFactory.setTrustedPackages(Arrays.asList(Color.class.getPackage().getName()));
final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrency("1-2");
configurer.configure(factory, connectionFactory);
return factory;
}
}
public class Color implements Serializable {
public static final Color WHITE = new Color("WHITE");
public static final Color BLUE = new Color("BLUE");
public static final Color RED = new Color("RED");
private String name;
private long id;
// CONSTRUCTORS, GETTERS AND SETTERS
}
回答by b10y
Creating a custom task executor seemingly solved the issue for me, w/o duplicate processing:
创建自定义任务执行程序似乎为我解决了这个问题,无需重复处理:
@Configuration
class BeanConfig {
@Bean(destroyMethod = "shutdown")
public ThreadPoolTaskExecutor topicExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setAllowCoreThreadTimeOut(true);
executor.setKeepAliveSeconds(300);
executor.setCorePoolSize(4);
executor.setQueueCapacity(0);
executor.setThreadNamePrefix("TOPIC-");
return executor;
}
@Bean
JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer, @Qualifier("topicExecutor") Executor topicExecutor) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setPubSubDomain(true);
configurer.configure(factory, connectionFactory);
factory.setPubSubDomain(true);
factory.setSessionTransacted(false);
factory.setSubscriptionDurable(false);
factory.setTaskExecutor(topicExecutor);
return factory;
}
}
class MyBean {
@JmsListener(destination = "MYTOPIC", containerFactory = "topicListenerFactory", concurrency = "1")
public void receiveTopicMessage(SomeTopicMessage message) {}
}
回答by N.Shrivastava
Multiple Consumers Allowed on the Same Topic Subscription in JMS 2.0, while this was not the case with JMS 1.1. Please refer: https://www.oracle.com/technetwork/articles/java/jms2messaging-1954190.html
JMS 2.0 中允许多个使用者订阅同一主题,而 JMS 1.1 则不是这种情况。请参考:https: //www.oracle.com/technetwork/articles/java/jms2messaging-1954190.html
回答by Nora alshareef
on server.xml configs:
在 server.xml 配置上:
so , in maxSessions you can identify the number of sessions you want.
因此,在 maxSessions 中,您可以确定所需的会话数。