数据实时反馈技术

2022-07-28 14:30:39 浏览数 (2)

其实不知道怎么起这个标题,这是一个这样的场景,在开发后台管理系统,尤其是实时监控系统的时候,往往需要展示数据的不断更新变化。常用的技术就是轮询,或者使用websocket进行长连接实时通讯。我们知道webpack在调试模式的时候有个热更新功能,它是通过服务器数据推送功能实现的。就是所谓的Server-Sent Events(SSE).

具体原理和用法可以参考阮一峰 的文章

http://www.ruanyifeng.com/blog/2017/05/server-sent_events.html

本文将结合Node.js、SSE、Koa、Pm2、Rxjs技术来实现一个优雅的数据实时反馈的开发技术。 在Node.js端,我们可以安装一个NPM包:http-event-stream来做服务端推送。这个包源码十分简单,读者可以自己看一下,自己写也不难。 到目前为止,从服务端出发到浏览器端,数据实时更新是很简单了,但还差最后的开发体验,就是如何将服务器端的数据实时“推送”到带有http-event-stream的请求中去呢?

一种简单的方法,就是当得到来自客户端的SSE请求的时候,启动一个定时器,在定时器里面去获取数据库或者内存中的数据,然后再发送给客户端。 在写这段代码之前,我们需要准备一个中间件用来将Rxjs的事件转换成SSE发送出去。

代码语言:javascript复制
const { createEventStream } = require("http-event-stream")
const { Sink } = require('fastrx')
module.exports = async function(ctx, next) {
    const sink = new Sink
    const stream = createEventStream(ctx.res, {
        onClose() {
            sink.dispose()
        }
    })
    sink.next = data => stream.sendMessage({ event: 'message', data: JSON.stringify(data) })
    sink.complete = err => stream.close();
    sink.subscribe(await next())
    ctx.respond = false
}

这里的fastrx库,是我自己研发的高速Rxjs实现,大家可以去NPM网站上查看。我也有详细写过相关原理的文章。 有了这个中间件后,假定我们需要从MongoDB中每隔5秒读取一次数据。 我们的Controller响应函数就可以这么写(koa-router)

代码语言:javascript复制
async function facade({collection }) {
    return pipe(interval(5000), startWith(0), switchMap(x => fromPromise(collection('apps').aggregate([{
        $lookup: {
            from: "servers",
            as: "servers",
            let: { app: "$_id" },
            pipeline: [
                { $match: { $expr: { $eq: ["$app", "$$app"] } } },
                { $group: { _id: "$env", envs: { $push: "$$ROOT" } } },
                { $project: { k: "$_id", v: "$envs", _id: 0 } }
            ]
        }
    }, { $addFields: { servers: { $arrayToObject: "$servers" } } }]).toArray())))
}

其中collection是来自Mongodb的client提前在Koa中进行了封装。

进阶

定时获取数据有许多局限性,真实场景中,我们往往需要在事件发生的时候及时广播数据到监控前台,而且有些数据并非保存在某地待你去获取的。那么我就需要建立一个数据源到Koa控制器中间的管道。这里介绍在pm2管理下的进程间通讯方式。这种方法也可以扩展到网络间通讯。

代码语言:javascript复制
const subject_rounds = rx.subject()
pm2.launchBus((err, bus) => bus.on('myEvent', ({ data, process }) => subject_rounds.next(data)))

结合koa-router

代码语言:javascript复制
router.get('/getData', eventStreamM, ctx => subject_rounds)

其中eventStreamM就是之前讲的中间件

浏览器端可以通过EventSource对象监听数据了。 在另一个数据源发生的进程中,我们用process.send({type:"myEvent",data:数据})方式广播事件到pm2的事件总线中去就可以了。

对于其他的系统架构,我们可以用不同的技术进行数据的广播,比如消息队列等等。但最终都可以用到Rxjs中的subject作为桥梁给SSE推送事件。Subject就是这种存在,同时担任观察者和可观察对象,对于SSE来说是可观察对象,对于其他数据源来说就是观察者。如果有多个人打开了SSE进行监听,都可以完美应对。

0 人点赞