背景
最近项目中用到了mqtt的协议,今天特地抽时间学习了一下,搭建了一个基于nodejs的mqtt的服务。现在写一篇文章记录,分享给大家。
本案例使用的nodejs,mqtt的代理服务是使用的是mosca
。客户端使用的是mqtt.js
。
其实网上也有几篇写mqtt的集成,但写的都很随便,时间也比较久,对于集成中的问
题,大多是一带而过,这篇文章我不仅带大家了解mqtt的主要特点,还交大家真正实战mqtt协议。
mqtt简介
如果有人问你什么是mqtt协议,有什么特点,你就用下面这段官方的话来回答他。
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
整理以上的概念描述我们可以得到, mqtt协议的特点
- 占用带宽小
- 稳定可靠
- 发布/订阅
- 使用代码少
后面我将根据mqtt的这几个特点,为大家在实战中一一演示,解释mqtt的这些特点。
mqtt协议需要一个中间代理服务器,这个东西是其实就像一个传话筒,把客户端推送的消息,推送给指定的客户端。
主要用三个角色,订阅,mqtt代理,发布。下面这张图解释的很清楚。
概念性的东西的我们就说到这里,下面进入实战阶段。
nodejs集成mqtt
- 创建一个mqtt-test文件夹
- 使用
npm init
初始化成nodejs的项目目录 安装二个npm包mqtt
和mosca
npm install mosca mqtt --save
编写服务端代码
根目录下创建一个index.js
这里是用于编写服务端的代码,创建mqtt代理服务,
接受客户端发送的事件进行业务逻辑处理,然后再推送给订阅者。
index.js的代码如下
代码语言:javascript复制const mosca = require("mosca");
const MqttServer = new mosca.Server({
port: 1883
});
MqttServer.on("clientConnected", function (client) {
//当有客户端连接时的回调.
console.log("client connected", client.id);
});
/**
* 监听MQTT主题消息
* 当客户端有连接发布主题消息时
**/
MqttServer.on("published", function (packet, client) {
var topic = packet.topic;
switch (topic) {
case "temperature":
// console.log('message-publish', packet.payload.toString());
//MQTT可以转发主题消息至其他主题
//MqttServer.publish({ topic: 'other', payload: 'sssss' });
break;
case "other":
console.log("message-123", packet.payload.toString());
break;
}
});
MqttServer.on("ready", function () {
//当服务开启时的回调
console.log("mqtt is running...");
});
代码很好理解。
使用new mosca.Server()
来创建一个服务,配置参数里只指定了一个端口,当然这里还有很多配置参数,可以指定静态目录,也可以配置ssl证书。
具体的文档在这里可以查阅得到。
具体参数如下
- port 创建服务的端口
- host 服务的IP地址
- backend 配置Ascoltatore后端的所有参数对象
- ascoltatore
- keyPath 证书的key存放地址
- certPath 证书的秘钥存放地址
- static 配置一个web服务的目录
- stats 推送的频率, 每10s。
事件列表:
- clientConnected, 当客户端被链接时触发,客户端作为一个参数
- clientDisconnecting, 当客户端正在断开链接时触发,客户端作为一个参数
- clientDisconnected, 当客户端已经断开链接时触发
- published, 当推送了一个新的消息时触发
- subscribed, 客户端当订阅了一个主题时触发
- unsubscribed, 当客户端取消订阅时触发
我们在index.js
中监听了几个事件,
客户端链接clientConnected
,发布主题消息published
,
以及在服务启动后的回调函数ready
代码逻辑清晰,易于理解。这正是mqtt的使用简单,便捷的特征。
需要注意的是我们虽然监听了published
事件,但其实没有做任何的操作。
只是打印了消息体。这里并没有编写推送给订阅者的代码。
其实这部分在客户端订阅时限定的。客户端端可以只订阅自己需要的topic
编写推送方客户端的代码
下面我们来看一下二个客户端的代码,第一是推送方。 在根目录创建pus.js,编写内容如下
代码语言:javascript复制const mqtt = require("mqtt");
const client = mqtt.connect("mqtt://127.0.0.1:1883");
setInterval(function () {
const value = Math.ceil(Math.random() * 40);
client.publish("temperature", value.toString(), { qos: 0, retain: true });
}, 2000);
代码很简单,一句话就能解释清楚, 创建一个客户端,连接到mqtt服务,然后每2秒推送一个消息, 这里推送的消息有几个参数比较重要需要解释一下
调用client.publish()
方法的第一个参数是叫做topic
表明这个消息的主题或者分类。
第二个参数是消息体,可以是String 也可以是Buffer,
这里的例子topic是temperature
,注意等下需要用到这个参数。
第三个参数就是就是一些推送的参数,是一个对象。包含以下诸多参数,
- qos 定义QoS的等级, 默认为0
- retain 保留的标志, 布尔类型,默认false
- dup 是否标记为副本, 布尔类型, 默认false
第四个参数是 callback 当报错时会触发该回调函数。
更加具体的参数请查询 https://www.npmjs.com/package/mqtt#publish
此外每一个客户端都有很多方法
- mqtt.connect()
- mqtt.Client()
- mqtt.Client#publish()
- mqtt.Client#subscribe()
- mqtt.Client#unsubscribe()
- mqtt.Client#end()
- mqtt.Client#removeOutgoingMessage()
- mqtt.Client#reconnect()
- mqtt.Client#handleMessage()
- mqtt.Client#connected
- mqtt.Client#reconnecting
- mqtt.Client#getLastMessageId()
- mqtt.Store()
- mqtt.Store#put()
- mqtt.Store#del()
- mqtt.Store#createStream()
- mqtt.Store#close()
这里就不一一展开每个api的用法,后面的文章会慢慢讲解。 mqtt的api文档地址 https://www.npmjs.com/package/mqtt#api
编写订阅方客户端的代码
介绍为了推送方的js编写,我们来编写订阅方的js。
有了推送,必须要有订阅才能完整
在根目录下创建sub.js
,内容如下
const mqtt = require("mqtt");
const client = mqtt.connect("mqtt://127.0.0.1:1883");
client.on("connect", function () {
console.log("服务器连接成功");
client.subscribe("temperature", { qos: 1 });
});
client.on("message", function (top, message) {
console.log("当前topic:", top);
console.log("当前温度:", message.toString());
});
订阅方的代码也很简单,首先是链接到mqtt代理服务器,
链接成功后会触发connect
事件,然后在这个事件的回调函数里,调用subscribe()
方法
订阅某个主题下的消息事件。
然后监听一个叫做message
的事件,
回调函数里有二个参数,一个是topic,一个消息体mesage