Netty的使用

2021-06-24 14:15:51 浏览数 (1)

1.问题

一个好的通信框架是怎样的?同时如何使用netty?

一个好的通信框架,必然是支持双工通信的,同时它能够对半包黏包进行处理,方便高效的编解码、序列化,拥有心跳超时机制,同时支持大量连接并发,方便调用。而这个通信的过程,我始终是觉得它的起源是三次握手和四次挥手。它们影响着消息中间件和通信框架以及SOA框架的发展。

2.netty中的echo例子

netty最简单的是它的EchoServer和EchoClient,两者有同时有自己对应的处理器EchoServerHandler、EchoClientHandler。

在EchoServer中,通常需要创建两个线程boss线程组和worker线程组。而在于创建线程组NioEventLoop的过程中,会执行openSelector()方法。此时会开启多路复用器。因此这个时候联想到如果使用java的nio的时候,必然需要首先创建需要连接的对象,然后根据连接对象,打开多路复用选择器。然后执行读写方法。而Netty的思想就是这样的。首先创建好连接,然后根据连接执行,然后执行读事件。而这个过程的执行都会经过processSelectedKeys处理选中的keys,也即事件。当如果写满了,没法写的时候,此时就会注册写事件,当可以写的时候,然后会执行写操作。

读写是在I/O 事件由 ChannelInboundHandler 或ChannelOutboundHandler}处理并通过调用中定义的事件传播方法转发到ChannelHandlerContext,例如 ChannelHandlerContext#fireChannelRead(Object)和ChannelHandlerContext#write(Object),进行数据的处理和写操作。

入站InBound数据通常是通过事件输入操作从远程读取的,如:SocketChannel#read(Buffer)。

出站OutBound处理程序通常会生产或者转换出站数据,如写入数据,I/O线程经常执行实际的输出操作,如SocketChannel#write(ByteBuffer)

Netty中的相关事件ops:

代码语言:javascript复制
OP_READ = 1 << 0; 读事件 1
OP_WRITE = 1 << 2; 写事件 4
OP_CONNECT = 1 << 3; 连接事件 8
OP_ACCEPT = 1 << 4; accpet事件 16   

3.业务中使用Netty

这里不对netty整合spring,同时设置编解码、相应的请求code和响应code,这里就不进行展开了。

在血透业务中,对接测温仪,首先需要采集人脸图片信息,方便人脸识别来测定人体温的时候,存储起来。

业务处理整合Netty

为了将体温仪嵌入到我们的业务中,使用了Netty来保证高性能的信息通信和传输。自然我们需要考虑的首先是netty如何加入到血透系统中。

考虑到设备方需要进行人脸识别的对比,因此首先需要采集人脸数据,然后对人脸数据进行保存。然后后期以此进行对比。比对识别成功,则可以进行体温的测量,否则需要先进行人脸采集,然后进行体温的测量。

首先启动netty的过程中需要考虑加入到服务中,然后在启动nettyServer,而在启动的过程中,需要考虑传输过程中的编解码问题,这里需要根据传入信息的大小,进行内存的分配,为了合理的使用,分配一个新的接收缓冲区,其容量可能足够大以读取所有入站数据,而又足够小不要浪费其空间。

代码语言:javascript复制
//设置bossGroup组配置
serverBootstrap = serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576));
// 设置workerGroup组配置
serverBootstrap = serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576));
// 设置处理事件链 进行编解码的设置以及适配器,考虑后期的扩展
 serverBootstrap = serverBootstrap.childHandler(new NettyChannelInitializer<SocketChannel>());

而适配器中,是我们进行业务处理的关键,执行入站操作。为什么是入站?

在netty中,入站是消息进入接收缓冲区,而出站是消息从发送缓冲区中刷出。也即入站操作主要是指读取数据的操作;而出站操作主要是指写入数据的操作。由于我们需要从设备方中读取数据,因此是入站操作。

那么这个过程首先需要请求设备方,然后根据设备方的响应,从而执行具体业务逻辑的处理。因为业务是基于人脸来来识别的,因此第一个信息是基于人脸的,服务端如果识别人脸,则基于深度学习根据人脸识别特征匹配的人脸,则可以根据人脸信息的数据信息,来记录人的数据信息推送到客户端,而客户端可以根据服务端的信息,进行记录。在这个过程中,同时支持进行离线识别上传的情况。

4.具体业务处理代码

这里省略编解码、序列化,只截取逻辑上的一部分:

代码语言:javascript复制
 @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
        Message message = (Message) msg;
        MessageData messageData = JSONObject.parseObject(message.getData(), MessageData.class);
        //创建响应,设置响应信息
        Response response = new Response();
        response.setResult(CmdConstant.resultStatus);
        response.setDesc(CmdConstant.descStr);
        response.setMessageId(messageData.getMessageId());
        log.debug("客户端消息:"   JSONObject.toJSONString(message));
        autowired();
        switch (message.getCmd()) {
            case CmdConstant.loginReq:
                // 设备登录(login) 客户端请求服务端
                log.info("设备登录");
                ChannelMap.channels.add(ctx.channel());
                response.setMessage(CmdConstant.loginRspStr);
                //将信息推送到设备中
                PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.loginRsp, response);
                break;
            case CmdConstant.registerReq:
                // 设备注册(register)
                log.info("设备注册");
                RegisterResp registerResp = new RegisterResp();
                registerResp.setMachId("123456778888");
                registerResp.setPasswd("123456");
                response.setData(registerResp);
                response.setMessage(CmdConstant.registerRspStr);
                PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.registerRsp, response);
                break;
            case CmdConstant.heartBeatReq:
                // 设备心跳
                log.debug("设备心跳");
                HeartBeatResp heartBeatResp = new HeartBeatResp();
                response.setData(heartBeatResp);
                response.setMessage(CmdConstant.heartbeatRspStr);
                PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.heartBeatRsp, response);
                break;
            case CmdConstant.getUserReq:
                log.info("获取人员信息");
                UserReq userReq = getUserReq(messageData);
                response.setData(userReq);
                response.setMessage(CmdConstant.getUserRspStr);
                PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.getUserRsp, response);
                break;
            // 服务端传过来的,进行解析,然后入库
            case CmdConstant.snapshotFaceReq:
                 log.info("上传识别记录");
                SnapshotFace snapshotFace = JSONObject.parseObject(messageData.getData(), SnapshotFace.class);
                if (snapshotFace != null) {   
                    Long fkPatientId = PushCmdUtils.intToPatientId(Integer.valueOf(snapshotFace.getUserId()), sysTenantService.getDefaultId());
                    HdPatient hdPatient = hdPatientManager.selectByPrimaryKey(fkPatientId);
                    BigDecimal temperature = BigDecimal.valueOf(Double.parseDouble(snapshotFace.getTemperature()));
                    if (Objects.nonNull(hdPatient)) {
                        //更新患者透析体温数据,此时保存体温信息
                        hdDrBaseService.saveTemperature(fkPatientId, temperature);
                    } 
                }
                PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.snapshotFaceRsp, response);
                break;
             ... //省略部分逻辑
            //离线上传和识别,通过系统    
            case CmdConstant.offlineFaceReq:
                log.info("离线识别记录上传(带图片)");
                PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.snapshotFaceRsp, response);
                break;
            case CmdConstant.offlineNoFaceReq:
                log.info("离线识别记录上传(无图片)");
                PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.snapshotFaceRsp, response);
                break;
            default:
                log.info("客户端消息:"   JSONObject.toJSONString(message));
                response.setData(new HashMap<>());
                PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.heartBeatRsp, response);
                log.info("无法识别命令");
                break;
        }
    }

而服务端会根据客户端请求的reqCode来进行对应的处理,使用处理器进行处理。而这种处理方式在使用Netty的RocketMQ和sofa-bolt中是可以看到的。那么你是不是很好奇netty的应用,同时方便netty的使用。

下一篇来看sofa-bolt是如何封装netty来给我们带来方便使用的。

0 人点赞