laravel实现利用RabbitMQ实现MQTT即时通讯
有时候我们的项目中会用到即时通讯功能,比如电商系统中的客服聊天功能,还有在支付过程中,当用户支付成功后,第三方支付服务会回调我们的回调接口,此时我们需要通知前端支付成功。而 RabbitMQ
可以很方便的实现即时通讯功能,如果你的业务只是少量地方使用即时通信,需要一个简易的消息系统,你可以直接考虑 MQ
的实现, MQ
有很高的吞吐率,具有持久化,还可以横向扩展,总之还不错,用就完了,奥利给!
本文需要安装好 rabbitMQ
和 laravel
,没弄好环境的看我之前的文章 php laravel5.5使用rabbitmq消息队列
MQTT协议
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,该协议构建于TCP/IP
协议上。 MQTT
最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。
MQTT相关概念
实际上还是 MQ
的那些东西,主要看 MQ
有没有实现 MQTT
模型,懂的随便看看,不懂的先去理解 MQ
- Publisher(发布者):消息的发出者,负责发送消息。
- Subscriber(订阅者):消息的订阅者,负责接收并处理消息。
- Broker(代理):消息代理,位于消息发布者和订阅者之间,各类支持MQTT协议的消息中间件都可以充当。
- Topic(主题):可以理解为消息队列中的路由,订阅者订阅了主题之后,就可以收到发送到该主题的消息。
- Payload(负载);可以理解为发送消息的内容。
- QoS(消息质量):全称Quality of Service,即消息的发送质量,主要有QoS 0、QoS 1、QoS 2三个等级,下面分别介绍下:
- QoS 0(Almost Once):至多一次,只发送一次,会发生消息丢失或重复;
- QoS 1(Atleast Once):至少一次,确保消息到达,但消息重复可能会发生;
- QoS 2(Exactly Once):只有一次,确保消息只到达一次。
RabbitMQ启用MQTT功能
我们是采用 docker
安装的,直接进入容器一顿操作就行
docker exec -it rabbitmq bash
rabbitmq-plugins enable rabbitmq_mqtt
开启成功后,查看管理控制台,我们可以发现 MQTT
服务运行在 1883
端口上了。
MQTT客户端
我们可以使用 MQTT
客户端来测试 MQTT
的即时通讯功能,这里使用的是 MQTTBox
这个客户端工具。
首先下载并安装好 MQTTBox
,下载地址:http://workswithweb.com/mqttbox.html
点击 Create MQTT Client
按钮来创建一个 MQTT
客户端;
接下来对 MQTT
客户端进行配置,主要是配置好协议端口、连接用户名密码和QoS即可, 注意 Protocol
是 mqtt/tcp
然后我们利用这个工具测试一下发布和订阅消息是否可用,一端向 TopicA
发送消息,另一端订阅 TopicA
可用看到效果已经出现了,那么我们如何让前端来订阅呢?
前端实现即时通讯
我们通过 html javascript
实现一个简单的聊天功能,由于 RabbitMQ
与 Web端
交互底层使用的是 WebSocket
,所以我们需要开启 RabbitMQ
的 MQTT WEB
支持,使用如下命令开启即可
docker exec -it rabbitmq bash
rabbitmq-plugins enable rabbitmq_web_mqtt
开启成功后,查看管理控制台,我们可以发现 MQTT
的 WEB
服务运行在 15675
端口上了;
WEB端
与 MQTT
服务进行通讯需要使用一个叫 MQTT.js
的库,项目地址:https://github.com/mqttjs/MQTT.js
实现的功能非常简单,一个单聊功能,需要注意的是配置好 MQTT
服务的访问地址为:ws://localhost:15675/ws
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<div>
<label>目标Topic:<input id="targetTopicInput" type="text"></label><br>
<label>发送消息:<input id="messageInput" type="text"></label><br>
<button onclick="sendMessage()">发送</button>
<button onclick="clearMessage()">清空</button>
<div id="messageDiv"></div>
</div>
</body>
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
<script>
//RabbitMQ的web-mqtt连接地址
const url = 'ws://ip:15675/ws';
//获取订阅的topic
const topic = getQueryString("topic");
//连接到消息队列
let client = mqtt.connect(url);
client.on('connect', function () {
//连接成功后订阅topic
client.subscribe(topic, function (err) {
if (!err) {
showMessage("订阅topic:" topic "成功!");
}
});
});
//获取订阅topic中的消息
client.on('message', function (topic, message) {
showMessage("收到消息:" message.toString());
});
//发送消息
function sendMessage() {
let targetTopic = document.getElementById("targetTopicInput").value;
let message = document.getElementById("messageInput").value;
//向目标topic中发送消息
client.publish(targetTopic, message);
showMessage("发送消息给" targetTopic "的消息:" message);
}
//从URL中获取参数
function getQueryString(name) {
let reg = new RegExp("(^|&)" name "=([^&]*)(&|$)", "i");
let r = window.location.search.substr(1).match(reg);
if (r != null) {
return decodeURIComponent(r[2]);
}
return null;
}
//在消息列表中展示消息
function showMessage(message) {
let messageDiv = document.getElementById("messageDiv");
let messageEle = document.createElement("div");
messageEle.innerText = message;
messageDiv.appendChild(messageEle);
}
//清空消息列表
function clearMessage() {
let messageDiv = document.getElementById("messageDiv");
messageDiv.innerHTML = "";
}
</script>
</html>
在Laravel中使用
需要保证 laravel
和 rabbitmq
已经可以正常生产和发布消息了,保证没问题再进行以下操作
- 安装mqtt包
composer require salmanzafar/laravel-mqtt
- app.php
/*
* Application Service Providers...
*/
SalmanMqttMqttServiceProvider::class,
- MqttService
<?php
namespace AppService;
use IlluminateSupportFacadesAuth;
use SalmanMqttMqttClassMqtt;
class MqttService
{
public static function SendMsgViaMqtt($topic, $message)
{
$mqtt = new Mqtt();
$client_id = Auth::user()->id ?? 0;
$output = $mqtt->ConnectAndPublish($topic, $message, $client_id);
if ($output === true)
{
return "published";
}
return "Failed";
}
}
- laravel生产消息
Route::get('/pub', function () {
$data = [
'name' => 'zhangsan',
'age' => 18
];
$res = AppServiceMqttService::SendMsgViaMqtt('topicA', json_encode($data));
dump($res);
});
请求路由测试
注意:通过url的queryString进行topic订阅
总结
消息中间件应用越来越广泛,不仅可以实现可靠的异步通信,还可以实现即时通讯,掌握一个消息中间件还是很有必要的。像普通的订单下了给后台一个推送等等,都可以选择采用 MQ
实现,方便好用!奥利给!!