1.问题
一个好的通信框架是怎样的?同时如何使用netty?
一个好的通信框架,必然是支持双工通信的,同时它能够对半包黏包进行处理,方便高效的编解码、序列化,拥有心跳超时机制,同时支持大量连接并发,方便调用。而这个通信的过程,我始终是觉得它的起源是三次握手和四次挥手。它们影响着消息中间件和通信框架以及SOA框架的发展。
2.netty中的echo例子
netty最简单的是它的EchoServer和EchoClient,两者有同时有自己对应的处理器EchoServerHandler、EchoClientHandler。
在EchoServer中,通常需要创建两个线程boss线程组和worker线程组。而在于创建线程组NioEventLoop的过程中,会执行openSelector()方法。此时会开启多路复用器。因此这个时候联想到如果使用java的nio的时候,必然需要首先创建需要连接的对象,然后根据连接对象,打开多路复用选择器。然后执行读写方法。而Netty的思想就是这样的。首先创建好连接,然后根据连接执行,然后执行读事件。而这个过程的执行都会经过processSelectedKeys处理选中的keys,也即事件。当如果写满了,没法写的时候,此时就会注册写事件,当可以写的时候,然后会执行写操作。
读写是在I/O 事件由 ChannelInboundHandler 或ChannelOutboundHandler}处理并通过调用中定义的事件传播方法转发到ChannelHandlerContext,例如 ChannelHandlerContext#fireChannelRead(Object)和ChannelHandlerContext#write(Object),进行数据的处理和写操作。
入站InBound数据通常是通过事件输入操作从远程读取的,如:SocketChannel#read(Buffer)。
出站OutBound处理程序通常会生产或者转换出站数据,如写入数据,I/O线程经常执行实际的输出操作,如SocketChannel#write(ByteBuffer)
Netty中的相关事件ops:
代码语言:javascript复制OP_READ = 1 << 0; 读事件 1
OP_WRITE = 1 << 2; 写事件 4
OP_CONNECT = 1 << 3; 连接事件 8
OP_ACCEPT = 1 << 4; accpet事件 16
3.业务中使用Netty
这里不对netty整合spring,同时设置编解码、相应的请求code和响应code,这里就不进行展开了。
在血透业务中,对接测温仪,首先需要采集人脸图片信息,方便人脸识别来测定人体温的时候,存储起来。
业务处理整合Netty
为了将体温仪嵌入到我们的业务中,使用了Netty来保证高性能的信息通信和传输。自然我们需要考虑的首先是netty如何加入到血透系统中。
考虑到设备方需要进行人脸识别的对比,因此首先需要采集人脸数据,然后对人脸数据进行保存。然后后期以此进行对比。比对识别成功,则可以进行体温的测量,否则需要先进行人脸采集,然后进行体温的测量。
首先启动netty的过程中需要考虑加入到服务中,然后在启动nettyServer,而在启动的过程中,需要考虑传输过程中的编解码问题,这里需要根据传入信息的大小,进行内存的分配,为了合理的使用,分配一个新的接收缓冲区,其容量可能足够大以读取所有入站数据,而又足够小不要浪费其空间。
代码语言:javascript复制//设置bossGroup组配置
serverBootstrap = serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576));
// 设置workerGroup组配置
serverBootstrap = serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576));
// 设置处理事件链 进行编解码的设置以及适配器,考虑后期的扩展
serverBootstrap = serverBootstrap.childHandler(new NettyChannelInitializer<SocketChannel>());
而适配器中,是我们进行业务处理的关键,执行入站操作。为什么是入站?
在netty中,入站是消息进入接收缓冲区,而出站是消息从发送缓冲区中刷出。也即入站操作主要是指读取数据的操作;而出站操作主要是指写入数据的操作。由于我们需要从设备方中读取数据,因此是入站操作。
那么这个过程首先需要请求设备方,然后根据设备方的响应,从而执行具体业务逻辑的处理。因为业务是基于人脸来来识别的,因此第一个信息是基于人脸的,服务端如果识别人脸,则基于深度学习根据人脸识别特征匹配的人脸,则可以根据人脸信息的数据信息,来记录人的数据信息推送到客户端,而客户端可以根据服务端的信息,进行记录。在这个过程中,同时支持进行离线识别上传的情况。
4.具体业务处理代码
这里省略编解码、序列化,只截取逻辑上的一部分:
代码语言:javascript复制 @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
Message message = (Message) msg;
MessageData messageData = JSONObject.parseObject(message.getData(), MessageData.class);
//创建响应,设置响应信息
Response response = new Response();
response.setResult(CmdConstant.resultStatus);
response.setDesc(CmdConstant.descStr);
response.setMessageId(messageData.getMessageId());
log.debug("客户端消息:" JSONObject.toJSONString(message));
autowired();
switch (message.getCmd()) {
case CmdConstant.loginReq:
// 设备登录(login) 客户端请求服务端
log.info("设备登录");
ChannelMap.channels.add(ctx.channel());
response.setMessage(CmdConstant.loginRspStr);
//将信息推送到设备中
PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.loginRsp, response);
break;
case CmdConstant.registerReq:
// 设备注册(register)
log.info("设备注册");
RegisterResp registerResp = new RegisterResp();
registerResp.setMachId("123456778888");
registerResp.setPasswd("123456");
response.setData(registerResp);
response.setMessage(CmdConstant.registerRspStr);
PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.registerRsp, response);
break;
case CmdConstant.heartBeatReq:
// 设备心跳
log.debug("设备心跳");
HeartBeatResp heartBeatResp = new HeartBeatResp();
response.setData(heartBeatResp);
response.setMessage(CmdConstant.heartbeatRspStr);
PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.heartBeatRsp, response);
break;
case CmdConstant.getUserReq:
log.info("获取人员信息");
UserReq userReq = getUserReq(messageData);
response.setData(userReq);
response.setMessage(CmdConstant.getUserRspStr);
PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.getUserRsp, response);
break;
// 服务端传过来的,进行解析,然后入库
case CmdConstant.snapshotFaceReq:
log.info("上传识别记录");
SnapshotFace snapshotFace = JSONObject.parseObject(messageData.getData(), SnapshotFace.class);
if (snapshotFace != null) {
Long fkPatientId = PushCmdUtils.intToPatientId(Integer.valueOf(snapshotFace.getUserId()), sysTenantService.getDefaultId());
HdPatient hdPatient = hdPatientManager.selectByPrimaryKey(fkPatientId);
BigDecimal temperature = BigDecimal.valueOf(Double.parseDouble(snapshotFace.getTemperature()));
if (Objects.nonNull(hdPatient)) {
//更新患者透析体温数据,此时保存体温信息
hdDrBaseService.saveTemperature(fkPatientId, temperature);
}
}
PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.snapshotFaceRsp, response);
break;
... //省略部分逻辑
//离线上传和识别,通过系统
case CmdConstant.offlineFaceReq:
log.info("离线识别记录上传(带图片)");
PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.snapshotFaceRsp, response);
break;
case CmdConstant.offlineNoFaceReq:
log.info("离线识别记录上传(无图片)");
PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.snapshotFaceRsp, response);
break;
default:
log.info("客户端消息:" JSONObject.toJSONString(message));
response.setData(new HashMap<>());
PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.heartBeatRsp, response);
log.info("无法识别命令");
break;
}
}
而服务端会根据客户端请求的reqCode来进行对应的处理,使用处理器进行处理。而这种处理方式在使用Netty的RocketMQ和sofa-bolt中是可以看到的。那么你是不是很好奇netty的应用,同时方便netty的使用。
下一篇来看sofa-bolt是如何封装netty来给我们带来方便使用的。