thingsboard3.3.4版本之后就有了对应的边缘网关的管理功能,对应的边缘网关项目为thingsboard-edge,相比于之前的普通网关或者设备上传遥测数据,边缘网关增加了很多优势:1、边缘端与云端断开连接时,在边缘端缓冲数据,等连接上之后再将缓冲的数据上传到云端,如下图所示:
2、距离设备更近,网络稳定性更高,能够更快的完成设备联动操作
本文主要讲下边缘网关连接云端的过程:
1、在thingsboard项目创建对应的边缘实例
2、 使用上图中的Edge Key与secret启动thingsboard-edge项目
3、边缘端与云端通信是通过grpc实现的,下面就是建立连接流程
EdgeGrpcService
代码语言:javascript复制 @Override
public StreamObserver<RequestMsg> handleMsgs(StreamObserver<ResponseMsg> outputStream) {
return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, mapper,
sendDownlinkExecutorService).getInputStream();
}
接下来看下EdgeGrpcSession的构造过程:
代码语言:javascript复制 EdgeGrpcSession(EdgeContextComponent ctx, StreamObserver<ResponseMsg> outputStream,
BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener,
Consumer<EdgeId> sessionCloseListener, ObjectMapper mapper,
ScheduledExecutorService sendDownlinkExecutorService) {
this.sessionId = UUID.randomUUID();
this.ctx = ctx;
this.outputStream = outputStream;
this.sessionOpenListener = sessionOpenListener;
this.sessionCloseListener = sessionCloseListener;
this.mapper = mapper;
this.sendDownlinkExecutorService = sendDownlinkExecutorService;
initInputStream();
}
下面是建立连接的核心方法:initInputStream
代码语言:javascript复制 private void initInputStream() {
this.inputStream = new StreamObserver<>() {
@Override
public void onNext(RequestMsg requestMsg) {
if (!connected && requestMsg.getMsgType().equals(RequestMsgType.CONNECT_RPC_MESSAGE)) {
ConnectResponseMsg responseMsg = processConnect(requestMsg.getConnectRequestMsg());
outputStream.onNext(ResponseMsg.newBuilder()
.setConnectResponseMsg(responseMsg)
.build());
if (ConnectResponseCode.ACCEPTED != responseMsg.getResponseCode()) {
outputStream.onError(new RuntimeException(responseMsg.getErrorMsg()));
} else {
connected = true;
}
}
if (connected) {
if (requestMsg.getMsgType().equals(RequestMsgType.SYNC_REQUEST_RPC_MESSAGE)) {
if (requestMsg.hasSyncRequestMsg() && requestMsg.getSyncRequestMsg().getSyncRequired()) {
startSyncProcess(edge.getTenantId(), edge.getId());
} else {
syncCompleted = true;
}
}
if (requestMsg.getMsgType().equals(RequestMsgType.UPLINK_RPC_MESSAGE)) {
if (requestMsg.hasUplinkMsg()) {
onUplinkMsg(requestMsg.getUplinkMsg());
}
if (requestMsg.hasDownlinkResponseMsg()) {
onDownlinkResponse(requestMsg.getDownlinkResponseMsg());
}
}
}
}
@Override
public void onError(Throwable t) {
log.error("Failed to deliver message from client!", t);
closeSession();
}
@Override
public void onCompleted() {
closeSession();
}
private void closeSession() {
connected = false;
if (edge != null) {
try {
sessionCloseListener.accept(edge.getId());
} catch (Exception ignored) {
}
}
try {
outputStream.onCompleted();
} catch (Exception ignored) {
}
}
};
}
initInputStream方法除了有建立连接的processConnect,还有处理来自边缘端上行消onUplinkMsg方法,以及处理云端下发到边缘端的消息响应的onDownlinkResponse方法。