前言
之前写了一篇为什么智能硬件首选MQTT - 掘金,这次就来搭建一个自己的MQTT交互平台,实际体验一下,没有实战怎么能行。
一、服务端准备
1. 选择平台
我这里用的平台是EMQX Cloud,可以通过github账号免费申请一个MQTT服务器,对于个人使用来说特别方便,同时使用使用 MQTT 客户端快速测试 MQTT 服务去监听或者模拟下发,这里我们选择免费开启,点击立即部署然后一直同意就建立好了。
2. 启动服务
建立好以后我们点击项目管理,里面就会出现一个我们刚申请的服务器,进去后点击启动,这样我们就把服务启动起来了。
3. 创建用户
点击认证鉴权后选择认证,然后点击右边的添加,即可创建我们的连接用户,这个用户的名称和密码就是我们客户端一会建立连接的时候需要的username和password。至此我们就可以去客户端去写连接代码了。
二、客户端搭建
1. 引入
代码语言:javascript复制dependencies {
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.4'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}
2. AndroidManifest.xml 配置
代码语言:javascript复制<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<application
...
<service android:name="org.eclipse.paho.android.service.MqttService" />
</application>
3. 创建MQTT客户端
代码语言:javascript复制private static MqttAndroidClient mqttAndroidClient;
private static String mqttUsername = ""; //服务端创建的用户名
private static String mqttPassword = ""; //服务端吧创建的用户名密码
private static String clientId = ""; //唯一标识不可重复
//接受消息的队列
public static final LinkedBlockingQueue<MyMessage> SERVER_QUEUE = new LinkedBlockingQueue<>(
200);
//消息订阅的topic,可以自定义
private static final String topic = "/" mqttUsername "/" clientId "/user/get";
public static void initIot() {
String serverUrl = "服务器地址:端口";
try {
mqttAndroidClient = new MqttAndroidClient(context, serverUrl, "clientId");
mqttAndroidClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
Log.i(TAG, "连接断开");
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.i(TAG, "收到消息:" message.toString());
//建议使用队列接收
MyMessage myMessage = new MyMessage();
myMessage.setData(message.getPayload());
boolean offer = SERVER_QUEUE.offer(aMessage);
if (!offer) {
Log.e(TAG, "队列已满,无法接受消息!");
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
Log.i(TAG, "deliveryComplete: " token.toString());
}
});
//建立连接规则
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(mqttUsername);
options.setPassword(mqttPassword.toCharArray());
options.setCleanSession(true);
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //MQTT版本
options.setConnectionTimeout(10); //连接超时时间
options.setKeepAliveInterval(180); //心跳间隔时间
options.setMaxInflight(100); //最大请求数,默认10,高流量场景可以增大该值
options.setAutomaticReconnect(true); //设置自动重新连接
mqttAndroidClient.connect(options, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.i(TAG, "连接成功");
//这里订阅消息
subscribe();
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.i(TAG, "连接失败" exception);
}
});
} catch (Exception e) {
Log.e(TAG, "INIT IOT ERROR!");
}
}
public class MyMessage {
public Object data;
public MyMessage() {
}
public MyMessage(Object data) {
this.data = data;
}
public Object getData() {
return this.data;
}
public void setData(Object data) {
this.data = data;
}
}
4. 订阅消息
代码语言:javascript复制private static void subscribe() {
try {
mqttAndroidClient.subscribe(topic, 1, null,
new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.i(TAG,
"订阅成功 topic: "
topic);
}
@Override
public void onFailure(IMqttToken asyncActionToken,
Throwable exception) {
Log.e(TAG, "订阅失败!" exception.getMessage());
}
});
} catch (Exception e) {
Log.e(TAG, "订阅失败!" e.getMessage());
}
}
5. 发布消息
代码语言:javascript复制//消息发送队列
public static final LinkedBlockingQueue<String> CLIENT_QUEUE = new LinkedBlockingQueue<>(1000);
//发布消息调用这个方法
public static void putQueue(String msg) {
boolean offer = CLIENT_QUEUE.offer(msg);
if (!offer) {
Log.w(TAG, "操作队列已满!");
}
}
//使用线程去读取队列,这样可以防止同一时间多处调用,同时也不会让发送事件丢失
static class IotPublishRunnable implements Runnable {
@Override
public void run() {
while (true) {
try {
String msg = CLIENT_QUEUE.take();
if (TextUtils.isEmpty(msg)) {
continue;
}
publish(msg);
Thread.sleep(300);
} catch (Exception e) {
Log.e(TAG, "处理iot消息失败");
}
}
}
}
private static void publishNew(String payload) {
String topic = "/" mqttUsername "/" clientId "/user/update";
Integer qos = 1;
try {
if (null == mqttAndroidClient || !mqttAndroidClient.isConnected()) {
Log.w(TAG, "IOT还未初始化!无法发送消息");
return;
}
mqttAndroidClient.publish(topic, payload.getBytes(StandardCharsets.UTF_8), qos, false,
null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
String[] topics = asyncActionToken.getTopics();
Log.e(TAG, "publish message error! topics: " Arrays.toString(topics));
}
});
} catch (MqttException e) {
Log.e(TAG, "发送消息失败!");
} catch (IllegalArgumentException e) {
Log.e(TAG, "MQTT CLIENT ERROR");
}
}
6. 断开连接
代码语言:javascript复制public static void disconnect() {
if (null == mqttAndroidClient || !mqttAndroidClient.isConnected()) {
Log.w(TAG, "IOT还未初始化!");
return;
}
try {
mqttAndroidClient.disconnect().setActionCallback(new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.i(TAG, "断开连接成功!");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.i(TAG, "断开连接失败!");
}
});
} catch (MqttException e) {
Log.e(TAG, e.getMessage());
}
}
结尾
以上就是客户端的MQTT代码,我是用Java写的,Kotlin版的建议参考Android 使用 Kotlin 连接 MQTT,代码基本就在这里了,项目啥的就不放了。