基于Netty实现的简单RPC调用

2021-12-28 12:43:27 浏览数 (1)

模块

rpc-api

rpc-consumer

rpc-provider

依赖:
代码语言:javascript复制
<dependency>
	<groupId>io.netty</groupId>
	<artifactId>netty-all</artifactId>
	<version>4.1.25.Final</version>
 </dependency>
rpc-api代码
代码语言:javascript复制
// 接口, consumer和provider分别添加api的依赖
public interface ISayHelloService {
    String say(String name);
}

// 请求参数封装
public class Request{
     private String methodName; // 方法名
    private String className;  // 全类名
    private Object[] values; // 实参列表
}
rpc-provider代码:
代码语言:javascript复制
public class Server {
    private void start() throws InterruptedException {
       // 初始化
        ServerHandler h = new ServerHandler();
        h.init();
        
        EventLoopGroup boosGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.channel(NioServerSocketChannel.class)
                    .group(boosGroup, workGroup)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline()
                                    .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4,
                                            0, 4))
                                    .addLast(new LengthFieldPrepender(4))
                                    .addLast("encoder", new ObjectEncoder())
                                    .addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
                                    .addLast(h);
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 10)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture future = bootstrap.bind(8080).sync();
            future.channel()
                    .closeFuture()
                    .sync();
            System.out.println("server running, listener port : 8080 !");
        } finally {
            boosGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        try {
            new Server().start();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
   // 具体业务处理
    
    public class ServerHandler extends ChannelInboundHandlerAdapter {
    private static String bashScanPackage = "com.data.service.impl";
    private ClassLoader classLoader = this.getClass().getClassLoader();
    private static final Map<String, Mapping> SERVICES = new HashMap<>();


    /**
     * 初始化
     *
     * @throws ClassNotFoundException
     */
    public void init() {
        URL url = classLoader.getResource(bashScanPackage.replaceAll("\.", "/"));
        String filePath = url.getFile();
        File file = new File(filePath);
        for (String s : file.list()) {
            s = s.substring(0, s.indexOf("."));
            Class clazz = null;
            try {
                clazz = Class.forName(bashScanPackage   "."   s);
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
            Method[] methods = clazz.getDeclaredMethods();
            String interfaceName = clazz.getInterfaces()[0].getName();
            for (Method m : methods) {
                Mapping mapping = new Mapping();
                mapping.setMethod(m);
                mapping.setParameters(m.getParameterTypes());
                try {
                    mapping.setTarget(clazz.newInstance());
                } catch (InstantiationException e) {
                    e.printStackTrace();
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                }
                SERVICES.putIfAbsent(interfaceName   "."   m.getName(), mapping);
            }

        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Request request = (Request) msg;
        Object result;
        String key = request.getClassName()   "."   request.getMethodName();
        if (!SERVICES.containsKey(key)) {
            return;
        }
        Mapping clazz = SERVICES.get(key);
        result = clazz.getMethod().invoke(clazz.getTarget(), request.getValues());
        ctx.write(result);
        ctx.flush();
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

// 业务提供方封装的参数
public class Mapping {
    private Object target;
    private Method method;
    private Class[] parameters;
}
rpc-consumer代码
代码语言:javascript复制
// 动态代理类
public class ProxyHandler {
    public static <T> T create(Class<?> clazz) {
        MethodProxy proxy = new MethodProxy(clazz);
        Class<?>[] interfaces = clazz.isInterface() ?
                new Class[]{clazz} :
                clazz.getInterfaces();
        T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(), interfaces, proxy);
        return result;
    }

    private static class MethodProxy implements InvocationHandler {
        private Class<?> clazz;

        public MethodProxy(Class<?> clazz) {
            this.clazz = clazz;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            return invoke(method, args);
        }

        public Object invoke(Method method, Object[] args) {

            Request msg = new Request();
            msg.setClassName(this.clazz.getName());
            msg.setMethodName(method.getName());
            msg.setValues(args);
            msg.setParameters(method.getParameterTypes());
            EventLoopGroup group = new NioEventLoopGroup();
            RPCBusinessHandler handler = new RPCBusinessHandler();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0,
                                        4, 0, 4));
                                pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                                pipeline.addLast("encoder", new ObjectEncoder());
                                pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                                pipeline.addLast("handler", handler);
                            }
                        });

                ChannelFuture future = b.connect("localhost", 8080).sync();
                future.channel().writeAndFlush(msg).sync();
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
            return handler.getResult();
        }
    }
    
// 客户端逻辑处理
public class RPCBusinessHandler extends ChannelInboundHandlerAdapter {
    private Object result;
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        this.result = msg;
    }



    public Object getResult(){
        return this.result;
    }    
    
    
// 测试
 ISayHelloService service = new ProxyHandler().create(ISayHelloService.class);
 System.out.println(service.say("tony"));    

0 人点赞