redisson分布式远程调用
1. 分布式远程服务(Remote Service)
基于Redis的Java分布式远程服务,可以用来通过共享接口执行存在于另一个Redisson实例里的对象方法。换句话说就是通过Redis实现了Java的远程过程调用(RPC)。分布式远程服务基于可以用POJO对象,方法的参数和返回类不受限制,可以是任何类型。
分布式远程服务(Remote Service)提供了两种类型的RRemoteService实例:
服务端(远端)实例 - 用来执行远程方法(工作者实例即worker instance). 例如:
RRemoteService remoteService = redisson.getRemoteService();
SomeServiceImpl someServiceImpl = new SomeServiceImpl();
// 在调用远程方法以前,应该首先注册远程服务
// 只注册了一个服务端工作者实例,只能同时执行一个并发调用
remoteService.register(SomeServiceInterface.class, someServiceImpl);
// 注册了12个服务端工作者实例,可以同时执行12个并发调用
remoteService.register(SomeServiceInterface.class, someServiceImpl, 12);
客户端(本地)实例 - 用来请求远程方法. 例如:
RRemoteService remoteService = redisson.getRemoteService();
SomeServiceInterface service = remoteService.get(SomeServiceInterface.class);
String result = service.doSomeStuff(1L, "secondParam", new AnyParam());
客户端和服务端必须使用一样的共享接口,生成两者的Redisson实例必须采用相同的连接配置。客户端和服务端实例可以运行在同一个JVM里,也可以是不同的。客户端和服务端的数量不收限制。(注意:尽管Redisson不做任何限制,但是Redis的限制仍然有效。)
在服务端工作者可用实例数量 大于1 的时候,将并行执行并发调用的远程方法。
并行执行工作者数量计算方法如下: T = R * N
T - 并行执行工作者总数 R - Redisson服务端数量 N - 注册服务端时指定的执行工作者数量
超过该数量的并发请求将在列队中等候执行。
在服务端工作者实例可用数量为 1 时,远程过程调用将会按 顺序执行。这种情况下,每次只有一个请求将会被执行,其他请求将在列队中等候执行。
1.1. 分布式远程服务工作流程
分布式远程服务为每个注册接口建立了两个列队。一个列队用于请求,由服务端监听,另一个列队用于应答回执和结果回复,由客户端监听。应答回执用于判定该请求是否已经被接受。如果在指定的超时时间内没有被执行工作者执行将会抛出RemoteServiceAckTimeoutException错误。
下图描述了每次发起远程过程调用请求的工作流程。
1.2. 发送即不管(Fire-and-Forget)模式和应答回执(Ack-Response)模式
分布式远程服务通过org.redisson.core.RemoteInvocationOptions类,为每个远程过程调用提供了一些可配置选项。这些选项可以用来指定和修改请求超时和选择跳过应答回执或结果的发送模式。例如:
// 应答回执超时1秒钟,远程执行超时30秒钟
RemoteInvocationOptions options = RemoteInvocationOptions.defaults();
// 无需应答回执,远程执行超时30秒钟
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck();
// 应答回执超时1秒钟,不等待执行结果
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noResult();
// 应答回执超时1分钟,不等待执行结果
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().expectAckWithin(1, TimeUnit.MINUTES).noResult();
// 发送即不管(Fire-and-Forget)模式,无需应答回执,不等待结果
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck().noResult();
RRemoteService remoteService = redisson.getRemoteService();
YourService service = remoteService.get(YourService.class, options);
1.3. 异步调用
远程过程调用也可以采用异步的方式执行。异步调用需要单独提交一个带有@RRemoteAsync注释(annotation)的异步接口类。异步接口方法签名必须与远程接口的方法签名相符。异步接口的返回类必须是org.redisson.api.RFuture对象或其子对象。在调用RRemoteService.get方法时将对异步接口的方法进行验证。异步接口无须包含所有的远程接口里的方法,只需要包含要求异步执行的方法即可。
// 远程接口
public interface RemoteInterface {
Long someMethod1(Long param1, String param2);
void someMethod2(MyObject param);
MyObject someMethod3();
}
// 匹配远程接口的异步接口
@RRemoteAsync(RemoteInterface.class)
public interface RemoteInterfaceAsync {
RFuture someMethod1(Long param1, String param2);
RFuture someMethod2(MyObject param);
}
RRemoteService remoteService = redisson.getRemoteService();
RemoteInterfaceAsync asyncService = remoteService.get(RemoteInterfaceAsync.class);
1.4. 取消异步调用
通过调用Future.cancel()方法可以非常方便的取消一个异步调用。分布式远程服务允许在三个阶段中任何一个阶段取消异步调用:
远程调用请求在列队中排队阶段
远程调用请求已经被分布式远程服务接受,还未发送应答回执,执行尚未开始。
远程调用请求已经在执行阶段
想要正确的处理第三个阶段,在服务端代码里应该检查Thread.currentThread().isInterrupted()的返回状态。范例如下:
// 远程接口
public interface MyRemoteInterface {
Long myBusyMethod(Long param1, String param2);
}
// 匹配远程接口的异步接口
@RRemoteAsync(MyRemoteInterface.class)
public interface MyRemoteInterfaceAsync {
RFuture myBusyMethod(Long param1, String param2);
}
// 远程接口的实现
public class MyRemoteServiceImpl implements MyRemoteInterface {
public Long myBusyMethod(Long param1, String param2) {
for (long i = 0; i < Long.MAX_VALUE; i ) {
iterations.incrementAndGet();
if (Thread.currentThread().isInterrupted()) {
System.out.println("interrupted! " i);
return;
}
}
}
}
RRemoteService remoteService = redisson.getRemoteService();
ExecutorService executor = Executors.newFixedThreadPool(5);
// 注册远程服务的服务端的同时,通过单独指定的ExecutorService来配置执行线程池
MyRemoteInterface serviceImpl = new MyRemoteServiceImpl();
remoteService.register(MyRemoteInterface.class, serviceImpl, 5, executor);
// 异步调用方法
MyRemoteInterfaceAsync asyncService = remoteService.get(MyRemoteInterfaceAsync.class);
RFuture future = asyncService.myBusyMethod(1L, "someparam");
// 取消异步调用
future.cancel(true);