TAF 必修课(四):过载保护

2021-08-03 14:56:08 浏览数 (1)

作者:温昂展

经过实习过程中,leader和导师在思维逻辑上的指导,自己再有意识的加以训练,我觉得非常受益。就如这部分的理解,目前就加深了很多。所以说,思维决定行为、行为决定习惯、习惯决定了代码写得好不好啊...以前高中数学老师经常说的一句话 :“ 这道题半夜醒来我都会做的 ”,我想他思维挺好。

扯远了,进入正题

一、过载

何为过载? 简单来说,就是当前负载超过了系统的最大处理能力,如:

系统实际每秒能处理的请求量为100个,但实际每秒的请求量却远大于100个,可以判定系统过载。

究其原因,我们可以把原因归结为:请求量的上升

那么如何保护? 其实就是对请求量的控制问题

二、 过载保护

依据之前介绍的TAF线程模型进行的分析,首先一个新的请求到达服务端先是建立一个连接,接着直到该连接被释放,期间发生的读写请求都会占用线程池的资源。因此,过载保护就是要:

其一,限定最大连接数,

其二,限定最大能够同时处理的请求量。

三、TAF实现

1. 限定最大连接数

上一节已经分析过,TAF通过SessionManagerImpl管理器对连接进行管理,因此我们可以考虑在这个管理器上面做文章。仔细一想,管理器上不是可以注册监听器么?这样不就可以通过监听连接来简单实现么: 对创建和释放操作进行计数,同时在系统部署上线之前,预估好系统的处理能力,设定一个系统最大可承载的连接数阈值,一旦连接数到达阈值或是接近阈值,触发拒绝连接。

问题解决,看代码:

代码语言:txt复制
public class ConnectionSessionListener implements SessionListener {

    private final AtomicInteger connStat = new AtomicInteger(0);

    private final int MaxConnCount;

    public ConnectionSessionListener(int connCount) {
        MaxConnCount = connCount;
        System.out.println("MaxConnCount="   MaxConnCount);
    }

    @Override
    public synchronized void onSessionCreated(SessionEvent se) {
        System.out.println("onSessionCreated: "   connStat.get());
        
        if (connStat.get() >= MaxConnCount) {
            try {
                System.out.println("reached the max connection threshold, close the session.");
                se.getSession().close();
            }
            catch (IOException e) {
                e.printStackTrace();
            }

            return;
        }

        connStat.incrementAndGet();
    }

    @Override
    public synchronized void onSessionDestoryed(SessionEvent se) {
        System.out.println("onSessionDestoryed: "   connStat.get());
        if (se.getSession() != null && se.getSession().getStatus() == SessionStatus.SERVER_CONNECTED && connStat.get() > 0) {
            connStat.decrementAndGet();
        }        
    }

}

以上就是监听器的事件处理函数实现,建立连接时触发计数器加一,释放连接时触发计数器减一,一旦计数器到达最大阈值限定,直接关闭新的连接

其中,连接的释放,每个session在配置中会有一个会话超时时间sessionTimeout,管理器的另一大作用就是不断地轮询地检查每个连接距离上次的操作(发生IO事件)时间是否超时,若超时则回收该连接,代码如下:

代码语言:txt复制
public synchronized void start() 
{
	if (started) return ;
	
	new Thread((Runnable)()->{
		System.out.println("[NAMI_CORE] The session manager service started...");
		
		long lastUpdateOperationTime = -1;
		
		while(true) 
		{
			try 					
			{
				for(Session session : sessionList) 
				{
					lastUpdateOperationTime = session.getLastOperationTime();
					//会话超时
					if ((System.currentTimeMillis() - lastUpdateOperationTime) > timeout) {
						String s = "The session has timed out. [from ip: "   session.getRemoteIp()   " port: "
										  session.getRemotePort()   "]";
						if (Utils.getLogger() != null) {
							Utils.getLogger().warn(s);
						} else {
							System.out.println("[NAMI_CORE] "   s);
						}
						session.asyncClose();
						unregisterSession(session); 
					}
				}
				
				Thread.sleep(interval);
			}
			catch (Exception ex)
			{
			}
		}	
	}, "SessionManageImpl Thread").start();
	started = true;
}

// 更新会话的上次操作时间实现,即发生IO,省略无关代码
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();				
while (iter.hasNext()) 
{
	SelectionKey key = iter.next();
	iter.remove();
	if( !key.isValid() ) continue;
 
	//1. Update the last operation time
	if (key.attachment() != null && key.attachment() instanceof Session) {
		((Session)key.attachment()).updateLastOperationTime();
	}
}

那么还有问题是,最大连接数这个阈值怎么获得?设定多少为合理?

前面已经分析,TAF多个Obj都会对同一个监听端口复用,也就是说在服务接收到请求的时候是没有办法直接区分开是哪个Obj的请求的,只有在后面的业务线程处理时才会分发到各个服务Obj上处理。因此,目前目前最大连接数只能是按整个服务的总量

每个Obj可以配置一个连接最大值,通过读取配置文件累加得到整个服务的连接数总量最大值,实现如下:

代码语言:txt复制
int connCount = 0;
for (Entry<String, ServantAdapterConfig> adapterConfigEntry : ConfigurationManager.getInstance()
		.getserverConfig().getServantAdapterConfMap().entrySet()) {
	if (OmConstants.AdminServant.equals(adapterConfigEntry.getKey())) {
		continue;
	}
	connCount  = adapterConfigEntry.getValue().getMaxConns();
}
ConnectionSessionListener sessionListener = new ConnectionSessionListener(connCount);
sessionManager.addSessionListener(sessionListener);

设定多少合理需要一定的经验,在系统上线之前就预估好系统的处理能力,同时根据 “动态运营” 的思想,大胆假设,小心验证,用运营数据支撑开发,验证我们的用户假设。

2. 限定最大请求量

服务端对请求怎么管理,由于涉及到生产消费模式,很容易地想到就是要维持一个缓存队列,请求到达时先把请求放入缓冲队列中。那么应该让系统同时从队列里取出几个请求进行处理呢?前面提到,由于请求的处理是解析request之后创建任务放到业务线程池去跑,那么这个问题其实就是线程池要开多大的问题。

是的,线程池提供了一个天然的缓冲区,我的理解是: 队列相当于一个管道,请求Tps决定了一次有多少流量流入管道,线程数大小决定了一次有多少流量从管道中流出,如果在一定的状态下(取出的请求有效)能稳定维持一个流入流出的平衡关系,那么就不会出现过载的问题了!但是任何平衡态都是难以维持的,总有被打破的可能,从这个理解出发,我认为过载保护的意义就在于:一方面尽可能地维持这一平衡态,另一方面,当这一平衡态被打破的时候提供保护机制,不至于搞垮整个服务乃至整个系统。

值得注意的地方是,每个服务Obj对应各自的线程池,也即上一节提到的业务线程池TaskThreadPoolExecutor,所有的请求会分发到各自的业务线程池去处理。因此在配置文件中,我们可以对每个业务线程池的大小theads,和缓存队列长度queuecap分别进行配置。代码如下:

代码语言:txt复制
private static synchronized Executor createExecutor(JceServiceRequest request) {
	String key = null, contextName = null, serviceName = null;
	Executor executor = null;

	if (request == null) return createDefaultExecutor(null);

	serviceName = request.getServantName();
	key = contextName   '_'   serviceName;

	executor = threadExecutors.get(key);
	if (executor != null) return executor;

	int minPoolSize = -1, maxPoolSize = -1, queueSize = -1;
	AppContainer container = ContainerManager.getContainer(AppContainer.class);
	AppContextImpl context = container.getDefaultAppContext();

	// 客户端请求错误的app或service时,返回默认的executor.
	if (context == null) {
		return getDefaultExecutor();
	}
	ServiceHomeSkeleton service = context.getCapHomeSkeleton(serviceName);
	if (service == null) {
		return getDefaultExecutor();
	}

	//设定队列长度,线程数(线程池大小)
	minPoolSize = service.getMinThreadPoolSize();
	maxPoolSize = service.getMaxThreadPoolSize();
	queueSize = service.getQueueSize();

	if (minPoolSize < 0 || maxPoolSize < 0 || queueSize < 0) {
		return createDefaultExecutor(key);
	}

	TaskQueue taskqueue = new TaskQueue(queueSize);
	TaskThreadPoolExecutor pool = new TaskThreadPoolExecutor(minPoolSize, maxPoolSize, 120, TimeUnit.SECONDS, taskqueue, new TaskThreadFactory("taserverThreadPool-exec-"   contextName   '.'   serviceName   '-'));
	taskqueue.setParent(pool);
	threadExecutors.put(key, pool);

	return pool;
}

保持平衡态就是要设定合理的初始值,但和前面对连接数的管理一样,线程池大小和缓冲队列长度的设定是不那么容易的,需要服务端处理能力的准确评估。

那么,平衡态被打破,怎么来做限定保护机制?

简单的过载保护就相当于只要定义一个拒绝策略,当其缓存队列放不下的就根据该策略对新来的请求进行处理,在TAF的实现上默认采用了直接丢弃(DiscardPolicy)的策略。某种层面来讲,这就实现了一个对请求量的限定,亦即我们说的过载保护。

当然,限定肯定不能等到队列满的时候(此时系统已经满负载运行)才去以丢弃的策略触发保护。 简单的这种“ 满则丢弃 ” 不就把所有新来的请求都拒绝了么?(那表现出来的形式就是服务不可用了!) 此时队列中的保存的请求是不是都有效呢,要不要优先丢弃掉? 总之问题很多,我们还需要在队列满之前就需要采取一些相关策略快速拒绝掉一些请求。

问题不单是这么简单,在队列满之前还可以采取什么策略呢?

既然前面是在空间对请求量做限定,容易想到可以考虑从时间这个切面上对请求量做限定,即:依据请求处理的时延来判断是否过载,而且这个想法应该是要比直接限定空间大小更加直观和接近真理的。回到最开始我们对过载的定义:“当前负载超过了系统的最大处理能力”, 因此过载的直接表征应该是系统出现了很多请求来不及处理! 相反的,如果认真考虑从空间上的限定则是不那么准确的,当前缓存队列满了固然说明负载超过了系统的处理能力(前提是队列长度设置合理),但是当队列没有满的时候是不是就一定说明系统没有过载呢? 显然不是,我把队列开到接近无限大呢?

实现上,这个请求处理时延的计算就要控制好了,请求到达并解析成功后,获取系统时间记为该请求的产生时间 bornTime,放入队列,在该请求从队列中取出后,真正发起业务处理前,获取当前系统时间计为该请求的处理时间 processTime,计算两个时间戳的差即为该请求处理时延(delayTime = processTime - bornTime)。可以在之前就直接设定好一个队列超时时间queueTimeout,处理该请求前先判断,若请求处理时延大于队列超时时间(delayTime > queueTimeout),我们可以判定当前服务过载且该请求失效,直接丢弃该请求。

除了以请求处理时延来判断系统是否过载,设定队列超时时间还是很有必要的,这里关系到和客户端的交互问题。设想如果请求在队列中等待了很长时间,但客户端对每个请求都设定了一个调用超时时间sync_invoke_timeout/ async_invoke_timeout(单向调用除外),如果超过这个时间服务端再去处理这个请求显然是没有意义的,而且会带来了无用的负载!

弄清楚处理逻辑,代码实现就不难了:

代码语言:txt复制
//接收请求处理函数
public void readRequest() throws IOException
{
	Request request = null; IoBuffer tempBuffer = null;
	
	try {
		tempBuffer = readBuffer.duplicate().flip();
		
		while(true) 
		{
			tempBuffer.mark(); 
			
			if (tempBuffer.remaining() > 0)
			{
				request = selectorManager.getProtocolFactory().getDecoder().decodeRequest(tempBuffer, this);
			}
			else
			{
				request = null; 
			}
			
			if (request != null) {
				try {
					//设定请求产生时间
					request.resetBornTime();
					selectorManager.getThreadPool().execute(new WorkThread(request, selectorManager));
				} catch (Exception ex) {
					ex.printStackTrace();
				}
			} else {
				tempBuffer.reset();
				readBuffer = resetIoBuffer(tempBuffer);
				break;
			}
		}
	} catch (ProtocolException ex) {
		close();
		ex.printStackTrace();
	}
}

//业务线程处理函数,省略无关代码
if (req != null) //处理server端request请求
{   
	req.setProcessTime(System.currentTimeMillis());
	req.init();
	Response res = selectorManager.getProcessor().process(req, req.getIoSession());
	if (!res.isAsyncMode()) req.getIoSession().write(res);
}

long startTime = req.getProcessTime();

// 2. Directly to give up if exceeds the value of maxWaitingTimeInQueue
int maxWaitingTimeInQueue = ConfigurationManager.getInstance().getserverConfig().getServantAdapterConfMap().get(request.getServantName()).getQueueTimeout();
waitingTime = (int) (startTime - req.getBornTime());
if (waitingTime > maxWaitingTimeInQueue) {
	throw new InternalException("Wait too long, server busy.");
}

以上不管是在队列长度还是队列超时时间上做限制,都隔着一个缓存队列,经过过期丢弃后实际发起业务处理的请求数是多少呢?我们考虑实现像sessionManager那样直接对连接数精确计数的方式。

容易想到,由于TAF基于容器理念,在管理上各个服务Obj是相互独立的,因此对每个服务设定一个最大负载max-load并维持一个调用计数器,在每次服务被调用时计数加一,服务调用结束计数减一,当计数器的值大于最大负载时可判定当前服务过载,直接丢弃该请求。

特别要注意的,服务的最大负载max-load配置是在容器初始化是从service.xml文件中加载的。至于为什么不是和最大连接数、队列长度等参数统一在管理平台上或是配置文件中配置, 我认为可能是设计者认为最大负载量应该和服务本身结合得更紧密些吧,但是我目前认为没有必要,统一配置可能更容易管理和理解。代码实现很简单:

代码语言:txt复制
public Object invoke(Method method, Object... args) throws Exception {
	Object value = null;
	Context<JceServiceRequest, JceServiceResponse> context = null;
	try {
		context = ContextManager.getContext();
		preInvokeCapHomeSkeleton(context);

		value = method.invoke(this.service, fixParamValueType(method, args));
	} finally {
		if (!ContextManager.getContext().response().isAsyncMode()) {
			postInvokeCapHomeSkeleton(context);
		}
	}
	return value;
}
//发起调用前计数加一
public void preInvokeCapHomeSkeleton(Context<JceServiceRequest, JceServiceResponse> context) {
	if (this.maxLoadLimit == -1) {
		return;
	}
	this.invokeNumbers.incrementAndGet();
	if (this.invokeNumbers.intValue() > this.maxLoadLimit) {
		throw new RuntimeException(this.name   " is overload. limit="   this.maxLoadLimit);
	}
}
//调用结束计数减一
public void postInvokeCapHomeSkeleton(Context<JceServiceRequest, ?> context) {
	if (this.maxLoadLimit == -1) {
		return;
	}
	this.invokeNumbers.decrementAndGet();
}

综上,这就是TAF实现的过载保护机制,从本质上出发目标都是一致的,具体实现上多种手段相结合。

3. 另一角度

以上都是请求量的角度展开思考,因为请求量的上升是导致系统过载的根本原因,如果换一种角度,我们很容易会产生一种设想: 系统过载的表现则为系统资源的耗尽,那么我们是不是能在服务端对CPU,内存、IO等系统资源进行监控呢?比如: 设定阈值,若系统资源利用率超过阈值则触发拒绝请求机制。

认真思考后,我认为这种方式在某些场景下是可行的,但是并不通用。原因是,

资源资源数据的耗尽并不意味着出现过载,如:服务开了一个较大的内存池,看起来内存资源耗尽了,实际上负载是足够的;又如现在都是多核服务器跑着多进程/多线程,单一的CPU耗尽也不能够代表服务就出现过载。

过载并一定就会出现资源耗尽,如:当前所有服务都正在等待后端处理逻辑的返回,但是并没有占用多少CPU,内存或IO资源。从这里我们也应该想到,过载除了体现在内部请求消息队列之外,还可能体现在外部对接的服务(如:数据库),此时过载会体现在外部连接的发送队列过长。

可见思考问题的思维逻辑方式非常重要,在这里其实就是一个因果关系的确定,如果在开始的时候切入问题的角度不恰当,可能得到的结论就不那么严谨了。当然,平时在思考的时候还是要多从几个角度多想想,有意识地加强训练,共勉。

四、过载预防

除了在过载发生时进行保护,在系统设计之初,还可以考虑做一些预防措施:

1. 负载均衡

虽然负载均衡的出发点并不在过载,但是我认为在某种程度上说将请求量分发,降低单个服务的请求量,这就相当于起到负载保护的作用了。在具体实现上,TAF的负载均衡主要是放在客户端TAF-Client实现的,之后再展开探讨。

2. 模块分离

在系统设计上可能各个模块的负载不尽相同,考虑将负载高的一些模块独立开来,轻重分离,按需单独部署; 这也是高内聚低耦合在系统层面的体现,使得局部的过载不至于扩散到整个系统。

3. 流控

同样在Client端,可以做限流,在实习mini项目中我就采用了一个令牌桶算法,往后可展开探讨一下。

4. 监控告警

如之前所述,系统资源耗尽在一定程度上可以说明系统过载。在服务监控中,可以将CPU、内存等资源作为监控指标,若占用率到达阈值及时告警。在TAF监控的实现上,确实也是这么做的。除了对请求流量和相关的业务指标做上报监控,系统资源监控,JVM监控必不可少。

感谢阅读,有错误之处还请不吝赐教。

0 人点赞