MQTT简单封装类-TypeScript实现

2020-07-15 11:01:13 浏览数 (1)

使用过C 和Nodejs的Mqtt封装类,之前使用过mosquitto的C语言库以及mosquittopp的C 库;另外nodejs中也有一个mqtt的模块。

TypeScript中封装MQTT客户端模块

安装mqtt模块

在TypeScript中使用mqtt模块十分简单,首先得用npm或者cnpm安装mqtt模块

代码语言:javascript复制
npm install mqtt

mq.ts 源代码 MQTT类

下面是使用TypeScript封装的mqtt的简单客户端,代码如下:

代码语言:javascript复制
import mqtt = require('mqtt')

export interface MqttConnOpt extends mqtt.IClientOptions{}

export declare type OnMessageFunc = (topic: string, payload: Buffer) => void

declare class Topic {
public topic: string;
public qos: 0|1|2;
}

export class MQTT {
mqclient: mqtt.MqttClient;
brokerHost: string;
brokerPort: number;
subscribeTopics: Array<Topic>;
subscribeCallbacks: Map<string, OnMessageFunc>;

constructor(host?: string | any, port?: number) {
 this.brokerHost = host;
 this.brokerPort = port;
 this.subscribeTopics = new Array<Topic>();
 this.subscribeCallbacks = new Map<string, OnMessageFunc>();
}

/**
* 订阅主题
*/
public subscribe(topic: string, qos: 0|1|2) {
 this.subscribeTopics.push({topic: topic, qos: qos});
 if (this.is_connected()){
   this.mqclient.subscribe(topic, {qos: qos});
 }
}

/**
* 设置消息数据回调函数
*/
public set_message_callback(topicPatten: string, cb: OnMessageFunc) {
 this.subscribeCallbacks.set(topicPatten, cb);
}

/**
* 是否已连接到服务器
*/
public is_connected() {
 return this.mqclient.connected == true;
}

/**
* 连接到服务器
*/
public connect(opts?: MqttConnOpt){
 this.mqclient = mqtt.connect(`mqtt://${this.brokerHost}:${this.brokerPort}`, opts);

 this.mqclient.on('connect', ()=>{
   console.log(`成功连接到服务器[${this.brokerHost}:${this.brokerPort}]`);
   for (let index = 0; index < this.subscribeTopics.length; index  ) {
     const element = this.subscribeTopics[index];
     this.mqclient.subscribe(element.topic, {qos: element.qos});
   }
 });

 this.mqclient.on('message', (topic: string, payload: Buffer)=>{
   this.mqclient;
   this.subscribeCallbacks.forEach((val, key)=>{
     if (topic.indexOf(key) != -1){
       val(topic, payload);
     }
   });
 });

 this.mqclient.on('error', (err: Error)=>{

 });
}

/**
* 推送数据
*/
public publish(topic: string, message: string, qos: 0|1|2) {
 this.mqclient.publish(topic, message, {qos: qos, retain: false})
}

}

测试

代码语言:javascript复制
import { MQTT } from './mq'

// mqtt客户端列表
var mqttList: Array<MQTT> = new Array<MQTT>();

/**
   * 实时数据处理函数
   */
function handleReal(topic: string, payload: Buffer) {
  // console.log(`data: ${topic}=>${payload.toString()}`);
  const topics = topic.split('/');
}

/**
  * 数据包处理函数
  */
function handlePacked(topic: string, payload: Buffer) {
  // console.log(`rx: ${topic}=>${payload.toString()}`);
}

/**
   * 连接MQTT服务器
   */
function connectMqtt(host: string, port: number, user: string, pwd: string, id: string, clean: boolean) {
  let it = new MQTT(host, port);
  it.connect({
    username:user,
    password: pwd,
    clientId: id,
    clean: clean,
  });
  // 订阅主题 /gb212/#
  it.subscribe('/gb212/#', 0);
  // 设置主题 /gb212/data 的消息回调
  it.set_message_callback('/gb212/data', handleReal.bind(this));
  // 设置主题 /gb212/rx 的消息回调
  it.set_message_callback('/gb212/rx', handlePacked.bind(this));
  // 将MQTT变量it放到MQTT客户端列表中
  mqttList.push(it);
}
  
// 连接MQTT服务器
connectMqtt('127.0.0.1', 1883, 'test', '123456', 'this_is_test_200507_nodejs_oilfume_2', true);

0 人点赞