rpc系列4-处理超时场景.及提供hook

2022-05-12 14:31:57 浏览数 (1)

问题:客户端发起远程调用,如果服务端长时间不返回怎么办?

这就涉及到一个调用超时的问题,平时我们应用中很多场景都会规定超时时间,比如:sql查询超时,http请求超时等。那么如果服务端方法执行的时间超过规定的timeout时间,那么客户端就需要调出当前调用,抛出TimeoutException。

好了,下面开始对RpcBuidler进行改造了,让其支持超时情况的处理。同样,先给出预期的测试方案和结果:

代码语言:javascript复制
// 业务类UserService在之前的基础上增加超时调用的方法:
public interface UserService {
    
    // other method
    
    /**
     * 超时测试
     */
    public boolean timeoutTest();
    
}
//实现类
public class UserServiceImpl implements UserService {
    
     // other method
    
    @Override
    public boolean timeoutTest() {
        try {
            //模拟长时间执行
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {}
        return true;
    }
}

ClientTest中测试代码:

代码语言:javascript复制
    @Test
    public void timeoutTest(){
        long beginTime = System.currentTimeMillis();
        try {
            boolean result = userService.timeoutTest(); 
        } catch (Exception e) {
            long period = System.currentTimeMillis() - beginTime;
            System.out.println("period:"   period);
            Assert.assertTrue(period < 3100);
        }
    }

有了异步方法的实现经验,其实这个超时处理过程和异步非常类似,都是利用Future机制来实现的,下面对doInvoke方法进行重构,返回一个异步任务:

代码语言:javascript复制
    private Future<RpcResponse> doInvoke(final RpcRequest request) throws IOException, ClassNotFoundException{

        //构造并提交FutureTask异步任务
        Future<RpcResponse> retVal = (Future<RpcResponse>) handlerPool.submit(new Callable<RpcResponse>(){
            @Override
            public RpcResponse call() throws Exception {
                Object res = null;
                try{
                    //创建连接,获取输入输出流
                    Socket socket = new Socket(host,port);
                    try{
                        ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
                        ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
                        try{
                            //发送
                            out.writeObject(request);
                            //接受server端的返回信息---阻塞
                            res = in.readObject();
                        }finally{
                            out.close();
                            in.close();
                        }
                    }finally{
                        socket.close();
                    }
                }catch(Exception e){
                    throw e;
                }
                return (RpcResponse)res;
            }
        });
        return retVal;
    }

回调方法invoke修改如下:

代码语言:javascript复制
    @Override
    public Object invoke(Object proxy, Method method,
            Object[] args) throws Throwable {
        //如果是异步方法,立即返回null
        if(asyncMethods.get().contains(method.getName())) return null;
        Object retVal = null;

        RpcRequest request = new RpcRequest(method.getName(), method.getParameterTypes(),args,RpcContext.getAttributes());
        RpcResponse rpcResp  = null;
        try{
            Future<RpcResponse> response = doInvoke(request);
            //获取异步结果
            rpcResp  = (RpcResponse)response.get(TIMEOUT,TimeUnit.MILLISECONDS);
        }catch(TimeoutException e){
            throw e;
        }catch(Exception e){}
        
        if(!rpcResp.isError()){
            retVal = rpcResp.getResponseBody();
        }else{
            throw new RpcException(rpcResp.getErrorMsg());
        }
        return retVal;
    }

可见,经过这样改造后,所有的方法调用都是通过Future获取结果。

提供Hook,让开发人员进行RPC层面的AOP。

首先看下题目提供的Hook接口:

代码语言:javascript复制
public interface ConsumerHook {
    public void before(RpcRequest request);
    public void after(RpcRequest request);
}
//实现类
public class UserConsumerHook implements ConsumerHook{
    @Override
    public void before(RpcRequest request) {
        RpcContext.addAttribute("hook key","this is pass by hook");
    }

    @Override
    public void after(RpcRequest request) {
        System.out.println("I have finished Rpc calling.");
    }
}

hook实现的功能很简单,即在客户端进行远程调用的前后执行before和after方法。

代码语言:javascript复制
public final class RpcConsumer implements InvocationHandler{

    //。。。
    
    //钩子
    private ConsumerHook hook;

    public RpcConsumer hook(ConsumerHook hook){
        this.hook = hook;
        return this;
    }

static{
        userService = (UserService)consumer.targetHostPort(host, port)
                            .interfaceClass(UserService.class)
                            .timeout(TIMEOUT)
                            .hook(new UserConsumerHook())//新增钩子
                            .newProxy();
    }
//。。。
}

//UserServiceImpl中的测试方法
public Map<String, Object> getMap() {
        Map<String,Object> newMap = new HashMap<String,Object>();
        newMap.put("name","getMap");
        newMap.putAll(RpcContext.getAttributes());
        return newMap;
}

我们只需要在doInvoke方法开始出添加钩子函数的执行逻辑即可。如下:

代码语言:javascript复制
    private Future<RpcResponse> doInvoke(final RpcRequest request) throws IOException, ClassNotFoundException{
        //插入钩子
        hook.before(request);
        //。。。
}

同时在asyncCall和invoke方法的结束添加after的执行逻辑。具体实现可以看源码。

github附上源码

rpc

0 人点赞