lagou 爪哇 3-1 分布式理论、架构设计(自定义RPC)笔记

2022-05-17 16:37:24 浏览数 (1)

分布式系统概念 分布式系统是一个硬件或软件组件分布在不同的网络计算机上,彼此之间仅仅通过消息传递进行通信和协调的系统。俗的理解,所谓分布式系统,就是一个业务拆分成多个子业务,分布在不同的服务器节点,共同构成的系统称为分布式系统,同一个分布式系统中的服务器节点在空间部署上是可以随意分布的,这些服务器可能放在不同的机柜中,也可能在不同的机房中,甚至分布在不同的城市。

在分布式服务框架中,一个最基础的问题就是远程服务是怎么通讯的,在Java领域中有很多可实现远程通讯的技 术,例如:RMI、Hessian、SOAP、ESB和JMS等,它们背后到底是基于什么原理实现的呢

1.基本原理 要实现网络机器间的通讯,首先得来看看计算机系统网络通信的基本原理,在底层层面去看,网络通信需要做的就是将流从一台计算机传输到另外一台计算机,基于传输协议和网络IO来实现,其中传输协议比较出名的有tcp、udp等等,tcp、udp都是在基于Socket概念上为某类应用场景而扩展出的传输协议,网络IO,主要有bio、nio、 aio三种方式,所有的分布式应用通讯都基于这个原理而实现,只是为了应用的易用,各种语言通常都会提供一些更为贴近应用易用的应用层协议。

2.什么是RPC RPC全称为remote procedure call,即远程过程调用。 借助RPC可以做到像本地调用一样调用远程服务,是一种进程间的通信方式 比如两台服务器A和B,A服务器上部署一个应用,B服务器上部署一个应用,A服务器上的应用想调用B服务器上的应用提供的方法,由于两个应用不在一个内存空间,不能直接调用,所以需要通过网络来表达调用的语义和传达调用的数据。 需要注意的是RPC并不是一个具体的技术,而是指整个网络远程调用过程。 RPC架构一个完整的RPC架构里面包含了四个核心的组件,分别是Client,Client Stub,Server以及Server Stub,这个Stub可以理解为存根。 客户端(Client),服务的调用方。客户端存根(Client Stub),存放服务端的地址消息,再将客户端的请求参数打包成网络消息,然后通过网络远程发送给服务方。 服务端(Server),真正的服务提供者。服务端存根(Server Stub),接收客户端发送过来的消息,将消息解包,并调用本地的方法。

netty 相关

Netty 是由 JBOSS 提供一个异步的、 基于事件驱动的网络编程框架。 Netty 可以帮助你快速、 简单的开发出一 个网络应用, 相当于简化和流程化了 NIO 的开发过程。 作为当前最流行的 NIO 框架, Netty 在互联网领域、 大数据分布式计算领域、 游戏行业、 通信行业等获得了广泛的应用, 知名的 Elasticsearch 、 Dubbo 框架内部都采用了 Netty。

为什么使用Netty NIO缺点 NIO 的类库和 API 繁杂,使用麻烦。你需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、 ByteBuffer 等. 可靠性不强,开发工作量和难度都非常大 NIO 的 Bug。例如 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。

Netty优点 对各种传输协议提供统一的 API 高度可定制的线程模型——单线程、一个或多个线程池 更好的吞吐量,更低的等待延迟 更少的资源消耗 最小化不必要的内存拷贝

编程题

在基于Netty的自定义RPC的案例基础上,进行改造。

案例版本: 序列化方式为String, 并根据自定义的 providerName 做为通信协议,服务端判断是否以”UserService“开头完成的案例。

要求完成改造版本: 序列化协议修改为 JSON,使用 fastjson 作为 JSON 框架,并根据 RpcRequest实体作为通信协议,服务端需根据客户端传递过来的 RpcRequest 对象通过反射,动态代理等技术,最终能够执行目标方法,返回字符串 "success"。

1、提供资料:代码工程、验证及讲解视频。(仓库中只有本次作业内容) 2、讲解内容包含:题目分析、实现思路、代码讲解。 3、效果视频验证 3.1 使用fastjson作为json框架。 3.2 客户端发送给服务端RpcRequest请求,服务端接收后,利用反射、动态代理执行目标方法,返回“success”

要点提示: (1)客户端代理的 invoke 方法中需封装 RpcRequest 对象,将其当做参数进行传递。 (2)服务端的 UserServiceImpl 类上添加 @Service 注解,在启动项目时,添加到容器中。 (3)服务端要添加 @SpringBootApplication 注解,main方法中添加。SpringApplication.run(ServerBootstrap.class, args);,进行启动扫描(注意项目启动类位置:扫描路径)。 (4)服务端在收到参数,可以借助反射及动态代理(如需用到ApplicationContext对象,可以借助实现 ApplicationContextAware 接口获取),来调用UserServiceImpl方法,最终向客户端返回”success“即可。 (5)既然传递的是RpcRequest对象了,那么客户端的编码器与服务端的解码器需重新设置

示例:

代码语言:javascript复制
pipeline.addLast(new RpcDecoder(RpcRequest.class, new JSONSerializer()));

具体协议对象、序列化接口及实现类、编解码器如下

通信协议对象:

代码语言:javascript复制
public class RpcRequest{

    /**
     * 请求对象的ID
     */
    private String requestId;

    /**
     * 类名
     */
    private String className;

    /**
     * 方法名
     */
    private String methodName;

    /**
     * 参数类型
     */

    private Class<?>[] parameterTypes;

    /**
     * 入参
     */

    private Object[] parameters;
    /getter/setter方法.....
}

fastjson依赖:

代码语言:javascript复制
      <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.41</version>
    </dependency>

序列化接口:

代码语言:javascript复制
public interface Serializer {
    /**
     * java对象转换为二进制
     *
     * @param object
     * @return
     */

    byte[] serialize(Object object) throws IOException;

    /**
     * 二进制转换成java对象
     *
     * @param clazz
     * @param bytes
     * @param <T>
     * @return
     */
    <T> T deserialize(Class<T> clazz, byte[] bytes) throws IOException;
}

采用JSON的方式,定义JSONSerializer的实现类:(其他序列化方式,可以自行实现序列化接口)

代码语言:javascript复制
public class JSONSerializer implements Serializer{

    @Override
    public byte[] serialize(Object object) {
        return JSON.toJSONBytes(object);
    }

    @Override
    public <T> T deserialize(Class<T> clazz, byte[] bytes) {
        return JSON.parseObject(bytes, clazz);
    }
}

编码器实现:(现在传输的是RpcRequest, 需要编码器将请求对象转换为适合于传输的格式)

代码语言:javascript复制
public class RpcEncoder extends MessageToByteEncoder {
    private Class<?> clazz;
    private Serializer serializer;

    public RpcEncoder(Class<?> clazz, Serializer serializer) {
        this.clazz = clazz;
        this.serializer = serializer;
    }

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, ByteBuf byteBuf) throws Exception {
        if (clazz != null && clazz.isInstance(msg)) {
            byte[] bytes = serializer.serialize(msg);
            byteBuf.writeInt(bytes.length);
            byteBuf.writeBytes(bytes);
        }
    }
}

修改 common包

  1. commom 编辑 pom 包,添加依赖 netty 和 fastjson 依赖
  2. 修改 IUserService 接口 和复制 RpcRequest类。复制Serializer 序列化反序列化相关类。根据已有 RpcEncoder 类,和 已知 netty 的 StringDecoder类去仿照编写 RpcDecoder 类。

修改 server 包

  1. 添加 spring-boot 依赖,添加 ApplicationContextUtil 类,为了更好的getBean
  2. 修改 UserServiceHandler 类,判断 msg 类型如果是 RpcRequest 则 调用实现类将返回接口返回到一个 result=“success”的结果。 这里要求UserServiceImpl 需要被扫描到。
  3. 由于返回的是字符串,所以和 StringEncoder 匹配, 然后将原来的 UserServiceImpl 类中的 StringDecoder 替换为 RpcDecoder。当然 @Service 注解不要忘记添加,并完成 invoke 方法的设计。

修改 client 包

  1. 封装 RpcRequest 对象
  1. 自定义事件处理器将入参改为 Object[] params, call 方法取第 0 个元素
  2. 修改 RPCConsummer 类中的 jdk 动态代理对象 内容 入参传入 objects 即可。

遇到的问题 问题1:空指针异常

代码语言:javascript复制
客户端入参: RpcRequest{requestId='f5bb70e6-c39d-4c2b-82f4-19225ae53941', className='aa.bb.Dog', methodName='bark', parameterTypes=[int, class java.lang.Void, class java.lang.Enum], parameters=[tom, 666]}
UserClientHandler ### setParam
UserClientHandler ### call
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
    at com.sun.proxy.$Proxy0.invoke(Unknown Source)
    at com.lagou.boot.ConsumerBoot.main(ConsumerBoot.java:35)
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at com.lagou.client.RPCConsummer$2.invoke(RPCConsummer.java:88)
    ... 2 more
Caused by: java.lang.NullPointerException
    at com.lagou.handler.UserClientHandler.call(UserClientHandler.java:39)
    at com.lagou.handler.UserClientHandler.call(UserClientHandler.java:11)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
UserClientHandler ### channelActive 通道就绪事件

问题1:解决方法 RPCConsummer 的 createProxyEnhance 中 initClient 之后休息几秒

代码语言:javascript复制
// 休息几秒, 等待 channelActive 通道就绪才执行后来的submit
Thread.sleep(2000);

总结这次作业

位置很重要, encoder 和 decoder 看上去可以交换位置,但是 serviceHandler 在本项目中必须要放在 encoder 和 decoder 之后,如果提至首位,则代码会走不动了。

答题

1、下列属于分布式环境中的问题的是()[分值:17] 您的回答:A通信异常:B网络分区:C三态D节点故障√(得分:17)

2、关于CPA定理,下列说法正确的是()[分值:17] A、CA:不会碰到由于网络分区带来的负面影响。但放弃了系统的可扩展性 B、cp:一旦系统遇到网络分区或其 他故障或为了保证一致性时,会放弃可用性 C、ap:满足可用性和分区容错性,当出现网络分区,同时为了保证可用性,必 须让节点继续对外服务,这样必然导致失去一致性 D、分区容错性是一个分布式系统必然需要面对和解决的问题√(得分:17)

3、关于BASE理论,下列说法错误的是()[分值:16] A、(Basicaly Available)基本可用是指分布式系统在出现不可预知故障的时候,会处于暂时不可用,经过一段时间后,会恢复到可用状态√(得分:16)

4、以下属于2PC缺点的是()[份值:17 A、同步阻塞B、数据不一致iD、过于保守√(得分:17)

5、以下关于三阶段提交协议3PC说法正确的是()[分值:16] A、相比较2PC,3PC降低了参与者的阻塞范围: B、在3PC中,仍然存在数据不一致的情况出现 C、在3PC中,协调者和参与者都设置了超时机制: D、PreCommt是一个缓冲,保证了在最后提交阶段之前各参与节点的状态是一致 的√(得分:16)

6、以下属于Paxos角色的是 A Proposer:提索发起者 B Acceptor:决策者,可以批准提 D Leamers:最终决策的学习者

1、以下属于分布式系统设计策略的是( A、可用通过周期检测心跳机制、累计失效检测机制可以帮助判断节点是否“死亡”,如果判断“死亡”,会把该节点踢出集群: B、系统高可用性的常用设计模式包括三种:主备(Master-SLave)、互备(Active-Active)和集群(Cluster)模式。 C、容错指是系统对于错误包容的能力,而非允许故障产生 D、负载均衡的关键在于使用多台集群服务器共同分担计算任务

1、以下属于RPC核心组件的是() 您的回答: A、客户端(Client)√ B、客户端存根(Client Stub)√ C、服务端(Server)√ D、服务端存根(Server Stub)√

2、关于1O模型相关概念,下列说法正确的是:() C、使用同步10时,Java自己处理IO的读写:√ D、一般异步是配合非阻塞使用的,这样才能发挥异步的效用√

3、关于NIO编程,下列说法正确的是:() 您的回答: A、NIO编程模型中,新来一个连接不再创建一个新的线程,而是可以把这条连接藏接绑定到某个固定的线程,然后这条连接所有的读写都由这个线程来负责√ B、NIO数据读写是以字节块为单位√ C、JDK的NIO底层由epol实现,容易出现空轮训bug√

4、关于Netty,下列说法正确的是: A、Netty基于NIO实现,同时在NIO模型操作上封装了一层异步事件驱动模型,所以整个Nety都是异步的√ B、Dubbo的底层就是基于Netty√ C、boostrap用来为Netty 程序的启动组装配置一些必须要组件√ D、addlast方法将一个一个的ChannelHandler添加到责任链上,在请求进来或者响应出去时都会经过链上所有ChannelHandler的处理)√

参考

Netty: Home https://netty.io/index.html

0 人点赞