java 在 JMS 异步消息处理中,只要调用 onMessage(),消息就从队列中删除

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/11498919/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-31 05:20:27  来源:igfitidea点击:

In JMS asynchronous message handling as soon as onMessage() is called, message deleted from queue

javajmsmessagebrokeropenmq

提问by Ashwini

Requirement:I want messages to be persist in queue till onMessage()executed successfully. If any exception occur during execution of onMessage()and if it is not handle then message should be redelivered to listener.

要求:我希望消息在队列中持久化,直到onMessage()成功执行。如果在执行过程中发生任何异常onMessage()并且没有处理,则应将消息重新传递给侦听器。

I am having Glassfish v2 as a application server. I am using OpenMQConnectionFactory and JmsTemplate to send message on queue. Please note that I am not using MDB.

我将 Glassfish v2 作为应用程序服务器。我正在使用 OpenMQConnectionFactory 和 JmsTemplate 在队列上发送消息。 请注意,我没有使用 MDB。

<bean id="openMQConnectionFactory"
    class="com.is.br.util.OpenMqConnectionFactoryBean">
    <property name="imqAddressList" value="mq://localhost:7676" />
    <property name="imqDefaultUsername" value="admin" />
    <property name="imqDefaultPassword" value="admin" />
</bean>

I tried AUTO_ACKNOWLEDGE as acknowledge mode but in listener when exception thrown message is not redelivered.

我尝试将 AUTO_ACKNOWLEDGE 作为确认模式,但在未重新传递异常抛出消息时在侦听器中。

MessageProducer.java

消息生产者.java

public void sendMessage(final String responseStream) {

public void sendMessage(final String responseStream) {

    System.out.println("Enter into IsJmsProducer.sendMessage method");

    try {
        MessageCreator creator = new MessageCreator() {
            public Message createMessage(Session session) {
                ObjectMessage message = null;
                try {
                    message = session.createObjectMessage(responseStream);
                    } catch (Exception e) {
                    System.out.println("Unable create a JMSMessage");
                }
                return message;
            }
        };

        System.out.println("Sending message to destination: " + this.destination.toString());
        this.jmsTemplate.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);

        this.jmsTemplate.send(this.destination, creator);
        System.out.println("SendMessage to queue successfully.");           
    } catch (Exception ex) {
        System.out.println("SendMessage to queue Fail." + ex);
    }
    System.out.println("Exit from IsJmsProducer.sendMessage method");

}

SampleJMSConsumer.java

示例JMSConsumer.java

public class SampleJMSConsumer implements MessageListener {

    @Override
    public void onMessage(Message message) {
        throw new RuntimeException();
   }
}

Then I tried with this.jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);and in listener I called message.acknowledge();and in catchblock I called session.recover()still message is not redeliver.

然后我尝试使用this.jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);并在我调用的侦听器中调用 message.acknowledge();并在catch块中调用session.recover()仍然消息未重新传递。

SampleJMSConsumer.java

示例JMSConsumer.java

public class SampleJMSConsumer implements MessageListener  {

    @Override
    public void onMessage(Message message) {

        ObjectMessage objectMessage = (ObjectMessage) message;
        Object object;
        try {
            object = objectMessage.getObject();
            if (object instanceof String) {
                System.out.println("Message received - " + object.toString());
                throw new JMSException("JMS exception");
            }
            message.acknowledge();
        } catch (JMSException e) {
               session.recover();
        }
    }

}

}

When I run the program in debug mode and I send message on queue in broker admin console I am able to see the number of messages but as soon as onMessage() called number of messages get reduce by one. That means message is consumed and deleted from queue. Is that message consider as "delivered"? Please help me to understand why message is not redeliver when exception occur?

当我在调试模式下运行程序并在代理管理控制台的队列上发送消息时,我能够看到消息数量,但是一旦 onMessage() 调用的消息数量减少一。这意味着消息被消费并从队列中删除。该消息是否被视为“已交付”?请帮助我理解为什么发生异常时消息不重新传递?

Thanks in advance.

提前致谢。

回答by Aksel Willgert

I think this is by design, delivered when onmessage gets called. If you want do something about the exception you might handle it using try catch.

我认为这是设计使然,在调用 onmessage 时传递。如果您想对异常做一些事情,您可以使用 try catch 处理它。

Assume the message was put on the queue once again, you wouls likely hit the same exception when consumed anyways.

假设消息再次放入队列中,无论如何您都可能会遇到相同的异常。

The ack mechanisms should imo be about assuring correct delivery. Maybe what you are after is a reject mechanism where you ask the prpoducerside to send a new message?

确认机制应该是关于确保正确交付。也许您所追求的是一种拒绝机制,您可以在其中要求生产方发送新消息?

回答by Shashi

Client Acknowledge is suited for you. In your onMessage() method, once processing is over, you need to call Acknowledge otherwise if there is any exception, then you don't call Acknowledge().

客户确认适合您。在您的 onMessage() 方法中,一旦处理结束,您需要调用 Acknowledge 否则,如果有任何异常,则不要调用 Acknowledge()。

Session.Recovery() stops and restarts message delivery. The delivery of the message will be from the last unacknowledged message.

Session.Recovery() 停止并重新启动消息传递。消息的传递将来自最后一条未确认的消息。

回答by Amrish Pandey

session created in consumer should set session mode to AUTO_ACK / DUPS_OK_ACK. You haven't shared your code for starting consumer. You are setting session mode in Producer but not Consumer.

在消费者中创建的会话应将会话模式设置为 AUTO_ACK / DUPS_OK_ACK。您尚未共享启动消费者的代码。您正在 Producer 中设置会话模式,而不是 Consumer。

回答by Edmondo1984

I would suggest to verify what is the default session mode for OpenMQ. It could happen that, once you opened a connection, you can't change it, so that has to be specified upon connection openin.

我建议验证 OpenMQ 的默认会话模式是什么。可能会发生这样的情况,一旦打开连接,就无法更改它,因此必须在连接 openin 时指定。