项目推荐 I 手写RPC框架(二)

2022-04-11 18:40:33 浏览数 (1)

前言

RPC框架代码量较多,将仅对核心过程进行梳理,完整代码见:https://github.com/wdw87/wRpc

在这篇推文中,将实现Rpc的远程通信。远程通信是 RPC 的根本,本 RPC 框架还是采用 Netty 来作为通信框架。

在本项目的系统推文中,将对项目进行详细的介绍

主要将按照下面的内容进行分配(蓝色字体可戳):

手写RPC框架(一)

RPC简介、技术栈介绍、测试Demo

手写RPC框架(二)

远程通信实现

手写RPC框架(三)

制定协议与编解码器、动态代理

手写RPC框架(四)

注册中心

Rpc框架示意图

四、实现远程通信

远程通信是 RPC 的根本,本 RPC 框架还是采用 Netty 来作为通信框架。远程通信必然会有一个 Server 和一个 Client 的实现。下面就先介绍该 RPC 框架的实现:

完整代码见:https://github.com/wdw87/wRpc

1. 服务端

服务端负责接收客户端的请求,并做出响应。

一个Netty服务端主要包括两部分组成

  • 配置服务端的启动类,比如下方的NettyServer
  • 处理请求的逻辑类, 比如下方的 ServiceRequestHandler
NettyServer
代码语言:javascript复制
public class NettyServer {

...
   //服务端启动的核心方法
   public void start(){
...
  }

...

}

其中,start方法中的核心内容如下:

代码语言:javascript复制
...

//Netty提供的服务端启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
  .channel(NioServerSocketChannel.class)
  .option(ChannelOption.SO_KEEPALIVE, true)   //启用tcp协议层面的keep-live
  .childHandler(new ChannelInitializer<SocketChannel>() {
       @Override
       protected void initChannel(SocketChannel socketChannel) throws Exception {
           //拆包Handler
           socketChannel.pipeline().addLast(new Spliter());
           //解码Handler
           socketChannel.pipeline().addLast(new Decoder());
           //编码Handler
           socketChannel.pipeline().addLast(new Encoder());
          ...

           //处理客户端请求的Handler
           socketChannel.pipeline().addLast(serviceRequestHandler);

          ...

      }
  });

...

});
//等待服务端关闭
channelFuture.channel().closeFuture().sync();

可以看出在服务端启动时,会为服务端配置相应的Handler;

当有请求到达时,这些Handler会依次对请求做相应的处理,比如首先Spliter对通信数据包进行拆包粘包,保证数据包的完整性,然后Decoder对数据包进行解码,得到请求内容,最后serviceRequestHandler处理请求内容,并且做出响应。

ServiceRequestHandler

ServiceRequestHandler类继承了Netty框架提供的SimpleChannelInboundHandler类,并且重写了三个方法:channelActive()、channelInactive()和channelRead0()。

前两个方法顾名思义,在客户端建立连接和断开连接时执行回调,最后一个方法在收到客户端请求时执行回调,是处理请求的核心方法。

代码语言:javascript复制
public class ServiceRequestHandler extends SimpleChannelInboundHandler<ServiceRequestPacket> {

   public static final ServiceRequestHandler INSTANCE = new ServiceRequestHandler();

   private ServiceInvoker serviceInvoker = ServiceInvoker.INSTANCE;

   @Override
   public void channelActive(ChannelHandlerContext ctx) throws Exception {
       log.info("客户端建立了连接");
       super.channelActive(ctx);
  }

   @Override
   protected void channelRead0(ChannelHandlerContext channelHandlerContext, ServiceRequestPacket serviceRequest) throws Exception {
       ServiceResponsePacket responsePacket = new ServiceResponsePacket();

       //设置响应id
       responsePacket.setRequestId(serviceRequest.getId());

      ...

       if (serviceConfig == null) {
           log.info("No such service : "   serviceRequest);
           responsePacket.setCode(1);
           responsePacket.setMessage("No such service");
      } else {
           //获取服务的实现类
           Object object = context.getBean(serviceConfig.getRef());
           //找到请求的方法
           Method method = object.getClass().getMethod(serviceRequest.getMethodName(), serviceRequest.getParameterTypes());
           //通过反射调用请求的方法,得到结果
           Object result = serviceInvoker.invoke(object, method, serviceRequest);
           log.info("service id : "   serviceRequest.getId());
           log.info("Service invoked : "   serviceRequest);
           //将请求代码(0:成功, 1:失败)和调用结果封装在响应包中
           responsePacket.setCode(0);
           responsePacket.setData(result);
      }
//将响应包通过Netty发送给客户端
       channelHandlerContext.channel().writeAndFlush(responsePacket);

  }

   @Override
   public void channelInactive(ChannelHandlerContext ctx) throws Exception {
       log.info("客户端断开了连接");
       super.channelInactive(ctx);
  }
}

客户端发来的请求主要包括:

  • 请求id
  • 要请求的服务(即哪个类的哪个方法)
  • 请求参数(要传入该方法的参数)

通过对代码分析可以梳理出处理的流程:

  • 根据请求内容找到相应类的相应方法
  • 通过反射调用该方法,并传入请求中的参数
  • 得到结果,封装入响应包
  • 将响应发送给客户端

2. 客户端

客户端主要的工作是连接服务器、发送消息、等待服务端的消息响应以及该响应消息、关闭与服务端的连接。

一个 Netty 的客户端同样有两个部分:

  • 配置服务以及服务启动逻辑类,比如下方的 NettyClient 类。
  • 实现从客户端接收到的消息的处理逻辑类:比如下方的 ClientHandler 类。

完整代码见:https://github.com/wdw87/wRpc

NettyClient

代码较长,省略了非关键部分

代码语言:javascript复制
public class NettyClient {

  ...

   public NettyClient(String host, int port) {

       this.host = host;
       this.port = port;
//启动类
       bootstrap = new Bootstrap();
       NioEventLoopGroup workerGroup = new NioEventLoopGroup();

       bootstrap.group(workerGroup)
              .channel(NioSocketChannel.class)
              .option(ChannelOption.SO_KEEPALIVE, true)
              .handler(new ChannelInitializer<SocketChannel>() {
                   @Override
                   protected void initChannel(SocketChannel socketChannel) throws Exception {

                       //拆包Handler
                       socketChannel.pipeline().addLast(new Spliter());
                       //解码Handler
                       socketChannel.pipeline().addLast(new Decoder());
                       //编码Handler
                       socketChannel.pipeline().addLast(new Encoder());

                      ...

                       //处理服务端响应的Handler
                       socketChannel.pipeline().addLast(serviceResponseHandler);

                  }
              });
       try {
           this.connect(host, port);
      } catch (InterruptedException e) {
           e.printStackTrace();
      }
  }
//连接服务端
   public void connect(String host, int port) throws InterruptedException {

      ...

       this.channel = channelFuture.sync().channel();
  }
//发送请求
   public Object send(ServiceRequestPacket requestPacket) throws InterruptedException {
       if(channel != null && channel.isActive()){
           //发送请求
           SynchronousQueue<Object> queue = serviceResponseHandler.sendRequest(requestPacket, channel);
           //阻塞等待响应包
           ServiceResponsePacket result = (ServiceResponsePacket)queue.take();
           //得到响应包中的请求结果
           Class<?> returnType = requestPacket.getReturnType();
           Object newdata = parseReturnType(returnType, result.getData());
           result.setData(newdata);
           return result;
      }else {
           ServiceResponsePacket responsePacket = new ServiceResponsePacket();
           responsePacket.setCode(1);
           responsePacket.setMessage("未正确连接到服务器.请检查相关配置信息!");
           return responsePacket;
      }

  }

  ...

}

NettyClient类虽然代码较长,但是结构十分简单,客户端在构造函数中初始化,与服务端一样,也有拆包和编解码过程,核心Handler是处理服务端响应的serviceResponseHandler。

值得注意的是,在send() 方法中,首先调用

serviceResponseHandler.sendRequest()方法,该方法会发出请求,同时将一个SynchronousQueue以请求id为key,放入一个ConcurrentHashMap中;

在客户端收到响应后,同样以请求id为key,得到这个SynchronousQueue,并放入响应包,这样在响应传回时就可以获得响应的响应包了。

ServiceResponseHandler
代码语言:javascript复制
public class ServiceResponseHandler extends SimpleChannelInboundHandler<ServiceResponsePacket> {

   private Map<String, SynchronousQueue<Object>> queueMap = new ConcurrentHashMap<>();

  ...

   @Override
   protected void channelRead0(ChannelHandlerContext channelHandlerContext, ServiceResponsePacket serviceResponsePacket) throws Exception {
       //得到请求id
       String id = serviceResponsePacket.getRequestId();
       //根据id得到相应的SynchronousQueue
       SynchronousQueue<Object> queue = queueMap.get(id);
       if(queue != null){
           //将响应包放入SynchronousQueue,之后,前文所述的send()方法将解除阻塞,并得响应
           queue.put(serviceResponsePacket);
           queueMap.remove(id);
      }else{
           log.error("request id error !!!");
      }
  }

   public SynchronousQueue<Object> sendRequest(ServiceRequestPacket requestPacket, Channel channel){
       SynchronousQueue<Object> queue = new SynchronousQueue<>();
       //以请求id为key,放入一个SynchronousQueue,此时SynchronousQueue为空队列
       queueMap.put(requestPacket.getId(), queue);
       //发出请求
       channel.writeAndFlush(requestPacket);
       return queue;
  }
}

与server中的Handler相似,在重写的channelRead0()方法中处理响应。

完整代码见:https://github.com/wdw87/wRpc

作者:好吃懒做贪玩东

编辑:西瓜媛

0 人点赞