mqtt协议实战(一)

2022-01-24 15:34:49 浏览数 (1)

背景

最近项目中用到了mqtt的协议,今天特地抽时间学习了一下,搭建了一个基于nodejs的mqtt的服务。现在写一篇文章记录,分享给大家。

本案例使用的nodejs,mqtt的代理服务是使用的是mosca。客户端使用的是mqtt.js。 其实网上也有几篇写mqtt的集成,但写的都很随便,时间也比较久,对于集成中的问 题,大多是一带而过,这篇文章我不仅带大家了解mqtt的主要特点,还交大家真正实战mqtt协议。

mqtt简介

如果有人问你什么是mqtt协议,有什么特点,你就用下面这段官方的话来回答他。

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

整理以上的概念描述我们可以得到, mqtt协议的特点

  • 占用带宽小
  • 稳定可靠
  • 发布/订阅
  • 使用代码少

后面我将根据mqtt的这几个特点,为大家在实战中一一演示,解释mqtt的这些特点。

mqtt协议需要一个中间代理服务器,这个东西是其实就像一个传话筒,把客户端推送的消息,推送给指定的客户端。

主要用三个角色,订阅,mqtt代理,发布。下面这张图解释的很清楚。

概念性的东西的我们就说到这里,下面进入实战阶段。

nodejs集成mqtt

  • 创建一个mqtt-test文件夹
  • 使用npm init初始化成nodejs的项目目录 安装二个npm包 mqttmosca
代码语言:javascript复制
npm install mosca mqtt --save
编写服务端代码

根目录下创建一个index.js

这里是用于编写服务端的代码,创建mqtt代理服务,

接受客户端发送的事件进行业务逻辑处理,然后再推送给订阅者。

index.js的代码如下

代码语言:javascript复制
const mosca = require("mosca");
const MqttServer = new mosca.Server({
  port: 1883
});
MqttServer.on("clientConnected", function (client) {
  //当有客户端连接时的回调.
  console.log("client connected", client.id);
});
/**
 * 监听MQTT主题消息
 * 当客户端有连接发布主题消息时
 **/
MqttServer.on("published", function (packet, client) {
  var topic = packet.topic;
  switch (topic) {
    case "temperature":
      // console.log('message-publish', packet.payload.toString());
      //MQTT可以转发主题消息至其他主题
      //MqttServer.publish({ topic: 'other', payload: 'sssss' });
      break;
    case "other":
      console.log("message-123", packet.payload.toString());
      break;
  }
});

MqttServer.on("ready", function () {
  //当服务开启时的回调
  console.log("mqtt is running...");
});

代码很好理解。 使用new mosca.Server()来创建一个服务,配置参数里只指定了一个端口,当然这里还有很多配置参数,可以指定静态目录,也可以配置ssl证书。 具体的文档在这里可以查阅得到。

具体参数如下

  • port 创建服务的端口
  • host 服务的IP地址
  • backend 配置Ascoltatore后端的所有参数对象
  • ascoltatore
  • keyPath 证书的key存放地址
  • certPath 证书的秘钥存放地址
  • static 配置一个web服务的目录
  • stats 推送的频率, 每10s。

事件列表:

  • clientConnected, 当客户端被链接时触发,客户端作为一个参数
  • clientDisconnecting, 当客户端正在断开链接时触发,客户端作为一个参数
  • clientDisconnected, 当客户端已经断开链接时触发
  • published, 当推送了一个新的消息时触发
  • subscribed, 客户端当订阅了一个主题时触发
  • unsubscribed, 当客户端取消订阅时触发

我们在index.js中监听了几个事件, 客户端链接clientConnected,发布主题消息published, 以及在服务启动后的回调函数ready 代码逻辑清晰,易于理解。这正是mqtt的使用简单,便捷的特征。

需要注意的是我们虽然监听了published事件,但其实没有做任何的操作。 只是打印了消息体。这里并没有编写推送给订阅者的代码。

其实这部分在客户端订阅时限定的。客户端端可以只订阅自己需要的topic

编写推送方客户端的代码

下面我们来看一下二个客户端的代码,第一是推送方。 在根目录创建pus.js,编写内容如下

代码语言:javascript复制
const mqtt = require("mqtt");
const client = mqtt.connect("mqtt://127.0.0.1:1883");
setInterval(function () {
  const value = Math.ceil(Math.random() * 40);
  client.publish("temperature", value.toString(), { qos: 0, retain: true });
}, 2000);

代码很简单,一句话就能解释清楚, 创建一个客户端,连接到mqtt服务,然后每2秒推送一个消息, 这里推送的消息有几个参数比较重要需要解释一下

调用client.publish()方法的第一个参数是叫做topic 表明这个消息的主题或者分类。

第二个参数是消息体,可以是String 也可以是Buffer,

这里的例子topic是temperature,注意等下需要用到这个参数。

第三个参数就是就是一些推送的参数,是一个对象。包含以下诸多参数,

  • qos 定义QoS的等级, 默认为0
  • retain 保留的标志, 布尔类型,默认false
  • dup 是否标记为副本, 布尔类型, 默认false

第四个参数是 callback 当报错时会触发该回调函数。

更加具体的参数请查询 https://www.npmjs.com/package/mqtt#publish

此外每一个客户端都有很多方法

  • mqtt.connect()
  • mqtt.Client()
  • mqtt.Client#publish()
  • mqtt.Client#subscribe()
  • mqtt.Client#unsubscribe()
  • mqtt.Client#end()
  • mqtt.Client#removeOutgoingMessage()
  • mqtt.Client#reconnect()
  • mqtt.Client#handleMessage()
  • mqtt.Client#connected
  • mqtt.Client#reconnecting
  • mqtt.Client#getLastMessageId()
  • mqtt.Store()
  • mqtt.Store#put()
  • mqtt.Store#del()
  • mqtt.Store#createStream()
  • mqtt.Store#close()

这里就不一一展开每个api的用法,后面的文章会慢慢讲解。 mqtt的api文档地址 https://www.npmjs.com/package/mqtt#api

编写订阅方客户端的代码

介绍为了推送方的js编写,我们来编写订阅方的js。 有了推送,必须要有订阅才能完整 在根目录下创建sub.js,内容如下

代码语言:javascript复制
const mqtt = require("mqtt");
const client = mqtt.connect("mqtt://127.0.0.1:1883");
client.on("connect", function () {
  console.log("服务器连接成功");
  client.subscribe("temperature", { qos: 1 });
});
client.on("message", function (top, message) {
  console.log("当前topic:", top);
  console.log("当前温度:", message.toString());
});

订阅方的代码也很简单,首先是链接到mqtt代理服务器, 链接成功后会触发connect事件,然后在这个事件的回调函数里,调用subscribe()方法 订阅某个主题下的消息事件。

然后监听一个叫做message的事件, 回调函数里有二个参数,一个是topic,一个消息体mesage

0 人点赞