前言
RPC框架代码量较多,将仅对核心过程进行梳理,完整代码见:https://github.com/wdw87/wRpc
在这篇推文中,将实现Rpc的远程通信。远程通信是 RPC 的根本,本 RPC 框架还是采用 Netty 来作为通信框架。
在本项目的系统推文中,将对项目进行详细的介绍。
主要将按照下面的内容进行分配(蓝色字体可戳):
手写RPC框架(一) | RPC简介、技术栈介绍、测试Demo |
---|---|
手写RPC框架(二) | 远程通信实现 |
手写RPC框架(三) | 制定协议与编解码器、动态代理 |
手写RPC框架(四) | 注册中心 |
Rpc框架示意图
四、实现远程通信
远程通信是 RPC 的根本,本 RPC 框架还是采用 Netty 来作为通信框架。远程通信必然会有一个 Server 和一个 Client 的实现。下面就先介绍该 RPC 框架的实现:
完整代码见:https://github.com/wdw87/wRpc
1. 服务端
服务端负责接收客户端的请求,并做出响应。
一个Netty服务端主要包括两部分组成
- 配置服务端的启动类,比如下方的NettyServer
- 处理请求的逻辑类, 比如下方的 ServiceRequestHandler
NettyServer
代码语言:javascript复制public class NettyServer {
...
//服务端启动的核心方法
public void start(){
...
}
...
}
其中,start方法中的核心内容如下:
代码语言:javascript复制...
//Netty提供的服务端启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true) //启用tcp协议层面的keep-live
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//拆包Handler
socketChannel.pipeline().addLast(new Spliter());
//解码Handler
socketChannel.pipeline().addLast(new Decoder());
//编码Handler
socketChannel.pipeline().addLast(new Encoder());
...
//处理客户端请求的Handler
socketChannel.pipeline().addLast(serviceRequestHandler);
...
}
});
...
});
//等待服务端关闭
channelFuture.channel().closeFuture().sync();
可以看出在服务端启动时,会为服务端配置相应的Handler;
当有请求到达时,这些Handler会依次对请求做相应的处理,比如首先Spliter对通信数据包进行拆包粘包,保证数据包的完整性,然后Decoder对数据包进行解码,得到请求内容,最后serviceRequestHandler处理请求内容,并且做出响应。
ServiceRequestHandler
ServiceRequestHandler类继承了Netty框架提供的SimpleChannelInboundHandler类,并且重写了三个方法:channelActive()、channelInactive()和channelRead0()。
前两个方法顾名思义,在客户端建立连接和断开连接时执行回调,最后一个方法在收到客户端请求时执行回调,是处理请求的核心方法。
代码语言:javascript复制public class ServiceRequestHandler extends SimpleChannelInboundHandler<ServiceRequestPacket> {
public static final ServiceRequestHandler INSTANCE = new ServiceRequestHandler();
private ServiceInvoker serviceInvoker = ServiceInvoker.INSTANCE;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("客户端建立了连接");
super.channelActive(ctx);
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ServiceRequestPacket serviceRequest) throws Exception {
ServiceResponsePacket responsePacket = new ServiceResponsePacket();
//设置响应id
responsePacket.setRequestId(serviceRequest.getId());
...
if (serviceConfig == null) {
log.info("No such service : " serviceRequest);
responsePacket.setCode(1);
responsePacket.setMessage("No such service");
} else {
//获取服务的实现类
Object object = context.getBean(serviceConfig.getRef());
//找到请求的方法
Method method = object.getClass().getMethod(serviceRequest.getMethodName(), serviceRequest.getParameterTypes());
//通过反射调用请求的方法,得到结果
Object result = serviceInvoker.invoke(object, method, serviceRequest);
log.info("service id : " serviceRequest.getId());
log.info("Service invoked : " serviceRequest);
//将请求代码(0:成功, 1:失败)和调用结果封装在响应包中
responsePacket.setCode(0);
responsePacket.setData(result);
}
//将响应包通过Netty发送给客户端
channelHandlerContext.channel().writeAndFlush(responsePacket);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("客户端断开了连接");
super.channelInactive(ctx);
}
}
客户端发来的请求主要包括:
- 请求id
- 要请求的服务(即哪个类的哪个方法)
- 请求参数(要传入该方法的参数)
通过对代码分析可以梳理出处理的流程:
- 根据请求内容找到相应类的相应方法
- 通过反射调用该方法,并传入请求中的参数
- 得到结果,封装入响应包
- 将响应发送给客户端
2. 客户端
客户端主要的工作是连接服务器、发送消息、等待服务端的消息响应以及该响应消息、关闭与服务端的连接。
一个 Netty 的客户端同样有两个部分:
- 配置服务以及服务启动逻辑类,比如下方的 NettyClient 类。
- 实现从客户端接收到的消息的处理逻辑类:比如下方的 ClientHandler 类。
完整代码见:https://github.com/wdw87/wRpc
NettyClient
代码较长,省略了非关键部分
代码语言:javascript复制public class NettyClient {
...
public NettyClient(String host, int port) {
this.host = host;
this.port = port;
//启动类
bootstrap = new Bootstrap();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//拆包Handler
socketChannel.pipeline().addLast(new Spliter());
//解码Handler
socketChannel.pipeline().addLast(new Decoder());
//编码Handler
socketChannel.pipeline().addLast(new Encoder());
...
//处理服务端响应的Handler
socketChannel.pipeline().addLast(serviceResponseHandler);
}
});
try {
this.connect(host, port);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//连接服务端
public void connect(String host, int port) throws InterruptedException {
...
this.channel = channelFuture.sync().channel();
}
//发送请求
public Object send(ServiceRequestPacket requestPacket) throws InterruptedException {
if(channel != null && channel.isActive()){
//发送请求
SynchronousQueue<Object> queue = serviceResponseHandler.sendRequest(requestPacket, channel);
//阻塞等待响应包
ServiceResponsePacket result = (ServiceResponsePacket)queue.take();
//得到响应包中的请求结果
Class<?> returnType = requestPacket.getReturnType();
Object newdata = parseReturnType(returnType, result.getData());
result.setData(newdata);
return result;
}else {
ServiceResponsePacket responsePacket = new ServiceResponsePacket();
responsePacket.setCode(1);
responsePacket.setMessage("未正确连接到服务器.请检查相关配置信息!");
return responsePacket;
}
}
...
}
NettyClient类虽然代码较长,但是结构十分简单,客户端在构造函数中初始化,与服务端一样,也有拆包和编解码过程,核心Handler是处理服务端响应的serviceResponseHandler。
值得注意的是,在send() 方法中,首先调用
serviceResponseHandler.sendRequest()方法,该方法会发出请求,同时将一个SynchronousQueue以请求id为key,放入一个ConcurrentHashMap中;
在客户端收到响应后,同样以请求id为key,得到这个SynchronousQueue,并放入响应包,这样在响应传回时就可以获得响应的响应包了。
ServiceResponseHandler
代码语言:javascript复制public class ServiceResponseHandler extends SimpleChannelInboundHandler<ServiceResponsePacket> {
private Map<String, SynchronousQueue<Object>> queueMap = new ConcurrentHashMap<>();
...
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ServiceResponsePacket serviceResponsePacket) throws Exception {
//得到请求id
String id = serviceResponsePacket.getRequestId();
//根据id得到相应的SynchronousQueue
SynchronousQueue<Object> queue = queueMap.get(id);
if(queue != null){
//将响应包放入SynchronousQueue,之后,前文所述的send()方法将解除阻塞,并得响应
queue.put(serviceResponsePacket);
queueMap.remove(id);
}else{
log.error("request id error !!!");
}
}
public SynchronousQueue<Object> sendRequest(ServiceRequestPacket requestPacket, Channel channel){
SynchronousQueue<Object> queue = new SynchronousQueue<>();
//以请求id为key,放入一个SynchronousQueue,此时SynchronousQueue为空队列
queueMap.put(requestPacket.getId(), queue);
//发出请求
channel.writeAndFlush(requestPacket);
return queue;
}
}
与server中的Handler相似,在重写的channelRead0()方法中处理响应。
完整代码见:https://github.com/wdw87/wRpc
作者:好吃懒做贪玩东
编辑:西瓜媛