java Spring JMS 侦听器-容器并发属性不起作用
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/33339224/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Spring JMS listener-container concurrency attribute not working
提问by mahendra kawde
Hi I am learning Spring JMS with ActiveMQ. In my example scenario is Producer application sends around 50 messages in queue and when I start Consumer application it starts to consume those messages.
嗨,我正在使用 ActiveMQ 学习 Spring JMS。在我的示例场景中,Producer 应用程序在队列中发送大约 50 条消息,当我启动 Consumer 应用程序时,它开始使用这些消息。
Now I want multiple consumer threads to consume messages from queue. I am using JMS listener-container. When I googled for that I found there is a concurrencyattribute.
现在我想要多个消费者线程来消费队列中的消息。我正在使用JMS listener-container。当我用谷歌搜索时,我发现有一个并发属性。
According to Spring JMS docconcurrency attribute specifies
根据Spring JMS docconcurrency 属性指定
The number of concurrent sessions/consumers to start for each listener. Can either be a simple number indicating the maximum number (e.g. "5") or a range indicating the lower as well as the upper limit (e.g. "3-5"). Note that a specified minimum is just a hint and might be ignored at runtime. Default is 1; keep concurrency limited to 1 in case of a topic listener or if queue ordering is important; consider raising it for general queues.
为每个侦听器启动的并发会话/消费者的数量。可以是表示最大数量的简单数字(例如“5”),也可以是表示下限和上限的范围(例如“3-5”)。请注意,指定的最小值只是一个提示,可能会在运行时被忽略。默认为 1;在主题侦听器或队列排序很重要的情况下,将并发限制为 1;考虑为一般队列提高它。
But in my configuration I am setting this attribute to 5 but it seems it fails to start 5 concurrent listeners.
但是在我的配置中,我将此属性设置为 5,但似乎无法启动 5 个并发侦听器。
Configuration for listener:
监听器配置:
consumer-applicationContext.xml
消费者应用上下文.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:brokerURL="tcp://localhost:61616" />
<bean id="listener" class="com.jms.example.MyMessageListener"></bean>
<jms:listener-container container-type="default" concurrency="5"
connection-factory="connectionFactory">
<jms:listener destination="MyQueue" ref="listener"
method="onMessage"></jms:listener>
</jms:listener-container>
</beans>
And If I used bean DefaultMessageListenerContainerinstead of jms:listener-containerwith properties:
如果我使用 bean DefaultMessageListenerContainer而不是带有属性的jms:listener-container:
<bean id="msgListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer"
p:connectionFactory-ref="connectionFactory"
p:destination-ref="destination"
p:messageListener-ref="listener"
p:concurrentConsumers="10"
p:maxConcurrentConsumers="50" />
Then in ActiveMQ console I could see 10 consumers but in reality it starts 3 consumers simultaneously or sometimes 6 or only 1 consumer.
然后在 ActiveMQ 控制台中,我可以看到 10 个消费者,但实际上它同时启动了 3 个消费者,有时也有 6 个或只有 1 个消费者。
EDIT:
编辑:
Consumer code:
消费者代码:
public class MyMessageListener implements MessageListener{
public void onMessage(Message m) {
TextMessage message=(TextMessage)m;
try{
System.out.println("Start = " + message.getText());
Thread.sleep(5000);
System.out.println("End = " + message.getText());
}catch (Exception e) {e.printStackTrace(); }
}
}
I am printing consumed messages on console whose output is explained in scenarios below:
我正在控制台上打印消耗的消息,其输出在以下场景中进行了解释:
OBSERVATION:
观察:
I observed some weird behavior. My producer and consumer are two independent applications.
我观察到一些奇怪的行为。我的生产者和消费者是两个独立的应用程序。
Scenario - 1:
场景 - 1:
- I start producer and send messages(Meanwhile consumer is NOT running)
- Then I start consumer to consume messages.
- 我启动生产者并发送消息(同时消费者没有运行)
- 然后我开始消费者来消费消息。
Here problem is it does not loads all 10 consumers. Sometimes it loads 3 OR 1.
这里的问题是它没有加载所有 10 个消费者。有时它会加载 3 或 1。
Start = hello jms 1 // consumer 1 started
Start = hello jms 2 // consumer 2 started
Start = hello jms 3 // consumer 3 started
End = hello jms 1 // consumer 1 ended
Start = hello jms 4 // consumer 4 started and hence always 3 consumers and not 10
End = hello jms 2
Start = hello jms 5
End = hello jms 3
Start = hello jms 6
Scenario - 2:
场景 - 2:
- I start producer and send messages(Meanwhile consumer is running)
- Since the consumer is in running state it starts to consume them.
- 我启动生产者并发送消息(同时消费者正在运行)
- 由于消费者处于运行状态,它开始消费它们。
So it does load all 5 consumers properly as expected. so the output is:
所以它确实按预期正确加载了所有 5 个消费者。所以输出是:
Start = hello jms 1 // consumer 1 started
Start = hello jms 2 // consumer 2 started
Start = hello jms 3 // consumer 3 started
Start = hello jms 4 // consumer 4 started
Start = hello jms 5 // consumer 5 started
Start = hello jms 6 // consumer 6 started
Start = hello jms 7 // consumer 7 started
Start = hello jms 8 // consumer 8 started
Start = hello jms 9 // consumer 9 started
Start = hello jms 10 // consumer 10 started. Hence all them started at same time as expected.
End = hello jms 1
Start = hello jms 11
End = hello jms 2
Start = hello jms 12
End = hello jms 3
Start = hello jms 13
Why is this happening. It is really eating my brain. I don't want to keep consumer to be running forever. I want to keep both detached.
为什么会发生这种情况。它真的在吃我的大脑。我不想让消费者永远运行。我想保持两者分离。
Please help.
请帮忙。
回答by mahendra kawde
As Strelok pointed me about prefetching of messages. Created prefetchPolicy
bean with queuePrefetch
property set to 1.
Whose reference is set in connectionFactory.
正如 Strelok 向我指出的消息预取。创建prefetchPolicy
了queuePrefetch
属性设置为 1 的bean 。其引用在 connectionFactory 中设置。
I did some changes in configuration, those are as below:
我对配置做了一些更改,如下所示:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:brokerURL="tcp://localhost:61616"
p:prefetchPolicy-ref="prefetchPolicy" />
<bean id="prefetchPolicy" class="org.apache.activemq.ActiveMQPrefetchPolicy"
p:queuePrefetch="1" />
<bean id="listener" class="com.javatpoint.MyMessageListener"></bean>
<jms:listener-container concurrency="10-15" connection-factory="connectionFactory">
<jms:listener destination="javatpointQueue" ref="listener"
method="onMessage"></jms:listener>
</jms:listener-container>
<!-- The JMS destination -->
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="javatpointQueue" />
</bean>
</beans>
回答by Alexey
Just met this problem on spring-boot 1.5.9 application.
刚刚在 spring-boot 1.5.9 应用程序上遇到了这个问题。
As pointed out by @Strelok and @mahendra kawde, the issue is due to prefetchPolicy parameter. The default value is 1000.
正如@Strelok 和@mahendra kawde 所指出的,问题是由 prefetchPolicy 参数引起的。默认值为 1000。
Large prefetch values are recommended for high performance with high message volumes. However, for lower message volumes, where each message takes a long time to process, the prefetch should be set to 1. This ensures that a consumer is only processing one message at a time. Specifying a prefetch limit of zero, however, will cause the consumer to poll for messages, one at a time, instead of the message being pushed to the consumer.
建议使用较大的预取值以实现高消息量的高性能。但是,对于较低的消息量,其中每条消息需要很长时间来处理,预取应该设置为 1。这确保消费者一次只处理一条消息。但是,将预取限制指定为零将导致消费者轮询消息,一次一个,而不是将消息推送给消费者。
Take a look at http://activemq.apache.org/what-is-the-prefetch-limit-for.html
看看http://activemq.apache.org/what-is-the-prefetch-limit-for.html
One can change prefetchPolicy parameter as following:
可以按如下方式更改 prefetchPolicy 参数:
In
application.properties
file (working example)spring.activemq.broker-url=tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
In DefaultMessageListenerContainer by modifying destinationName parameter (working example)
<bean id="cons-even" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="destinationName" value="queue-name?consumer.prefetchSize=1"/> ... </bean>
In ConnectionFactory bean (working example):
@Bean public ConnectionFactory jmsConnectionFactory() { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl); ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy(); policy.setQueuePrefetch(1); factory.setPrefetchPolicy(policy); return factory; }
在
application.properties
文件中(工作示例)spring.activemq.broker-url=tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
在 DefaultMessageListenerContainer 中通过修改 destinationName 参数(工作示例)
<bean id="cons-even" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="destinationName" value="queue-name?consumer.prefetchSize=1"/> ... </bean>
在 ConnectionFactory bean(工作示例)中:
@Bean public ConnectionFactory jmsConnectionFactory() { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl); ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy(); policy.setQueuePrefetch(1); factory.setPrefetchPolicy(policy); return factory; }
Related topics:
相关话题:
回答by Pavan
JMS can work in concurrency mode. Below I am sharing the sample snippet concurrentConsumers = 100 value
JMS 可以在并发模式下工作。下面我将分享示例代码段 concurrentConsumers = 100 值
<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="concurrentConsumers">
<value>100</value>
</property>
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queue" />
<property name="messageListener" ref="messageListener" />
<property name="sessionTransacted" value="false" />
<property name="sessionAcknowledgeMode" value="1" />
</bean>