为了确保遥测数据被thingsboard接收到,在设备以及thingsboard之间传递时序数据是有确认机制的,具体在transport-mqtt模块中,如下所示:
下面看下MqttTransportHandler类相关部分代码
代码语言:javascript复制 private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
if (!checkConnected(ctx, mqttMsg)) {
return;
}
String topicName = mqttMsg.variableHeader().topicName();
int msgId = mqttMsg.variableHeader().packetId();
log.trace("[{}][{}] Processing publish msg [{}][{}]!", sessionId, deviceSessionCtx.getDeviceId(), topicName, msgId);
if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {
if (gatewaySessionHandler != null) {
// 通过网关上传数据
handleGatewayPublishMsg(topicName, msgId, mqttMsg);
transportService.reportActivity(sessionInfo);
}
} else {
// 通过设备上传数据
processDevicePublish(ctx, mqttMsg, topicName, msgId);
}
}
下面看下processDevicePublish相关部分,从下面的代码可以看到端倪:将遥测数据发送队列后会执行getPubAckCallback进行数据ack操作
代码语言:javascript复制private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
try {
// 发送遥测数据
if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_TOPIC)) {
TransportProtos.PostTelemetryMsg postTelemetryMsg = adaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
// getPubAckCallback是成功回调
transportService.process(sessionInfo, postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
}
代码语言:javascript复制 private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final int msgId, final T msg) {
return new TransportServiceCallback<Void>() {
@Override
public void onSuccess(Void dummy) {
log.trace("[{}] Published msg: {}", sessionId, msg);
if (msgId > 0) {
// 发送回执
ctx.writeAndFlush(createMqttPubAckMsg(msgId));
}
}
@Override
public void onError(Throwable e) {
log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e);
processDisconnect(ctx);
}
};
}