如何在 Activemq 中以消费者身份订阅 Java 程序?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/18672354/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-12 10:03:52  来源:igfitidea点击:

How can I subscribe Java program as a consumer in Activemq?

javajmsactivemq

提问by Hanumath

I want to implement Pub/Subdomain in project.Basically I am not a java developer,using google help.I read this Link. I started to implement following structure. enter image description here

我想Pub/Sub在项目中实现域。基本上我不是 Java 开发人员,使用谷歌帮助。我阅读了这个链接。我开始实现以下结构。 在此处输入图片说明

I wrote Java Application name as MessageConsumer.javafor receiving messages from AMQ broker and placed in Webserver(Apache Tomcat).

我写了 Java 应用程序名称作为MessageConsumer.java从 AMQ 代理接收消息并放置在 Web 服务器(Apache Tomcat)中。

MessageConsumercode:

消息消费者代码:

 package PackageName;
 import java.io.IOException;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.jms.*;
 import org.apache.activemq.ActiveMQConnectionFactory;
 public class Consumer extends HttpServlet {
 @Override
 protected void service(HttpServletRequest arg0, HttpServletResponse arg1)
    throws ServletException, IOException {
try {
//creating connectionfactory object for way
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61617");
//establishing the connection b/w this Application and Activemq
Connection connection=connectionFactory.createConnection();
Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic queue=session.createTopic("MessageTesting");
javax.jms.MessageConsumer consumer=session.createConsumer(queue);
//fetching queues from Activemq
MessageListener listener = new MyListener();
consumer.setMessageListener(listener);
connection.start();
}
catch (Exception e) {
// TODO: handle exception
}
}

}

}

Separatly I wrote another Java Application for processing messages(MyListener.java).

我单独编写了另一个用于处理消息的 Java 应用程序(MyListener.java)。

MyListener.java code :

MyListener.java 代码:

package PackageName;
import java.io.*;
import java.net.*;
import javax.jms.*;
public class MyListener implements MessageListener {
public void onMessage(Message msg) {
    TextMessage msg1=(TextMessage)msg;
    //just for your understanding I mention dummy code
    //System.out.println(msg1.getText());
    MyListener ml=new MyListener();
    try {

      ml.executeHttp("http://localhost:8080/ExecutableFileProcess/ClassName");
        System.out.println(msg1.getText());
    } catch (Exception e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}}

Both Java Applications are in webserver(Apache Tomcat).so far we are following in the following way.

两个 Java 应用程序都在网络服务器(Apache Tomcat)中。到目前为止,我们按照以下方式进行操作。

  1. Before sending messages to Topic,we are triggering MessageConsumer.java through HTTP on browser.
  1. 在向 Topic 发送消息之前,我们在浏览器上通过 HTTP 触发 MessageConsumer.java。

Right know, what we are trying.Initially we don't want to trigger MessageConsumer.java.

知道,我们正在尝试什么。最初我们不想触发MessageConsumer.java.

Means,Assume MessageConsumer.javais in Webserver. Initially If AMQ get message from anywhere, our MessageConsumer.java should be process their own logic.

意思是,假设MessageConsumer.java在 Webserver 中。最初如果 AMQ 从任何地方获取消息,我们的 MessageConsumer.java 应该处理自己的逻辑。

I hope, you guys understand what We are trying.

我希望,你们明白我们正在尝试什么。

I never work on Apache Camel, can you explain clearly.

我从来没有工作过Apache Camel,你能解释清楚吗。

Thanks.

谢谢。

回答by zenbeni

Did you check Apache Camel? http://camel.apache.org/

你检查过Apache Camel了吗? http://camel.apache.org/

You can define routes with camel to publish and subscribe to topics on a broker from java code (integration with spring beans for instance). There are many examples including interaction with an activemq message broker.

您可以使用骆驼定义路由以从 Java 代码(例如与 spring bean 集成)发布和订阅代理上的主题。有许多示例,包括与 activemq 消息代理交互。

回答by Alpesh Gediya

why you want to trigger MessageConsumer.java manually as invocation of Subscriberis the resposibility of ActiveMQ in your case.

为什么要MessageConsumer手动触发.java 作为调用Subscriber是 ActiveMQ 在您的情况下的责任。

From your topic publish your message to ActiveMQ server and all the subscribers who subscribed to that topic will get your message without manually triggering it.

从您的主题将您的消息发布到 ActiveMQ 服务器,所有订阅该主题的订阅者都将收到您的消息,而无需手动触发它。

refer this as your initial POC http://activemq.apache.org/hello-world.html.

将此作为您的初始 POC http://activemq.apache.org/hello-world.html

You can use below java code for subscribing for topic for client2and client3

您可以使用下面的 java 代码为client2client3订阅主题

import javax.jms.*;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class consumer {
    // URL of the JMS server
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    // Name of the topic from which we will receive messages from = " testt"

    public static void main(String[] args) throws JMSException {
        // Getting JMS connection from the server

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();

        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic("testt");

        MessageConsumer consumer = session.createConsumer(topic);

        MessageListener listner = new MessageListener() {
            public void onMessage(Message message) {
                try {
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("Received message"
                                + textMessage.getText() + "'");
                    }
                } catch (JMSException e) {
                    System.out.println("Caught:" + e);
                    e.printStackTrace();
                }
            }
        };
        consumer.setMessageListener(listner);

        try {
              System.in.read();
         } catch (IOException e) {
             e.printStackTrace();
         }
    connection.close();

}
}