thingsboard之边缘网关建立连接过程

2022-06-30 15:44:40 浏览数 (1)

        thingsboard3.3.4版本之后就有了对应的边缘网关的管理功能,对应的边缘网关项目为thingsboard-edge,相比于之前的普通网关或者设备上传遥测数据,边缘网关增加了很多优势:1、边缘端与云端断开连接时,在边缘端缓冲数据,等连接上之后再将缓冲的数据上传到云端,如下图所示:

2、距离设备更近,网络稳定性更高,能够更快的完成设备联动操作

本文主要讲下边缘网关连接云端的过程:

1、在thingsboard项目创建对应的边缘实例

2、 使用上图中的Edge Key与secret启动thingsboard-edge项目

3、边缘端与云端通信是通过grpc实现的,下面就是建立连接流程

EdgeGrpcService

代码语言:javascript复制
    @Override
    public StreamObserver<RequestMsg> handleMsgs(StreamObserver<ResponseMsg> outputStream) {
        return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, mapper,
                sendDownlinkExecutorService).getInputStream();
    }

 接下来看下EdgeGrpcSession的构造过程:

代码语言:javascript复制
    EdgeGrpcSession(EdgeContextComponent ctx, StreamObserver<ResponseMsg> outputStream,
            BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener,
            Consumer<EdgeId> sessionCloseListener, ObjectMapper mapper,
            ScheduledExecutorService sendDownlinkExecutorService) {
        this.sessionId = UUID.randomUUID();
        this.ctx = ctx;
        this.outputStream = outputStream;
        this.sessionOpenListener = sessionOpenListener;
        this.sessionCloseListener = sessionCloseListener;
        this.mapper = mapper;
        this.sendDownlinkExecutorService = sendDownlinkExecutorService;
        initInputStream();
    }

下面是建立连接的核心方法:initInputStream

代码语言:javascript复制
    private void initInputStream() {
        this.inputStream = new StreamObserver<>() {
            @Override
            public void onNext(RequestMsg requestMsg) {
                if (!connected && requestMsg.getMsgType().equals(RequestMsgType.CONNECT_RPC_MESSAGE)) {
                    ConnectResponseMsg responseMsg = processConnect(requestMsg.getConnectRequestMsg());
                    outputStream.onNext(ResponseMsg.newBuilder()
                            .setConnectResponseMsg(responseMsg)
                            .build());
                    if (ConnectResponseCode.ACCEPTED != responseMsg.getResponseCode()) {
                        outputStream.onError(new RuntimeException(responseMsg.getErrorMsg()));
                    } else {
                        connected = true;
                    }
                }
                if (connected) {
                    if (requestMsg.getMsgType().equals(RequestMsgType.SYNC_REQUEST_RPC_MESSAGE)) {
                        if (requestMsg.hasSyncRequestMsg() && requestMsg.getSyncRequestMsg().getSyncRequired()) {
                            startSyncProcess(edge.getTenantId(), edge.getId());
                        } else {
                            syncCompleted = true;
                        }
                    }
                    if (requestMsg.getMsgType().equals(RequestMsgType.UPLINK_RPC_MESSAGE)) {
                        if (requestMsg.hasUplinkMsg()) {
                            onUplinkMsg(requestMsg.getUplinkMsg());
                        }
                        if (requestMsg.hasDownlinkResponseMsg()) {
                            onDownlinkResponse(requestMsg.getDownlinkResponseMsg());
                        }
                    }
                }
            }
            
            @Override
            public void onError(Throwable t) {
                log.error("Failed to deliver message from client!", t);
                closeSession();
            }
            
            @Override
            public void onCompleted() {
                closeSession();
            }
            
            private void closeSession() {
                connected = false;
                if (edge != null) {
                    try {
                        sessionCloseListener.accept(edge.getId());
                    } catch (Exception ignored) {
                    }
                }
                try {
                    outputStream.onCompleted();
                } catch (Exception ignored) {
                }
            }
        };
    }

initInputStream方法除了有建立连接的processConnect,还有处理来自边缘端上行消onUplinkMsg方法,以及处理云端下发到边缘端的消息响应的onDownlinkResponse方法。

rpc

0 人点赞