本文以thingsboard-3.1.1为例说明
正文
thingsboard在内存里面是记录了每个设备(包括网关)的在线状态的,在数据attribute_kv表中active字段对应的就是设备在线状态的值。
thingsboard的对mqtt消息的处理是由MqttTransportHandler来完成的,底层通信基于netty实现,熟悉netty的开发者对ChannelInboundHandlerAdapter一定特别熟悉,咱们直接看下MqttTransportHandler是如何重载channelRead方法的,如下所示:
代码语言:javascript复制@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
log.trace("[{}] Processing msg: {}", sessionId, msg);
try {
if (msg instanceof MqttMessage) {
processMqttMsg(ctx, (MqttMessage) msg);
} else {
ctx.close();
}
} finally {
ReferenceCountUtil.safeRelease(msg);
}
}
代码语言:javascript复制private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
address = (InetSocketAddress) ctx.channel().remoteAddress();
if (msg.fixedHeader() == null) {
log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());
processDisconnect(ctx);
return;
}
deviceSessionCtx.setChannel(ctx);
switch (msg.fixedHeader().messageType()) {
case CONNECT:
processConnect(ctx, (MqttConnectMessage) msg);
break;
case PUBLISH:
processPublish(ctx, (MqttPublishMessage) msg);
break;
case SUBSCRIBE:
processSubscribe(ctx, (MqttSubscribeMessage) msg);
break;
case UNSUBSCRIBE:
processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
break;
case PINGREQ:
if (checkConnected(ctx, msg)) {
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
transportService.reportActivity(sessionInfo);
}
break;
case DISCONNECT:
if (checkConnected(ctx, msg)) {
processDisconnect(ctx);
}
break;
default:
break;
}
}
从上面的方法可以看到thingsboard是如何处理mqtt消息的,针对connect、publish、dusbsrcribe等消息类型进行了处理,processConnect与processDisconnect方法是处理设备连接/断开连接的,在processConnect方法中创建了设备的在线信息到内存中,processDisconnect则相反。
processConnect是建立连接,但是要维护设备的实时连接状态,只处理连接消息肯定是不够的,thingsboard还会处理publish(属性更新以及遥测值上传)等消息也会更新设备的活动状态,具体可以参考TransportService.reportActivity方法。
代码语言:javascript复制// 属性更新以及遥测值上传
private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
if (!checkConnected(ctx, mqttMsg)) {
return;
}
String topicName = mqttMsg.variableHeader().topicName();
int msgId = mqttMsg.variableHeader().packetId();
log.trace("[{}][{}] Processing publish msg [{}][{}]!", sessionId, deviceSessionCtx.getDeviceId(), topicName, msgId);
if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {
if (gatewaySessionHandler != null) {
handleGatewayPublishMsg(topicName, msgId, mqttMsg);
transportService.reportActivity(sessionInfo);
}
} else {
processDevicePublish(ctx, mqttMsg, topicName, msgId);
}
}
代码语言:javascript复制@Override
public void reportActivity(TransportProtos.SessionInfoProto sessionInfo) {
reportActivityInternal(sessionInfo);
}
private SessionMetaData reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) {
UUID sessionId = toSessionId(sessionInfo);
SessionMetaData sessionMetaData = sessions.get(sessionId);
if (sessionMetaData != null) {
sessionMetaData.updateLastActivityTime();
}
return sessionMetaData;
}
可以看到每次设备(通过设备自身或者通过网关上传数据)都会更新设备的最后活跃时间字段。看到这里一直没有看到除了设备主动关闭连接的情况下thingsboard是如何清理过期连接的,接下来是本场的主角:DefaultTransportService.checkInactivityAndReportActivity方法:
代码语言:javascript复制 private void checkInactivityAndReportActivity() {
long expTime = System.currentTimeMillis() - sessionInactivityTimeout;
sessions.forEach((uuid, sessionMD) -> {
long lastActivityTime = sessionMD.getLastActivityTime();
TransportProtos.SessionInfoProto sessionInfo = sessionMD.getSessionInfo();
if (sessionInfo.getGwSessionIdMSB() > 0 &&
sessionInfo.getGwSessionIdLSB() > 0) {
SessionMetaData gwMetaData = sessions.get(new UUID(sessionInfo.getGwSessionIdMSB(), sessionInfo.getGwSessionIdLSB()));
if (gwMetaData != null) {
lastActivityTime = Math.max(gwMetaData.getLastActivityTime(), lastActivityTime);
}
}
if (lastActivityTime < expTime) {
// 长时间没有对话,session过期
if (log.isDebugEnabled()) {
log.debug("[{}] Session has expired due to last activity time: {}", toSessionId(sessionInfo), lastActivityTime);
}
process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
sessions.remove(uuid);
sessionMD.getListener().onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance());
} else {
if (lastActivityTime > sessionMD.getLastReportedActivityTime()) {
final long lastActivityTimeFinal = lastActivityTime;
process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder()
.setAttributeSubscription(sessionMD.isSubscribedToAttributes())
.setRpcSubscription(sessionMD.isSubscribedToRPC())
.setLastActivityTime(lastActivityTime).build(), new TransportServiceCallback<Void>() {
@Override
public void onSuccess(Void msg) {
sessionMD.setLastReportedActivityTime(lastActivityTimeFinal);
}
@Override
public void onError(Throwable e) {
log.warn("[{}] Failed to report last activity time", uuid, e);
}
});
}
}
});
}
checkInactivityAndReportActivity这个方法是DefaultTransportService创建时启动的一个定时检测任务,
private final ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>();
sessions是以设备id为主键的ConcurrentMap对象,这个方法就会定期去扫描sessions里的session数据,长时间与thingsboard未进行会话就会关闭与设备的会话连接,并清除内存保存的会话数据。