MQTT这么好玩不来自己搭建一个吗

2023-11-22 09:57:53 浏览数 (2)

前言

之前写了一篇为什么智能硬件首选MQTT - 掘金,这次就来搭建一个自己的MQTT交互平台,实际体验一下,没有实战怎么能行。

一、服务端准备

1. 选择平台

我这里用的平台是EMQX Cloud,可以通过github账号免费申请一个MQTT服务器,对于个人使用来说特别方便,同时使用使用 MQTT 客户端快速测试 MQTT 服务去监听或者模拟下发,这里我们选择免费开启,点击立即部署然后一直同意就建立好了。

2. 启动服务

建立好以后我们点击项目管理,里面就会出现一个我们刚申请的服务器,进去后点击启动,这样我们就把服务启动起来了。

3. 创建用户

点击认证鉴权后选择认证,然后点击右边的添加,即可创建我们的连接用户,这个用户的名称和密码就是我们客户端一会建立连接的时候需要的username和password。至此我们就可以去客户端去写连接代码了。

二、客户端搭建

1. 引入

代码语言:javascript复制
dependencies {
    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.4'
    implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1' 
}

2. AndroidManifest.xml 配置

代码语言:javascript复制
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />

<application
   ...
   <service android:name="org.eclipse.paho.android.service.MqttService" />
</application>

3. 创建MQTT客户端

代码语言:javascript复制
private static MqttAndroidClient mqttAndroidClient;
private static String mqttUsername = ""; //服务端创建的用户名
private static String mqttPassword = ""; //服务端吧创建的用户名密码
private static String clientId = ""; //唯一标识不可重复
 //接受消息的队列
public static final LinkedBlockingQueue<MyMessage> SERVER_QUEUE = new LinkedBlockingQueue<>(
            200);

//消息订阅的topic,可以自定义
private static final String topic = "/"   mqttUsername   "/"   clientId   "/user/get"; 


public static void initIot() {

        String serverUrl = "服务器地址:端口";

        try {
            mqttAndroidClient = new MqttAndroidClient(context, serverUrl, "clientId");

            mqttAndroidClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    Log.i(TAG, "连接断开");
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    Log.i(TAG, "收到消息:"   message.toString());

                    //建议使用队列接收
                    MyMessage myMessage = new MyMessage();
                    myMessage.setData(message.getPayload());
                    boolean offer = SERVER_QUEUE.offer(aMessage);
                    if (!offer) {
                        Log.e(TAG, "队列已满,无法接受消息!");
                    }
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    Log.i(TAG, "deliveryComplete: "   token.toString());

                }
            });

            //建立连接规则
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttUsername);
            options.setPassword(mqttPassword.toCharArray());
            options.setCleanSession(true);
            options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //MQTT版本
            options.setConnectionTimeout(10); //连接超时时间
            options.setKeepAliveInterval(180); //心跳间隔时间
            options.setMaxInflight(100); //最大请求数,默认10,高流量场景可以增大该值
            options.setAutomaticReconnect(true); //设置自动重新连接

            mqttAndroidClient.connect(options, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.i(TAG, "连接成功");
                    //这里订阅消息
                    subscribe();
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.i(TAG, "连接失败"   exception);
                }
            });
        } catch (Exception e) {
            Log.e(TAG, "INIT IOT ERROR!");
        }
    }

public class MyMessage {
    
    public Object data;
    
    public MyMessage() {
    }

    public MyMessage(Object data) {
        this.data = data;
    }
    
    public Object getData() {
        return this.data;
    }
    
    public void setData(Object data) {
        this.data = data;
    }
}

4. 订阅消息

代码语言:javascript复制
private static void subscribe() {
        try {
            mqttAndroidClient.subscribe(topic, 1, null,
                    new IMqttActionListener() {
                        @Override
                        public void onSuccess(IMqttToken asyncActionToken) {
                            Log.i(TAG,
                                    "订阅成功 topic: "
                                              topic);
                        }

                        @Override
                        public void onFailure(IMqttToken asyncActionToken,
                                              Throwable exception) {
                            Log.e(TAG, "订阅失败!"   exception.getMessage());
                        }
                    });

        } catch (Exception e) {
            Log.e(TAG, "订阅失败!"   e.getMessage());
        }
    }

5. 发布消息

代码语言:javascript复制
//消息发送队列
public static final LinkedBlockingQueue<String> CLIENT_QUEUE = new LinkedBlockingQueue<>(1000);

//发布消息调用这个方法
public static void putQueue(String msg) {
        boolean offer = CLIENT_QUEUE.offer(msg);
        if (!offer) {
            Log.w(TAG, "操作队列已满!");
        }
    }

//使用线程去读取队列,这样可以防止同一时间多处调用,同时也不会让发送事件丢失
static class IotPublishRunnable implements Runnable {

        @Override
        public void run() {
            while (true) {
                try {
                    String msg = CLIENT_QUEUE.take();
                    if (TextUtils.isEmpty(msg)) {
                        continue;
                    }
                    publish(msg);
                    Thread.sleep(300);
                } catch (Exception e) {
                    Log.e(TAG, "处理iot消息失败");
                }

            }
        }
    }

private static void publishNew(String payload) {
        String topic = "/"   mqttUsername   "/"   clientId   "/user/update";
        Integer qos = 1;

        try {
            if (null == mqttAndroidClient || !mqttAndroidClient.isConnected()) {
                Log.w(TAG, "IOT还未初始化!无法发送消息");
                return;
            }
            mqttAndroidClient.publish(topic, payload.getBytes(StandardCharsets.UTF_8), qos, false,
                    null, new IMqttActionListener() {
                        @Override
                        public void onSuccess(IMqttToken asyncActionToken) {

                        }

                        @Override
                        public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                            String[] topics = asyncActionToken.getTopics();
                            Log.e(TAG, "publish message error! topics: "   Arrays.toString(topics));
                        }
                    });
        } catch (MqttException e) {
            Log.e(TAG, "发送消息失败!");
        } catch (IllegalArgumentException e) {
            Log.e(TAG, "MQTT CLIENT ERROR");
        }
    }

6. 断开连接

代码语言:javascript复制
public static void disconnect() {
        if (null == mqttAndroidClient || !mqttAndroidClient.isConnected()) {
            Log.w(TAG, "IOT还未初始化!");
            return;
        }

        try {
            mqttAndroidClient.disconnect().setActionCallback(new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.i(TAG, "断开连接成功!");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.i(TAG, "断开连接失败!");
                }
            });
        } catch (MqttException e) {
            Log.e(TAG, e.getMessage());
        }
    }

结尾

以上就是客户端的MQTT代码,我是用Java写的,Kotlin版的建议参考Android 使用 Kotlin 连接 MQTT,代码基本就在这里了,项目啥的就不放了。

0 人点赞