通常客户端只会主动发送心跳消息,目的是为了保持与服务端连接,而其他消息往往需要服务端发送消息至客户端调取。
实现步骤
- 客户端在第一次与服务端建立连接时,将此连接的通道在
Map
中保存下来,为了保证线程安全,可以使用线程安全的ConcurrentHashMap
。 - 在发送消息给客户端时,通过设备标识遍历
ConcurrentHashMap
找到目标客户端连接通道。找到后先判断通道是否存活,如果连接是存活状态,就通过此通道发送消息给客户端,如果不是存活状态,就从Map
中删除此通道信息。 - 将消息发送至客户端后,服务端正常接收客户端传回的信息。
实现代码
前两篇文章中已经提供了 netty
的整体框架代码,这里只提供一些核心的关键代码,其余代码不再赘述。
指路:
- Netty系列(一):Springboot整合Netty,自定义协议实现
- Netty系列(二):Netty拆包/沾包问题的解决方案
新建一个 ChannelMap
类,在客户端第一次连接时保存 channel
连接。后续服务端向客户端发送消息时,先从 Map
中找到对应的客户端消息通道连接,再向通道中写入消息进行发送。
/**
* @Author 鳄鱼儿
* @Description 连接通道保存MAP
* @date 2022/11/27 16:30
* @Version 1.0
*/
public class ChannelMap {
/**
* 存放客户端标识ID(消息ID)与channel的对应关系
*/
private static volatile ConcurrentHashMap<String, Channel> channelMap = null;
private ChannelMap() {
}
public static ConcurrentHashMap<String, Channel> getChannelMap() {
if (null == channelMap) {
synchronized (ChannelMap.class) {
if (null == channelMap) {
channelMap = new ConcurrentHashMap<>();
}
}
}
return channelMap;
}
public static Channel getChannel(String id) {
return getChannelMap().get(id);
}
}
在客户端建立连接(服务端收到心跳消息)时,将channel加入map中。
代码语言:javascript复制public class ServerListenerHandler extends SimpleChannelInboundHandler<Message> {
private static final Logger log = LoggerFactory.getLogger(ServerListenerHandler.class);
/**
* 设备接入连接时处理
*
* @param ctx
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
log.info("有新的连接:[{}]", ctx.channel().id().asLongText());
}
/**
* 数据处理
*
* @param ctx
* @param msg
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message msg) {
// 获取消息实例中的消息体
String content = msg.getContent();
// 对不同消息类型进行处理
MessageEnum type = MessageEnum.getStructureEnum(msg);
switch (type) {
case CONNECT:
// 将通道加入ChannelMap
ChannelMap.getChannelMap().put(msg.getId(), ctx.channel());
// 将客户端ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
AttributeKey<String> key = AttributeKey.valueOf("id");
ctx.channel().attr(key).setIfAbsent(msg.getId());
// TODO 心跳消息处理
case STATE:
// TODO 设备状态
default:
System.out.println(type.content "消息内容" content);
}
}
/**
* 设备下线处理
*
* @param ctx
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
log.info("设备下线了:{}", ctx.channel().id().asLongText());
// map中移除channel
removeId(ctx);
}
/**
* 设备连接异常处理
*
* @param ctx
* @param cause
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 打印异常
log.info("异常:{}", cause.getMessage());
// map中移除channel
removeId(ctx);
// 关闭连接
ctx.close();
}
private void removeId(ChannelHandlerContext ctx) {
AttributeKey<String> key = AttributeKey.valueOf("id");
// 获取channel中id
String id = ctx.channel().attr(key).get();
// map移除channel
ChannelMap.getChannelMap().remove(id);
}
}
写一个服务端发送消息的业务层类,并通过客户端id在map中获取到channel通道,将消息转化成json字符串后,通过writeAndFlush
发送至客户端。
/**
* @Author 鳄鱼儿
* @Description 向客户端发送消息
* @date 2022/11/27 17:29
* @Version 1.0
*/
@Service
public class PushMsgServiceImpl implements PushMsgService {
/**
* 向一个客户端发送消息
*
* @param msg
*/
@Override
public void push(Message msg) {
// 客户端ID
String id = msg.getId();
Channel channel = ChannelMap.getChannel(id);
if (null == channel) {
throw new RuntimeException("客户端已离线");
}
channel.writeAndFlush(msg);
}
}
注意:writeAndFlush
参数是自定义编码的泛型对象实例。如本文自定义的Message
消息解析类。
public class MessageEncodeHandler extends MessageToByteEncoder<Message> {
private static String delimiter;
public MessageEncodeHandler(String delimiter) {
this.delimiter = delimiter;
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception {
out.writeBytes(
(message.toJsonString() delimiter)
.getBytes(CharsetUtil.UTF_8)
);
}
}
之后再编写一个Controller
类(这里省略),在Controller
类中调用PushMsgService
中pushff,就可以完成对客户端的消息发送。