laravel实现利用RabbitMQ实现MQTT即时通讯

2021-01-03 20:23:06 浏览数 (1)

laravel实现利用RabbitMQ实现MQTT即时通讯

有时候我们的项目中会用到即时通讯功能,比如电商系统中的客服聊天功能,还有在支付过程中,当用户支付成功后,第三方支付服务会回调我们的回调接口,此时我们需要通知前端支付成功。而 RabbitMQ 可以很方便的实现即时通讯功能,如果你的业务只是少量地方使用即时通信,需要一个简易的消息系统,你可以直接考虑 MQ 的实现, MQ 有很高的吞吐率,具有持久化,还可以横向扩展,总之还不错,用就完了,奥利给!

本文需要安装好 rabbitMQlaravel ,没弄好环境的看我之前的文章 php laravel5.5使用rabbitmq消息队列

MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,该协议构建于TCP/IP协议上。 MQTT 最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。

MQTT相关概念

实际上还是 MQ 的那些东西,主要看 MQ 有没有实现 MQTT 模型,懂的随便看看,不懂的先去理解 MQ

  • Publisher(发布者):消息的发出者,负责发送消息。
  • Subscriber(订阅者):消息的订阅者,负责接收并处理消息。
  • Broker(代理):消息代理,位于消息发布者和订阅者之间,各类支持MQTT协议的消息中间件都可以充当。
  • Topic(主题):可以理解为消息队列中的路由,订阅者订阅了主题之后,就可以收到发送到该主题的消息。
  • Payload(负载);可以理解为发送消息的内容。
  • QoS(消息质量):全称Quality of Service,即消息的发送质量,主要有QoS 0、QoS 1、QoS 2三个等级,下面分别介绍下:
  • QoS 0(Almost Once):至多一次,只发送一次,会发生消息丢失或重复;
  • QoS 1(Atleast Once):至少一次,确保消息到达,但消息重复可能会发生;
  • QoS 2(Exactly Once):只有一次,确保消息只到达一次。

RabbitMQ启用MQTT功能

我们是采用 docker 安装的,直接进入容器一顿操作就行

代码语言:javascript复制
docker exec -it rabbitmq bash
rabbitmq-plugins enable rabbitmq_mqtt

开启成功后,查看管理控制台,我们可以发现 MQTT 服务运行在 1883 端口上了。

MQTT客户端

我们可以使用 MQTT 客户端来测试 MQTT 的即时通讯功能,这里使用的是 MQTTBox 这个客户端工具。

首先下载并安装好 MQTTBox ,下载地址:http://workswithweb.com/mqttbox.html

点击 Create MQTT Client 按钮来创建一个 MQTT 客户端;

接下来对 MQTT 客户端进行配置,主要是配置好协议端口、连接用户名密码和QoS即可, 注意 Protocolmqtt/tcp

然后我们利用这个工具测试一下发布和订阅消息是否可用,一端向 TopicA 发送消息,另一端订阅 TopicA

可用看到效果已经出现了,那么我们如何让前端来订阅呢?

前端实现即时通讯

我们通过 html javascript 实现一个简单的聊天功能,由于 RabbitMQWeb端 交互底层使用的是 WebSocket ,所以我们需要开启 RabbitMQMQTT WEB 支持,使用如下命令开启即可

代码语言:javascript复制
docker exec -it rabbitmq bash
rabbitmq-plugins enable rabbitmq_web_mqtt

开启成功后,查看管理控制台,我们可以发现 MQTTWEB 服务运行在 15675 端口上了;

WEB端MQTT 服务进行通讯需要使用一个叫 MQTT.js 的库,项目地址:https://github.com/mqttjs/MQTT.js

实现的功能非常简单,一个单聊功能,需要注意的是配置好 MQTT 服务的访问地址为:ws://localhost:15675/ws

代码语言:javascript复制
<!DOCTYPE html>
<html lang="en">
<head>
	<meta charset="UTF-8">
	<title>Title</title>
</head>
<body>
<div>
	<label>目标Topic:<input id="targetTopicInput" type="text"></label><br>
	<label>发送消息:<input id="messageInput" type="text"></label><br>
	<button onclick="sendMessage()">发送</button>
	<button onclick="clearMessage()">清空</button>
	<div id="messageDiv"></div>
</div>
</body>
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
<script>
    //RabbitMQ的web-mqtt连接地址
    const url = 'ws://ip:15675/ws';
    //获取订阅的topic
    const topic = getQueryString("topic");
    //连接到消息队列
    let client = mqtt.connect(url);
    client.on('connect', function () {
        //连接成功后订阅topic
        client.subscribe(topic, function (err) {
            if (!err) {
                showMessage("订阅topic:"   topic   "成功!");
            }
        });
    });
    //获取订阅topic中的消息
    client.on('message', function (topic, message) {
        showMessage("收到消息:"   message.toString());
    });

    //发送消息
    function sendMessage() {
        let targetTopic = document.getElementById("targetTopicInput").value;
        let message = document.getElementById("messageInput").value;
        //向目标topic中发送消息
        client.publish(targetTopic, message);
        showMessage("发送消息给"   targetTopic   "的消息:"   message);
    }

    //从URL中获取参数
    function getQueryString(name) {
        let reg = new RegExp("(^|&)"   name   "=([^&]*)(&|$)", "i");
        let r = window.location.search.substr(1).match(reg);
        if (r != null) {
            return decodeURIComponent(r[2]);
        }
        return null;
    }

    //在消息列表中展示消息
    function showMessage(message) {
        let messageDiv = document.getElementById("messageDiv");
        let messageEle = document.createElement("div");
        messageEle.innerText = message;
        messageDiv.appendChild(messageEle);
    }

    //清空消息列表
    function clearMessage() {
        let messageDiv = document.getElementById("messageDiv");
        messageDiv.innerHTML = "";
    }
</script>
</html>

在Laravel中使用

需要保证 laravelrabbitmq 已经可以正常生产和发布消息了,保证没问题再进行以下操作

  • 安装mqtt包
代码语言:javascript复制
composer require salmanzafar/laravel-mqtt
  • app.php
代码语言:javascript复制
/*
 * Application Service Providers...
 */
SalmanMqttMqttServiceProvider::class,
  • MqttService
代码语言:javascript复制
<?php

namespace AppService;

use IlluminateSupportFacadesAuth;
use SalmanMqttMqttClassMqtt;

class MqttService
{

    public static function SendMsgViaMqtt($topic, $message)
    {
        $mqtt = new Mqtt();
        $client_id = Auth::user()->id ?? 0;
        $output = $mqtt->ConnectAndPublish($topic, $message, $client_id);

        if ($output === true)
        {
            return "published";
        }

        return "Failed";
    }
}
  • laravel生产消息
代码语言:javascript复制
Route::get('/pub', function () {

    $data = [
        'name' => 'zhangsan',
        'age' => 18
    ];
    $res = AppServiceMqttService::SendMsgViaMqtt('topicA', json_encode($data));
    dump($res);
});

请求路由测试

注意:通过url的queryString进行topic订阅

总结

消息中间件应用越来越广泛,不仅可以实现可靠的异步通信,还可以实现即时通讯,掌握一个消息中间件还是很有必要的。像普通的订单下了给后台一个推送等等,都可以选择采用 MQ 实现,方便好用!奥利给!!

0 人点赞