Note: this page is a translation of a popular Stack Overflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same license and attribute it to the original authors (not me). Original Stack Overflow URL: http://stackoverflow.com/questions/12267168/

Trying to test the queue persistence. Can't send messages when breakpoint on consumer

java jms activemq spring-jms

Asked by Sebastien Lorber

I'm trying to test the queue persistence of ActiveMQ.

I have an embedded ActiveMQ server with a single consumer. This embedded server receives JMS messages from many other JVM applications.

It works fine: the consumer application receives the notifications.

So I tried to test the persistence of the messages. I put a (remote) breakpoint on the consumer's MessageListener so that I could enqueue many messages and then make the ActiveMQ server crash. On server restart, I'd like all the enqueued messages to still be available for consumption, not lost.

Then I ran that test. I hit the breakpoint on the first message sent. But for every message I then try to send, I get the following stack trace on the producer side:

Exception in thread "main" org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is javax.jms.JMSException: Wire format negotiation timeout: peer did not send his wire format.
    at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316)
    at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:168)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:469)
    at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:534)
    at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:612)
    at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:604)
    at com.xxxxxxxxxxx.mobilepush.client.RealClientTest.main(RealClientTest.java:29)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
Caused by: javax.jms.JMSException: Wire format negotiation timeout: peer did not send his wire format.
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:62)
    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1380)
    at org.apache.activemq.ActiveMQConnection.ensureConnectionInfoSent(ActiveMQConnection.java:1466)
    at org.apache.activemq.ActiveMQConnection.createSession(ActiveMQConnection.java:308)
    at org.springframework.jms.support.JmsAccessor.createSession(JmsAccessor.java:196)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:457)
    ... 9 more
Caused by: java.io.IOException: Wire format negotiation timeout: peer did not send his wire format.
    at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:98)
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
    at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:81)
    at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:86)
    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1351)
    ... 13 more

I don't understand why my producer would be blocked when my consumer is in my breakpoint.

My broker uri is: mobilepush.activemq.broker.transport.connector.uri=tcp://0.0.0.0:61616

The producer connects through tcp to the broker. The consumer, colocated with the broker, connects through vm://localhost.

My configuration is pretty simple:

SERVER:

    <!--  lets create an embedded ActiveMQ Broker -->
    <amq:broker useJmx="false" persistent="true">
        <amq:transportConnectors>
            <amq:transportConnector uri="${mobilepush.activemq.broker.transport.connector.uri}" />
        </amq:transportConnectors>
        <amq:persistenceAdapter>
            <amq:kahaPersistenceAdapter directory="${mobilepush.activemq.broker.queue.persistence.directory}" maxDataFileLength="100 Mb"/>
        </amq:persistenceAdapter>
    </amq:broker>


CONSUMER:
(the management namespace and the xebia classes are only JMX decorators)

    <bean id="connectionFactory" class="fr.xebia.management.jms.SpringManagedConnectionFactory">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory" >
                <property name="brokerURL" value="${mobilepush.activemq.broker.uri}"/>
            </bean>
        </property>
    </bean>

    <bean id="pushConsumer" class="com.xxxxxxxxxxxxxxx.mobilepush.messaging.jms.PushConsumer">
        <property name="jmsPushMessageConverter" ref="jmsPushMessageConverter"/>
        <property name="pushDelegate" ref="directPushDelegate"/>
    </bean>

    <management:executor-service id="pushConsumerExecutor"
                                 pool-size="${mobilepush.consumer.thread.min}-${mobilepush.consumer.thread.max}" keep-alive="60" />

    <jms:listener-container
            task-executor="pushConsumerExecutor"
            connection-factory="connectionFactory"
            acknowledge="auto"
            container-class="fr.xebia.springframework.jms.ManagedDefaultMessageListenerContainer">
        <jms:listener destination="mobilepush.queue" ref="pushConsumer" method="onMessage" />
    </jms:listener-container>


PRODUCER:

    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"  >
        <property name="brokerURL" value="${mobilepush.activemq.broker.uri}"/>
    </bean>


    <bean id="mobilePushJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="defaultDestination" ref="mobilePushQueue"/>
        <property name="messageConverter" ref="jmsPushMessageConverter"/>
        <property name="connectionFactory">
            <!-- wrap in a single shared connection to avoid creating a connection per send -->
            <bean class="org.springframework.jms.connection.SingleConnectionFactory">
                <property name="targetConnectionFactory">
                    <ref local="connectionFactory" />
                </property>
            </bean>
        </property>
    </bean>

Accepted answer by Sebastien Lorber

I found the problem!

The remote breakpoint I put on my embedded ActiveMQ consumer was a default breakpoint with suspend-policy=all.

And since the consumer and the server run in the same JVM, I was also suspending all the ActiveMQ server threads!

The solution is to use a breakpoint with suspend-policy=thread, so that only the consumer thread is suspended and the server threads can continue to run.
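
The effect described in this answer can be imitated without a debugger. The following is only a rough analogy in plain Java, not ActiveMQ code: the class name, method name, and the "broker" and "consumer" threads are all hypothetical stand-ins. A latch plays the role of a thread-only breakpoint by parking the consumer thread, while a separate thread keeps adding messages to an in-memory queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

public class SuspendPolicyDemo {

    /**
     * Parks only the "consumer" thread (as a thread-only breakpoint would)
     * while a separate "broker" thread keeps accepting messages.
     * Returns {backlog seen while the consumer was parked, messages consumed after resume}.
     */
    static int[] enqueueWhileConsumerPaused(int messageCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> consumed = new ArrayList<>();
        CountDownLatch consumerParked = new CountDownLatch(1);
        CountDownLatch resume = new CountDownLatch(1);

        Thread consumer = new Thread(() -> {
            try {
                consumerParked.countDown(); // the stand-in "breakpoint" is hit
                resume.await();             // only this thread is held
                queue.drainTo(consumed);    // after resuming, take the whole backlog
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");

        // Stand-in for the broker threads, which must stay runnable to accept messages.
        Thread broker = new Thread(() -> {
            for (int i = 0; i < messageCount; i++) {
                queue.add("message-" + i);
            }
        }, "broker");

        try {
            consumer.start();
            consumerParked.await();     // wait until the consumer is parked
            broker.start();
            broker.join();              // all messages are enqueued meanwhile
            int backlog = queue.size();
            resume.countDown();         // "resume" the consumer
            consumer.join();
            return new int[] {backlog, consumed.size()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while running the demo", e);
        }
    }

    public static void main(String[] args) {
        int[] result = enqueueWhileConsumerPaused(5);
        System.out.println("backlog while parked: " + result[0]);
        System.out.println("consumed after resume: " + result[1]);
    }
}
```

With suspend-policy=all, the "broker" thread in this analogy would be frozen too, so nothing could be enqueued, which matches the producer-side timeout in the question.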

Answer by Ker p pag

I fixed this issue by using the latest logback-core and logback-classic jar files (version 1.1.2).

Answer by jtahlborn

"java.io.IOException: Wire format negotiation timeout: peer did not send his wire format" seems pretty clear. You are blocking the client thread, which is the other end of the network connection. The server is getting network timeouts trying to interact with the client. Network connections are a situation where it's hard to debug by arbitrarily suspending a thread.

Answer by jkysam

The ActiveMQ broker waits a few seconds for the client to send its wire format before forcing a disconnect. On your connection URL, try adding the following parameter to extend that time to something that lets you do your debugging:

tcp://localhost:61616?wireFormat.maxInactivityDurationInitalDelay=30000 

The integer value is the number of milliseconds to wait.
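
If the broker URL comes from a property, as in the configuration in the question, the parameter could be appended in code rather than by hand. This is a minimal sketch using only the JDK. The class and method names are hypothetical, and the option name is copied verbatim from this answer, spelling included. The result would then be used as the brokerURL of the ActiveMQConnectionFactory.

```java
import java.net.URI;

public class BrokerUrlDemo {

    // Option name copied verbatim from the answer, including its spelling.
    static final String INITIAL_DELAY_OPTION = "wireFormat.maxInactivityDurationInitalDelay";

    /** Appends the option to brokerUrl, using '?' or '&' depending on whether a query already exists. */
    static String withInitialDelay(String brokerUrl, long delayMillis) {
        if (delayMillis < 0) {
            throw new IllegalArgumentException("delayMillis must not be negative");
        }
        boolean hasQuery = URI.create(brokerUrl).getQuery() != null;
        return brokerUrl + (hasQuery ? "&" : "?") + INITIAL_DELAY_OPTION + "=" + delayMillis;
    }

    public static void main(String[] args) {
        // Builds the same URL as shown in the answer.
        System.out.println(withInitialDelay("tcp://localhost:61616", 30000));
    }
}
```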