Java: Subscribe and Read MQTT Messages Using PAHO

Notice: this page is derived from a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me). Original StackOverflow question: http://stackoverflow.com/questions/22715682/

Date: 2020-08-13 17:27:21  Source: igfitidea

Tags: java, eclipse, mqtt

Asked by Goot

I'm using Paho to send and receive MQTT messages. So far, sending the messages has been no problem; I receive them using mosquitto.

Now I want to read the messages with a Java client, and I noticed that there is little documentation about receiving messages.

I implemented the MqttCallback interface, but I still can't figure out how to read a message from a topic I've subscribed to.

This is my source code so far. I can read the messages using mosquitto_sub.

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class PahoDemo implements MqttCallback {
    MqttClient client;
    MqttClient subClient;

    public PahoDemo() {
    }

    public static void main(String[] args) {
        new PahoDemo().doDemo();
    }

    public void doDemo() {
        try {
            client = new MqttClient("tcp://192.168.118.11:1883", "Sending");
            subClient = new MqttClient("tcp://192.168.118.11:1883",
                    "Subscribing");
            client.connect();
            subClient.connect();
            subClient.subscribe("foo");
            MqttMessage message = new MqttMessage();
            message.setPayload("A single message from my computer fff"
                    .getBytes());
            client.publish("foo", message);
            client.disconnect();
            client.close();
            subClient.disconnect();
            subClient.close();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void connectionLost(Throwable cause) {
        // TODO Auto-generated method stub

    }

    @Override
    public void messageArrived(String topic, MqttMessage message)
            throws Exception {
        System.out.println(message);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // TODO Auto-generated method stub

    }

}

Accepted answer by hardillb

You are closing the client down before the broker has time to send the message back.
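
The timing problem can be illustrated without a broker. The following is a minimal sketch, not Paho code: the class name, the method receiveOne, and the "fake broker" executor are all hypothetical stand-ins. A scheduled task plays the role of messageArrived, and shutting the executor down plays the role of disconnect() and close(). It shows that the message is only seen if the caller waits for the callback before shutting down.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Broker-free simulation; every name here is hypothetical, not Paho API.
class WaitBeforeDisconnectDemo {

    // Simulates a broker that delivers one message after delayMs, then
    // waits up to timeoutMs for it. Returns the received text, or null
    // if the wait ended first (the "closed too early" case).
    static String receiveOne(long delayMs, long timeoutMs) {
        CountDownLatch arrived = new CountDownLatch(1);
        AtomicReference<String> received = new AtomicReference<>();
        ScheduledExecutorService fakeBroker =
                Executors.newSingleThreadScheduledExecutor();
        try {
            // Stand-in for messageArrived(...): runs on another thread.
            fakeBroker.schedule(() -> {
                received.set("A single message from my computer fff");
                arrived.countDown();
            }, delayMs, TimeUnit.MILLISECONDS);

            // In real client code this wait would sit between
            // publish() and disconnect().
            boolean gotIt = arrived.await(timeoutMs, TimeUnit.MILLISECONDS);
            return gotIt ? received.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return null;
        } finally {
            // Stand-in for disconnect()/close(): pending delivery is dropped.
            fakeBroker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("waited long enough: " + receiveOne(50, 2000));
        System.out.println("gave up too early:  " + receiveOne(5000, 10));
    }
}
```

With a real client, the countDown() call would go inside messageArrived and the timed await between publish and disconnect. This only matters if the program is meant to exit after receiving; a client that is never disconnected does not need the latch.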

Also, you don't need two instances of the client; you can send and receive with just one.

I've edited your code a little. It will now continue to run and receive messages until you kill it.

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class PahoDemo implements MqttCallback {

    MqttClient client;

    public PahoDemo() {
    }

    public static void main(String[] args) {
        new PahoDemo().doDemo();
    }

    public void doDemo() {
        try {
            // A single client both publishes and subscribes.
            client = new MqttClient("tcp://192.168.118.11:1883", "Sending");
            // Register the callback before connecting so that messages
            // can be accepted as soon as the client is connected.
            client.setCallback(this);
            client.connect();
            client.subscribe("foo");
            MqttMessage message = new MqttMessage();
            message.setPayload("A single message from my computer fff"
                    .getBytes());
            client.publish("foo", message);
            // No disconnect() here: the client keeps running and
            // receiving until the process is killed.
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void connectionLost(Throwable cause) {
        // Called when the connection to the broker is lost.
    }

    @Override
    public void messageArrived(String topic, MqttMessage message)
            throws Exception {
        // Called for each message that arrives on a subscribed topic.
        System.out.println(message);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // Called when delivery of a published message has completed.
    }

}

EDIT: added the missing client.setCallback(this)
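
One detail the listings above leave implicit is the text encoding of the payload: getBytes() with no argument uses the platform default charset, and printing the message object relies on its toString(). A hedged sketch of making the encoding explicit is shown here. The class and helper names are hypothetical; with Paho, the byte array would be handed to the MqttMessage on the sending side and obtained from message.getPayload() on the receiving side.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helpers; only the charset handling is the point here.
class PayloadTextDemo {

    // Encode text for use as a message payload.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Decode payload bytes back into text using the same charset.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // \u00B0 is the degree sign, which takes two bytes in UTF-8.
        byte[] payload = encode("21\u00B0C");
        System.out.println(payload.length + " bytes, text: " + decode(payload));
    }
}
```

Using the same explicit charset on both sides avoids surprises when the publisher and the subscriber run on machines with different default encodings.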
