模块
rpc-api
rpc-consumer
rpc-provider
依赖:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.25.Final</version>
</dependency>
rpc-api
代码
// 接口, consumer和provider分别添加api的依赖
/**
 * Service contract published by the rpc-api module; both the consumer and
 * the provider modules depend on it.
 */
public interface ISayHelloService {

    /**
     * Single operation exposed by this service.
     *
     * @param name the text argument passed to the service (presumably the
     *             name of whoever is being greeted — confirm against the
     *             provider implementation)
     * @return the text produced by the implementation
     */
    String say(String name);
}
/**
 * Request payload sent from the consumer to the provider for one remote call.
 *
 * <p>The class implements {@link java.io.Serializable} because both pipelines
 * encode messages with Netty's {@code ObjectEncoder}, which only accepts
 * serializable objects. Every element of {@link #getValues()} must therefore
 * be serializable as well.
 */
public class Request implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    /** Name of the method to invoke. */
    private String methodName;

    /** Fully qualified name of the service type the method belongs to. */
    private String className;

    /** Actual arguments of the call; may be {@code null} for no-arg methods. */
    private Object[] values;

    /** Declared parameter types of the method being invoked. */
    private Class<?>[] parameters;

    /** @return the name of the method to invoke */
    public String getMethodName() {
        return methodName;
    }

    /** @param methodName the name of the method to invoke */
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    /** @return the fully qualified name of the target service type */
    public String getClassName() {
        return className;
    }

    /** @param className the fully qualified name of the target service type */
    public void setClassName(String className) {
        this.className = className;
    }

    /** @return the actual arguments of the call, possibly {@code null} */
    public Object[] getValues() {
        return values;
    }

    /** @param values the actual arguments of the call, possibly {@code null} */
    public void setValues(Object[] values) {
        this.values = values;
    }

    /** @return the declared parameter types of the method */
    public Class<?>[] getParameters() {
        return parameters;
    }

    /** @param parameters the declared parameter types of the method */
    public void setParameters(Class<?>[] parameters) {
        this.parameters = parameters;
    }
}
rpc-provider
代码:
代码语言:javascript复制public class Server {
private void start() throws InterruptedException {
// 初始化
ServerHandler h = new ServerHandler();
h.init();
EventLoopGroup boosGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.channel(NioServerSocketChannel.class)
.group(boosGroup, workGroup)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline()
.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4,
0, 4))
.addLast(new LengthFieldPrepender(4))
.addLast("encoder", new ObjectEncoder())
.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
.addLast(h);
}
})
.option(ChannelOption.SO_BACKLOG, 10)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.bind(8080).sync();
future.channel()
.closeFuture()
.sync();
System.out.println("server running, listener port : 8080 !");
} finally {
boosGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
try {
new Server().start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 具体业务处理
public class ServerHandler extends ChannelInboundHandlerAdapter {
private static String bashScanPackage = "com.data.service.impl";
private ClassLoader classLoader = this.getClass().getClassLoader();
private static final Map<String, Mapping> SERVICES = new HashMap<>();
/**
* 初始化
*
* @throws ClassNotFoundException
*/
public void init() {
URL url = classLoader.getResource(bashScanPackage.replaceAll("\.", "/"));
String filePath = url.getFile();
File file = new File(filePath);
for (String s : file.list()) {
s = s.substring(0, s.indexOf("."));
Class clazz = null;
try {
clazz = Class.forName(bashScanPackage "." s);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
Method[] methods = clazz.getDeclaredMethods();
String interfaceName = clazz.getInterfaces()[0].getName();
for (Method m : methods) {
Mapping mapping = new Mapping();
mapping.setMethod(m);
mapping.setParameters(m.getParameterTypes());
try {
mapping.setTarget(clazz.newInstance());
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
SERVICES.putIfAbsent(interfaceName "." m.getName(), mapping);
}
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Request request = (Request) msg;
Object result;
String key = request.getClassName() "." request.getMethodName();
if (!SERVICES.containsKey(key)) {
return;
}
Mapping clazz = SERVICES.get(key);
result = clazz.getMethod().invoke(clazz.getTarget(), request.getValues());
ctx.write(result);
ctx.flush();
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
/**
 * Provider-side registry entry: the reflective method to call, the object to
 * call it on, and the method's declared parameter types.
 */
public class Mapping {

    /** Instance the method is invoked on. */
    private Object target;

    /** Reflective handle of the service method. */
    private Method method;

    /** Declared parameter types of {@link #method}. */
    private Class<?>[] parameters;

    /** @return the instance the method is invoked on */
    public Object getTarget() {
        return target;
    }

    /** @param target the instance the method is invoked on */
    public void setTarget(Object target) {
        this.target = target;
    }

    /** @return the reflective handle of the service method */
    public Method getMethod() {
        return method;
    }

    /** @param method the reflective handle of the service method */
    public void setMethod(Method method) {
        this.method = method;
    }

    /** @return the declared parameter types of the method */
    public Class<?>[] getParameters() {
        return parameters;
    }

    /** @param parameters the declared parameter types of the method */
    public void setParameters(Class<?>[] parameters) {
        this.parameters = parameters;
    }
}
rpc-consumer
代码
代码语言:javascript复制// 动态代理类
public class ProxyHandler {
public static <T> T create(Class<?> clazz) {
MethodProxy proxy = new MethodProxy(clazz);
Class<?>[] interfaces = clazz.isInterface() ?
new Class[]{clazz} :
clazz.getInterfaces();
T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(), interfaces, proxy);
return result;
}
private static class MethodProxy implements InvocationHandler {
private Class<?> clazz;
public MethodProxy(Class<?> clazz) {
this.clazz = clazz;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
return invoke(method, args);
}
public Object invoke(Method method, Object[] args) {
Request msg = new Request();
msg.setClassName(this.clazz.getName());
msg.setMethodName(method.getName());
msg.setValues(args);
msg.setParameters(method.getParameterTypes());
EventLoopGroup group = new NioEventLoopGroup();
RPCBusinessHandler handler = new RPCBusinessHandler();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0,
4, 0, 4));
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
pipeline.addLast("encoder", new ObjectEncoder());
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
pipeline.addLast("handler", handler);
}
});
ChannelFuture future = b.connect("localhost", 8080).sync();
future.channel().writeAndFlush(msg).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
return handler.getResult();
}
}
// 客户端逻辑处理
public class RPCBusinessHandler extends ChannelInboundHandlerAdapter {
private Object result;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
this.result = msg;
}
public Object getResult(){
return this.result;
}
// Usage example: obtain a proxy through the static factory (no ProxyHandler
// instance is needed) and call the remote service through it.
ISayHelloService service = ProxyHandler.create(ISayHelloService.class);
System.out.println(service.say("tony"));