java 如何从 Web 应用程序侦听消息队列?(Tomcat,ActiveMQ)
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/31669219/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How to listen to a message queue from a web application? (Tomcat, ActiveMQ)
提问by Dimitri Dewaele
I'm happily improving my web applicationthat runs on a Apache Tomcat. An ActiveMQJMS server is added to send and receive messages.
我很高兴地改进了在 Apache Tomcat上运行的Web 应用程序。添加了一个ActiveMQJMS 服务器来发送和接收消息。
I already can send and receive messages, but need help on the receiver side.
我已经可以发送和接收消息,但需要接收方的帮助。
How should my web app continuously listen to one queue to receive messages?
我的 Web 应用程序应该如何持续侦听一个队列以接收消息?
New messages arrive and the server should act on them. Ex: add data to the DB or or send an message back.
新消息到达,服务器应该对它们采取行动。例如:将数据添加到数据库或发送回消息。
I can already send messages. This is code.
我已经可以发送消息了。这是代码。
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("clientQueue");
MessageProducer publisher = session.createProducer(queue);
connection.start();
Message message = null;
message = session.createTextMessage("Text Message");
publisher.send(message);
I can already receive a message after a request (a click ;-))
我已经可以在收到请求后收到一条消息(单击 ;-))
connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("serverQueue");
consumer = session.createConsumer(destination);
while (true) {
Message message = consumer.receive(300000);
//Do message stuff
}
How should I let the web application listen to the queue continuously?What is the advised way?
我应该如何让 Web 应用程序持续监听队列?什么是建议的方式?
All help is warmly appreciated. Thanks.
热烈感谢所有帮助。谢谢。
EDIT - SOLUTION
编辑 - 解决方案
Current working solution with the proposals from DaveH
来自DaveH的建议的当前工作解决方案
I have added a ServletContextListener to listen to my message continuously.
我添加了一个 ServletContextListener 来连续收听我的消息。
web.xml
网页.xml
<listener>
<listener-class>com.test.JMSContextListener</listener-class>
</listener>
The Listeren:
听者:
public class JMSContextListener implements ServletContextListener {
@Override
public void contextInitialized(ServletContextEvent arg0) {
Thread thread = new Thread(new JMSConnector());
thread.start();
}
@Override
public void contextDestroyed(ServletContextEvent arg0) {
//Nothing
}
}
The Connection:
连接:
public class JMSConnector implements Runnable {
public void run() {
try {
Context context = new InitialContext();
QueueConnectionFactory factory = (QueueConnectionFactory) context.lookup("java:comp/env/jms/ConnectionFactory");
Connection connection = factory.createConnection();
Queue queue = (javax.jms.Queue) context.lookup("java:comp/env/jms/serverQueue");
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
//This MessageListener will do stuff with the message
MessageListenerImpl messageListener = new MessageListenerImpl();
consumer.setMessageListener(messageListener);
connection.start();
// Start connection or nothing will happen!!!
connection.start();
} catch (JMSException ex) {
//TODO
} catch (NamingException ex) {
//TODO
}
}
}
Is this an advised way or should this be improved?
这是一种建议的方式还是应该改进?
All help is warmly appreciated. Thanks.
热烈感谢所有帮助。谢谢。
采纳答案by DaveH
If you have code that already can consume messages from the queue ( which it appears you do ), then I think your problem comes down to how do you get that piece of code to run.
如果您的代码已经可以使用队列中的消息(看起来您确实这样做了),那么我认为您的问题归结为如何让那段代码运行。
It appears you aren't using any frameworks so I think the approach that I would take would be to take that code that can retrieve messages from the queue and run it in a separate thread in the application server. Get that thread to start up at application server start time, and tidy up itself as the application server closes down.
看来您没有使用任何框架,所以我认为我将采用的方法是采用可以从队列中检索消息的代码并在应用程序服务器的单独线程中运行它。让该线程在应用服务器启动时启动,并在应用服务器关闭时自行整理。
Easiest way to start the thread on app server startup is to introduce a ServletContextListener ( an example here.) In the Context Listener start your queue listening code in a separate thread.
在应用服务器启动时启动线程的最简单方法是引入一个 ServletContextListener(这里是一个示例)。在 Context Listener 中,在单独的线程中启动您的队列侦听代码。
EDIT:I used this proposed solution and added the code above to the question.
编辑:我使用了这个建议的解决方案并将上面的代码添加到问题中。
回答by Grigory Kislin
Configure JMS queue in Tomcat: https://martinsdeveloperworld.wordpress.com/2013/03/03/apache-activemq-and-tomcat/
Put activemq-all-5.xx.jar to $TOMCAT_HOME/lib
Listen in cycle in Thread, started in
@WebListener
:Stop when contextDestroyed
@WebListener @Slf4j public class JmsMailListener implements ServletContextListener { private Thread listenerThread = null; private QueueConnection connection; @Override public void contextInitialized(ServletContextEvent sce) { try { InitialContext initCtx = new InitialContext(); ActiveMQConnectionFactory connectionFactory = (ActiveMQConnectionFactory) initCtx.lookup("java:comp/env/jms/ConnectionFactory"); connectionFactory.setTrustAllPackages(true); connection = connectionFactory.createQueueConnection(); QueueSession queueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = (Queue) initCtx.lookup("java:comp/env/jms/queue/MailQueue"); QueueReceiver receiver = queueSession.createReceiver(queue); connection.start(); log.info("Listen JMS messages ..."); listenerThread = new Thread(() -> { try { while (!Thread.interrupted()) { Message m = receiver.receive(); if (m instanceof ObjectMessage) { ObjectMessage om = (ObjectMessage) m; MyObject myObject = (MyObject) om.getObject(); log.info("Received MyObject {}", myObject); ... } } } catch (Exception e) { log.error("Receiving messages failed: " + e.getMessage(), e); } }); listenerThread.start(); } catch (Exception e) { log.error("JMS failed: " + e.getMessage(), e); } } @Override public void contextDestroyed(ServletContextEvent sce) { if (connection != null) { try { connection.close(); } catch (JMSException ex) { log.warn("Couldn't close JMSConnection: ", ex); } } if (listenerThread != null) { listenerThread.interrupt(); } } }
在Tomcat中配置JMS队列:https: //martinsdeveloperworld.wordpress.com/2013/03/03/apache-activemq-and-tomcat/
将 activemq-all-5.xx.jar 放到 $TOMCAT_HOME/lib
在线程中循环监听,开始于
@WebListener
:当 contextDestroyed 时停止
@WebListener @Slf4j public class JmsMailListener implements ServletContextListener { private Thread listenerThread = null; private QueueConnection connection; @Override public void contextInitialized(ServletContextEvent sce) { try { InitialContext initCtx = new InitialContext(); ActiveMQConnectionFactory connectionFactory = (ActiveMQConnectionFactory) initCtx.lookup("java:comp/env/jms/ConnectionFactory"); connectionFactory.setTrustAllPackages(true); connection = connectionFactory.createQueueConnection(); QueueSession queueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = (Queue) initCtx.lookup("java:comp/env/jms/queue/MailQueue"); QueueReceiver receiver = queueSession.createReceiver(queue); connection.start(); log.info("Listen JMS messages ..."); listenerThread = new Thread(() -> { try { while (!Thread.interrupted()) { Message m = receiver.receive(); if (m instanceof ObjectMessage) { ObjectMessage om = (ObjectMessage) m; MyObject myObject = (MyObject) om.getObject(); log.info("Received MyObject {}", myObject); ... } } } catch (Exception e) { log.error("Receiving messages failed: " + e.getMessage(), e); } }); listenerThread.start(); } catch (Exception e) { log.error("JMS failed: " + e.getMessage(), e); } } @Override public void contextDestroyed(ServletContextEvent sce) { if (connection != null) { try { connection.close(); } catch (JMSException ex) { log.warn("Couldn't close JMSConnection: ", ex); } } if (listenerThread != null) { listenerThread.interrupt(); } } }
回答by Hossein
I am using spring to listen to a queue. defining the listener is like this :
我正在使用 spring 来收听队列。定义监听器是这样的:
<jms:listener-container connection-factory="jmsConnectionFactoryLocal">
<jms:listener destination="QUEUE_NAME" ref="channelManagerSimulatorDefault"/>
</jms:listener-container>
the jmsConnectionFactoryLocal must be created according to your MQ. in my case it was IBM WebsphereMQ, so the jmsConnectionFactoryLocal defining is like this :
jmsConnectionFactoryLocal 必须根据您的 MQ 创建。就我而言,它是 IBM WebsphereMQ,所以 jmsConnectionFactoryLocal 定义是这样的:
<bean id="mqConnectionFactoryLocal" class="com.ibm.mq.jms.MQQueueConnectionFactory">
<property name="hostName">
<value>THE_MQ_SERVER_IP</value>
</property>
<property name="port">
<value>MQ_PORT</value>
</property>
<property name="queueManager">
<value>QUEUE_MANAGER_NAME</value>
</property>
<property name="transportType">
<value>1</value>
</property>
</bean>
<bean id="jmsConnectionFactoryLocal" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
<property name="targetConnectionFactory" ref="mqConnectionFactoryLocal"/>
<property name="username" value="USER_NAME"/>
<property name="password" value="PASSWORD"/>
</bean>
you have to find the right ConnectionFactory implementation for ActiveMQ and use it. the listener and jmsConnectionFactory are the same and independent to the MQ provider.
您必须为 ActiveMQ 找到正确的 ConnectionFactory 实现并使用它。侦听器和 jmsConnectionFactory 是相同的并且独立于 MQ 提供程序。
回答by ManojP
I am using activeMQ in my webApplication spring-mvc and JMS Template with the following approach.
我使用以下方法在我的 webApplication spring-mvc 和 JMS 模板中使用 activeMQ。
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://localhost:61616</value>
</property>
</bean>
<bean id="messageSender" class="xyz.MessageSender"/>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestination" ref="destination" />
</bean>
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="REQUEST_QUEUE" />
</bean>
<bean id="messageListener" class="xtz.MessageListener">
<property name="listenerId" value="1" />
</bean>
<jms:listener-container connection-factory="connectionFactory">
<jms:listener destination="RESPONSE_QUEUE" ref="messageListener" method="messageReceived" />
</jms:listener-container>
Sender and Listener class implementation given below.
下面给出了发送者和监听者类的实现。
public class MessageSender
{
@Autowired
private JmsTemplate jmsTemplate;
public void sendMessage()
{
jmsTemplate.send(new MessageCreator()
{
public Message createMessage(Session session) throws JMSException
{
MapMessage message = session.createMapMessage();
message.setString("messageType", XXX);
message.setString("jsonMessage", XXXX);
return message;
}
});
}
}
public class MessageListener
{
private int listenerId;
@Override
public void messageReceived(Map<String, Object> message) throws Exception
{
//put your logic here
}
public int getListenerId()
{
return listenerId;
}
public void setListenerId(int listenerId)
{
this.listenerId = listenerId;
}
}