thingsboard如何维护设备的状态的

2022-03-29 13:49:50 浏览数 (1)

本文以thingsboard-3.1.1为例说明

正文

thingsboard在内存里面是记录了每个设备(包括网关)的在线状态的,在数据attribute_kv表中active字段对应的就是设备在线状态的值。

thingsboard的对mqtt消息的处理是由MqttTransportHandler来完成的,底层通信基于netty实现,熟悉netty的开发者对ChannelInboundHandlerAdapter一定特别熟悉,咱们直接看下MqttTransportHandler是如何重载channelRead方法的,如下所示:

代码语言:javascript复制
@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        log.trace("[{}] Processing msg: {}", sessionId, msg);
        try {
            if (msg instanceof MqttMessage) {
                processMqttMsg(ctx, (MqttMessage) msg);
            } else {
                ctx.close();
            }
        } finally {
            ReferenceCountUtil.safeRelease(msg);
        }
    }
代码语言:javascript复制
private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
        address = (InetSocketAddress) ctx.channel().remoteAddress();
        if (msg.fixedHeader() == null) {
            log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());
            processDisconnect(ctx);
            return;
        }
        deviceSessionCtx.setChannel(ctx);
        switch (msg.fixedHeader().messageType()) {
            case CONNECT:
                processConnect(ctx, (MqttConnectMessage) msg);
                break;
            case PUBLISH:
                processPublish(ctx, (MqttPublishMessage) msg);
                break;
            case SUBSCRIBE:
                processSubscribe(ctx, (MqttSubscribeMessage) msg);
                break;
            case UNSUBSCRIBE:
                processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
                break;
            case PINGREQ:
                if (checkConnected(ctx, msg)) {
                    ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
                    transportService.reportActivity(sessionInfo);
                }
                break;
            case DISCONNECT:
                if (checkConnected(ctx, msg)) {
                    processDisconnect(ctx);
                }
                break;
            default:
                break;
        }
    }

从上面的方法可以看到thingsboard是如何处理mqtt消息的,针对connect、publish、dusbsrcribe等消息类型进行了处理,processConnect与processDisconnect方法是处理设备连接/断开连接的,在processConnect方法中创建了设备的在线信息到内存中,processDisconnect则相反。

processConnect是建立连接,但是要维护设备的实时连接状态,只处理连接消息肯定是不够的,thingsboard还会处理publish(属性更新以及遥测值上传)等消息也会更新设备的活动状态,具体可以参考TransportService.reportActivity方法。

代码语言:javascript复制
// 属性更新以及遥测值上传
private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
        if (!checkConnected(ctx, mqttMsg)) {
            return;
        }
        String topicName = mqttMsg.variableHeader().topicName();
        int msgId = mqttMsg.variableHeader().packetId();
        log.trace("[{}][{}] Processing publish msg [{}][{}]!", sessionId, deviceSessionCtx.getDeviceId(), topicName, msgId);

        if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {
            if (gatewaySessionHandler != null) {
                handleGatewayPublishMsg(topicName, msgId, mqttMsg);
                transportService.reportActivity(sessionInfo);
            }
        } else {
            processDevicePublish(ctx, mqttMsg, topicName, msgId);
        }
    }
代码语言:javascript复制
@Override
    public void reportActivity(TransportProtos.SessionInfoProto sessionInfo) {
        reportActivityInternal(sessionInfo);
    }

    private SessionMetaData reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) {
        UUID sessionId = toSessionId(sessionInfo);
        SessionMetaData sessionMetaData = sessions.get(sessionId);
        if (sessionMetaData != null) {
            sessionMetaData.updateLastActivityTime();
        }
        return sessionMetaData;
    }

可以看到每次设备(通过设备自身或者通过网关上传数据)都会更新设备的最后活跃时间字段。看到这里一直没有看到除了设备主动关闭连接的情况下thingsboard是如何清理过期连接的,接下来是本场的主角:DefaultTransportService.checkInactivityAndReportActivity方法:

代码语言:javascript复制
    private void checkInactivityAndReportActivity() {
        long expTime = System.currentTimeMillis() - sessionInactivityTimeout;
        sessions.forEach((uuid, sessionMD) -> {
            long lastActivityTime = sessionMD.getLastActivityTime();
            TransportProtos.SessionInfoProto sessionInfo = sessionMD.getSessionInfo();
            if (sessionInfo.getGwSessionIdMSB() > 0 &&
                    sessionInfo.getGwSessionIdLSB() > 0) {
                SessionMetaData gwMetaData = sessions.get(new UUID(sessionInfo.getGwSessionIdMSB(), sessionInfo.getGwSessionIdLSB()));
                if (gwMetaData != null) {
                    lastActivityTime = Math.max(gwMetaData.getLastActivityTime(), lastActivityTime);
                }
            }
            
            if (lastActivityTime < expTime) {
                // 长时间没有对话,session过期
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Session has expired due to last activity time: {}", toSessionId(sessionInfo), lastActivityTime);
                }
                process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
                sessions.remove(uuid);
                sessionMD.getListener().onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance());
            } else {
                if (lastActivityTime > sessionMD.getLastReportedActivityTime()) {
                    final long lastActivityTimeFinal = lastActivityTime;
                    process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder()
                            .setAttributeSubscription(sessionMD.isSubscribedToAttributes())
                            .setRpcSubscription(sessionMD.isSubscribedToRPC())
                            .setLastActivityTime(lastActivityTime).build(), new TransportServiceCallback<Void>() {
                        @Override
                        public void onSuccess(Void msg) {
                            sessionMD.setLastReportedActivityTime(lastActivityTimeFinal);
                        }

                        @Override
                        public void onError(Throwable e) {
                            log.warn("[{}] Failed to report last activity time", uuid, e);
                        }
                    });
                }
            }
        });
    }

checkInactivityAndReportActivity这个方法是DefaultTransportService创建时启动的一个定时检测任务,

private final ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>();

sessions是以设备id为主键的ConcurrentMap对象,这个方法就会定期去扫描sessions里的session数据,长时间与thingsboard未进行会话就会关闭与设备的会话连接,并清除内存保存的会话数据。

0 人点赞