作者:温昂展
一、 初始化
本质上,RPC客户端实现的原理就是Java动态代理技术,中间的网络传输就是协议编解码和序列化技术,协议部分放到后面探讨,本节将围绕客户端初始化过程,生成RPC代理对象和发起远程调用三个方面展开。
客户端生成RPC代理对象之前需要做一些准备工作,整个初始化过程可概括如下,下面分点叙述:
1. 初始化Communicator
Communicator,即远程通讯器,用于发起远程调用时创建出RPC代理对象,是整个RPC调用的入口。
通讯器Communicator采用工厂生成 单例模式,同一通讯器配置CommunicatorConfig只会对应一个通讯器对象,配置一样可以从模版文件中读取,主要配置项包括:主控Registry路由地址、同步调用超时时间、异步调用的超时时间、模块名moduleName、是否启用set分组、上报服务状态的周期、最大连接数(默认为4个)、最大线程数、队列长度、默认日志路径和级别等。
初始化过程中会根据以上配置的Locator配置项基于UUID算法为Communicator生成一个唯一性标识ID,接着初始化客户端日志Logger(设置默认日志路径和日志级别),构建客户端线程池,每个通讯器(或模块module)对应一个业务线程池。代码如下:
代码语言:txt复制private void initCommunicator(CommunicatorConfig config) throws ConfigException {
if (inited.get()) {
return;
}
lock.lock();
try {
if (!inited.get()) {
try {
ClientLogger.init(config.getLogPath(), config.getLogLevel());
if (StringUtils.isEmpty(config.getLocator())) {
this.id = UUID.randomUUID().toString().replaceAll("-", "");
} else {
this.id = UUID.nameUUIDFromBytes(config.getLocator().getBytes()).toString().replaceAll("-", "");
}
this.communicatorConfig = config;
this.threadPoolExecutor = ClientPoolManager.getClientThreadPoolExecutor(config);
inited.set(true);
} catch (Throwable e) {
inited.set(false);
throw new ConfigException(e);
}
}
} finally {
lock.unlock();
}
}
ServantProxy是事先定义好的远程代理接口,RPC动态代理生成时将会继承该接口,同时继承传入的业务接口,绑定调用处理器。动态代理类同样采用单例模式,同一个服务(objName标识)只会对应同一个动态代理对象。如图:
远程调用的核心逻辑就放在调用处理器ObjectProxy中,那么接下来要探究的核心问题就在于ObjectProxy如何根据RPC代理对象执行的不同方法触发相应的网络请求了,以下过程均用于创建并初始化调用处理器ObjectProxy。
2. 初始化服务代理配置
首先根据CommunicatorConfig初始化服务代理配置ServantProxyConfig,同时设置远程服务名;这里要特别说明的是服务代理配置中有一项为是否直连isDirectConnection的配置项,
- 直连即客户端不走主控直接通过 IP:Port 向服务节点发起连接请求;
- 非直连则客户端会定期通过主控查询刷新并拉取到该服务的所有部署上线的服务节点的路由地址信息,同时将路由信息保存到本地;之后如果从主控拉取失败则会直接从本地读取。
如果通过直连的方式进行连接,不仅增加了繁琐的配置工作,且假设这些配置的服务节点都宕机那么服务也就不可用了,但是通过主控可以实现服务路由地址的定期查询和更新,客户端无需事先知道实际的路由地址,也就是说这个过程对客户端开发是透明的,因此一般使用非直连的方式。
路由信息跟在服务名ObjName之后,配置规范如下:
代码语言:txt复制 tcp -h 127.0.0.1 -p 20000 -t 5000 -a 1 -s setDivision ---> -a 1表示active, -a 0表示nactive -s setDisivision,
3. 初始化负载均衡策略
TAF服务多节点部署,发起服务请求时应该选择哪个服务节点最为合理呢?
这就需要在请求路由分发时遵守一个支持负载均衡的策略,目前支持的负载均衡策略有Round-Robin轮询、带权重轮询、Hash、带权重Hash、一致性Hash; 默认使用的策略是Round-Robin轮询,若客户端调用时在请求上下文中(JceContext对应的map)有设置对应的hash参数则优先使用相应的hash策略,策略选用的优先级为: 一致性hash > hash > round-robin。
负载均衡策略具体放到后文深入探讨,这里不做展开。
4. 初始化Invoker构造器
根据接口类的注解声明生成对应的协议解析器(用于协议编解码),以此创建并初始化RPC服务协议的Invoker构造器 ProtocolInvoker,在其构造方法中执行初始化Invoker。
同时,取得业务接口中支持的Jce协议(之后可以考虑扩展到其他协议)的所有方法信息放入AnalystManager, 存储接口方法名对应方法参数和返回值
每个远程服务对应一个ProtocolInvoker,每个 ProtocolInvoker 对应一个 selectorManager 管理器,同时维护着该服务可用的服务节点列表 allInvoker;
5. 初始化Invoker
根据前面已经获取到的服务路由地址URL初始化Invoker列表,每个URL会对应创建一个Invoker,因此每个Invoker即对应一个远程服务节点,同时一个invoker默认有4个ServantClient(connections),每个ServantClient对应一个socket连接,初始化连接时(reConnect),注册在IO线程中 。通过URL参数指定了连接的相关属性,主要有 是否开启tcpNoDelay、连接超时时间、同步和异步调用超时时间、是否更改为UDP协议(默认为TCP)等,初始化代码如下:
代码语言:txt复制protected ServantClient initClient(URL url) {
ServantClient client = null;
try {
boolean tcpNoDelay = url.getParameter(Constants.TAF_CLIENT_TCPNODELAY, false);
long connectTimeout = url.getParameter(Constants.TAF_CLIENT_CONNECTTIMEOUT, Constants.default_connect_timeout);
long syncTimeout = url.getParameter(Constants.TAF_CLIENT_SYNCTIMEOUT, Constants.default_sync_timeout);
long asyncTimeout = url.getParameter(Constants.TAF_CLIENT_ASYNCTIMEOUT, Constants.default_async_timeout);
boolean udpMode = url.getParameter(Constants.TAF_CLIENT_UDPMODE, false);
if (this.selectorManager == null) {
this.selectorManager = ClientPoolManager.getSelectorManager(this.protocolFactory, this.threadPoolExecutor, true, udpMode, this.servantProxyConfig);
}
client = new ServantClient(url.getHost(), url.getPort(), this.selectorManager, udpMode);
client.setConnectTimeout(connectTimeout);
client.setSyncTimeout(syncTimeout);
client.setAsyncTimeout(asyncTimeout);
client.setTcpNoDelay(tcpNoDelay);
//将真正向服务端发起连接延迟到调用之前
} catch (Throwable e) {
throw new ProxyException(servantProxyConfig.getSimpleObjectName(), "Fail to create taf client|" url.toIdentityString() "|" e.getLocalizedMessage(), e);
}
return client;
}
发起请求时通过简单轮询的方式选择一个连接,以此达到一定的负载均衡,实现代码也很简单:
代码语言:txt复制private ServantClient getJceClient() {
return clients.length == 1 ? clients[0] : clients[(index.getAndIncrement() & Integer.MAX_VALUE) % clients.length];
}
以上所述关系图如下:
在完成初始化之后,还会创建两个定期执行的任务:
- 定期更新服务路由(若为非直连方式), 每次更新服务路由的同时更新Invoker列表;
- 定期上报服务的stat监控状态信息(如:成功率、耗时和流量等)。
附,客户端整体类图:
二、生成RPC代理对象
初始化工作完成,一切准备就绪,此时就可以指定要调用的服务接口(接口代码由定义接口描述语言IDL自动生成,参考:JCE协议部分),通过Communicator生成用于发起远程调用的RPC代理对象了。
通常每个远程服务对应一个RPC代理对象,使用动态代理技术生成,代码如下:
代码语言:txt复制private <T> Object createProxy(Class<T> clazz, ObjectProxy<T> objectProxy) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class[] { clazz, ServantProxy.class }, objectProxy);
}
其中,第一个参数是类加载器对象,即指定用哪个类加载器来加载这个代理类到JVM的方法区,这里当然指定为当前线程的类加载器啦;第二个参数是代理接口,指定了生成的动态代理继承于那些接口,class即为我们要传进去的业务服务接口;第三个参数就是调用处理器类实例,即ObjectProxy,当RPC代理对象的某个方法被执行时,实际上会转而去执行该实例相应的处理方法(即发起网络请求request)。
需要注意的是,在Jdk 动态代理模式中,由于代理类都会实现Proxy类,受Java单继承特性的限制该方法只能针对接口创建代理类(考虑有些动态代理技术如:cglib、asm等就没有这种限制)。
下面看UML类图:
其中,代理接口ServantProxy就是代理类和委托类同时继承的接口,同时传入业务接口Prx,保证了行为的一致性,对于访问者Client来说两者没有什么区别,通过代理类这一中间层我们很好地屏蔽和保护了委托类对象,同时巧妙地加入了对委托类不同控制策略,在设计上获得了更大的灵活性。亦可以看到,Java动态代理机制近乎完美地实践了以上代理模式的设计理念,很好地避免了人工地去编写大量代理类重复代码,将代理类DynamicProxy交由编译器在运行时动态生成。
三、远程调用
经过上述过程(初始化、调用Communicator的stringToProxy方法),现在我们终于拿到了业务接口的RPC代理对象,调用代理对象的接口方法就能发起远程调用了。
实际上,所有的接口方法都会委托到调用处理器的invoke方法上,结合上述内容和之前介绍的TAF线程模型去理解,不难将发送和接收请求过程概括如下:
invoke核心实现如下:
代码语言:txt复制public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
//构造一次RPC调用的上下文
Invocation inv = protocolInvoker.invocation(proxy, method, args);
try {
//负载均衡容错,从服务列表中选择一台
Invoker<T> invoker = loadBalancer.select(protocolInvoker.getInvokers(), inv);
return invoker.invoke(inv);
} catch (Throwable e) {
if (ClientLogger.getLogger().isDebugEnabled()) {
ClientLogger.getLogger().debug(servantProxyConfig.getSimpleObjectName() " error occurred on invoke|" e.getLocalizedMessage(), e);
}
if (e instanceof NoInvokerException) {
throw new NoConnectionException(servantProxyConfig.getSimpleObjectName(), e.getLocalizedMessage(), e);
}
throw new ProxyException(servantProxyConfig.getSimpleObjectName(), e.getLocalizedMessage(), e);
}
}
值得注意的地方,在选取Invoker节点时除了考虑如上所述的负载均衡策略之外,客户端每次发起请求时都会对Invoker列表执行死活检查,屏蔽掉一定时间内异常的节点,根据一定的容错策略选取当前列表中的正常节点或重试被屏蔽的异常节点(重试后更新上次重试时间),在该Invoker执行请求结束后重新检查活性,具体的容错策略下节再具体探讨,这里也不做展开了。
如上所述,Invoker执行如下:
代码语言:txt复制protected Object doInvokeServant(final ServantInvocation inv) throws Throwable {
long begin = System.currentTimeMillis();
int ret = Constants.INVOKE_STATUS_SUCC;
boolean isAsync = JceHelper.isAsync(inv.getMethodName());
boolean isPromiseAsync = JceHelper.isPromiseAsync(inv.getMethodName());
boolean isOneWay = false;
String simpleMethodName = inv.getMethodName().replaceAll("promise_async_|async_", "");
try {
Method method = getApi().getMethod(inv.getMethodName(), inv.getParameterTypes());
//callback异步调用
if (isAsync) {
isOneWay = invokeWithAsync(method, inv.getArguments(), inv.getAttachments());
return null;
}
//Promise/Future调用
else if (isPromiseAsync) {
return invokeWithFuture(method, inv.getArguments(), inv.getAttachments());
}
//同步调用
else {
JceServiceResponse response = invokeWithSync(method, inv.getArguments(), inv.getAttachments());
ret = response.getRet() == JceHelper.JCESERVERSUCCESS ? Constants.INVOKE_STATUS_SUCC : Constants.INVOKE_STATUS_EXEC;
if (response.getRet() != JceHelper.JCESERVERSUCCESS) {
throw ServerException.makeException(response.getRet());
}
return response.getResult();
}
} catch (Throwable e) {
if (e instanceof TimeoutException) {
ret = Constants.INVOKE_STATUS_TIMEOUT;
} else if (e instanceof NotConnectedException) {
ret = Constants.INVOKE_STATUS_NETCONNECTTIMEOUT;
} else {
ret = Constants.INVOKE_STATUS_EXEC;
}
throw e;
} finally {
if (!(isAsync || isPromiseAsync) || ret != Constants.INVOKE_STATUS_SUCC) {
/** 同步调用或者发起异步调用失败 */
/** 死活检查 */
setAvailable(ServantnvokerAliveChecker.isAlive(getUrl(), config, ret));
/** 上报proxy的调用情况*/
InvokeStatHelper.getInstance().addProxyStat(objName).addInvokeTime(config.getModuleName(), objName, config.getSetDivision(), simpleMethodName, getUrl().getHost(), getUrl().getPort(), ret, System.currentTimeMillis() - begin);
} else if (isOneWay) {
/** 发起异步调用成功,但是无callback */
/** 死活检查 */
setAvailable(ServantnvokerAliveChecker.isAlive(getUrl(), config, ret));
}
//其余的异步调用在callback中进行死活检查和上报proxy调用情况
}
}
从代码中可以发现,上图中处理响应回包的过程根据业务需要有几种不同的方式(根据方法名做判别),根据客户端调用发起请求后是否等待,可将调用方式分成如下三种:
1. 同步调用
同步调用发起请求后会一直等待直到服务端响应回包或调用超时,实现上采用了一个闭锁CountdownLatch的同步工具类,另外出于非阻塞处理,引入了票据 Ticket 的概念来保存一次请求和响应的上下文,每次请求用一个唯一的 ticketNumber来标识,通过 TicketManager管理器定期检查回收过期的票据,同步调用实现代码如下:
代码语言:txt复制public <T extends ServantResponse> T invokeWithSync(ServantRequest request) throws IOException {
Ticket<T> ticket = null;
T response = null;
try {
ensureConnected();
request.setInvokeStatus(InvokeStatus.SYNC_CALL);
ticket = TicketManager.createTicket(request, session, this.syncTimeout);
Session current = session;
current.write(request);
//等待直到服务端响应回包或调用超时
if (!ticket.await(this.syncTimeout, TimeUnit.MILLISECONDS)) {
if (current != null && current.getStatus() != SessionStatus.CLIENT_CONNECTED) {
throw new IOException("Connection reset by peer|" this.getAddress());
} else {
throw new TimeoutException("the operation has timeout, " this.syncTimeout "ms|" this.getAddress());
}
}
response = ticket.response();
if (response == null) {
throw new IOException("the operation is failed.");
}
return response;
} catch (InterruptedException e) {
ClientLogger.getLogger().error(e.getLocalizedMessage());
} finally {
if (ticket != null) {
TicketManager.removeTicket(ticket.getTicketNumber());
}
}
return response;
}
2. 异步调用
异步调用发起请求后不会等待响应回包而是继续往下执行,将回调callback注册到对应的 Ticket中,当接收到服务端响应回包时执行相应的回调方法(根据解析后response的响应码判别执行成功或异常回调);若超过异步调用超时时间,则票据被管理器回收,执行相应的过期处理方法。核心代码逻辑如下:
代码语言:txt复制public <T extends ServantResponse> void invokeWithAsync(ServantRequest request, Callback<T> callback) throws IOException {
Ticket<T> ticket = null;
try {
ensureConnected();
request.setInvokeStatus(InvokeStatus.ASYNC_CALL);
ticket = TicketManager.createTicket(request, session, this.asyncTimeout, callback);
Session current = session;
current.write(request);
//不等待
} catch (Exception ex) {
if (ticket != null) {
TicketManager.removeTicket(ticket.getTicketNumber());
}
throw new IOException("error occurred on invoker with async", ex);
}
}
//接收到响应回包
public void notifyResponse(T response)
{
this.response = response;
if (this.callback != null) this.callback.onCompleted(response);
if (ticketListener != null) ticketListener.onResponseReceived(this);
}
//TicketManager定期检查回收
static
{
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable()
{
long currentTime = -1;
public void run()
{
Collection<Ticket<?>> values = tickets.values();
currentTime = System.currentTimeMillis();
for(Ticket<?> t : values)
{
if ((currentTime - t.startTime) > t.timeout) {
removeTicket(t.getTicketNumber());
t.expired();
}
}
}
}, 500, 500, TimeUnit.MILLISECONDS);
}
//过期处理
public void expired()
{
this.expired = true;
if (callback != null) callback.onExpired();
this.countDown();
if (ticketListener != null) ticketListener.onResponseExpired(this);
}
3. Future调用
Future调用与同步调用比较类似,注册回调callback到 Ticket中(此回调函数与异步回调有所区别),将Ticket封装到JcePromiseFuture中作为返回值直接返回,此时不等待服务端响应,之后根据业务需要再调用Future的get方法获取到response的返回结果,此时客户端会停止等待直到接收到响应回包,实现代码如下:
代码语言:txt复制public <T extends ServantResponse, V> Future<V> invokeWithFuture(ServantRequest request,
Callback<T> callback) throws IOException {
Ticket<T> ticket = null;
try {
ensureConnected();
request.setInvokeStatus(InvokeStatus.FUTURE_CALL);
ticket = TicketManager.createTicket(request, session, this.syncTimeout, callback);
Session current = session;
current.write(request);
return new JcePromiseFuture<T, V>(ticket);
} catch (Exception ex) {
if (ticket != null) {
TicketManager.removeTicket(ticket.getTicketNumber());
}
throw new IOException("error occurred on invoker with future", ex);
}
}
4. 其他
此外,客户端调用还有一种单向调用的方式,即调用之后不需要任何服务端的响应和业务回调。实现上我们直接复用异步调用逻辑即可,此时callback函数为null,将JCEONEWAY标识设置到请求报文中,同时此类请求不计入节点服务stat上报中。
另外在异步调用的实现上C 版本有一种基于协程(coroutine)的方式,且当下比较火的Golang也在coroutine方式上提供了很好的支持(即goroutine),是可以从本质上解决业务逻辑的割裂问题的优雅方案,而Java在语言层面上不支持coroutine特性,但可以在JVM上做框架级或基于其他JVM语言(如Scala)的实现,如早期的Kilim以及后面比较成熟的Quasar,而Scala则从语言级支持了Actor Model。目前TAF-Java还没有实现这种方式,设想以后可以从这个方面多学习思考,对比分析一下。
感谢阅读,有错误之处还请不吝赐教。