第二篇 连接池
连接池配置,请前往Thrift搭建分布式微服务(一)
下面要介绍的其实不是单一的连接池,应该说是连接池集合。因为它要管理多个Tcp Socket连接节点,每个服务节点都有设置了自己的最大激活连接数、最大空闲连接数、最小空闲连接数、等待连接时间。
代码语言:javascript复制 1 internal class ServiceTransportPool
2 {
3 public ServiceConfig ServiceConfig { get; set; }
4
5 public ConcurrentStack<TTransport> TransportPool { get; set; }
6
7 public AutoResetEvent ResetEvent { get; set; }
8
9 public int ActivedTransportCount { get; set; }
internal object sync_obeject = new object();
10 }
一个ServiceTransportPool类对应一个服务配置,一个服务配置对应一个服务节点。连接池集合应具有下列成员:
代码语言:javascript复制此代码由Java架构师必看网-架构君整理
1 internal class ThriftFactory
2 {
3 private static volatile List<ServiceTransportPool> transport_pools;
4
5 private static object sync_obj = new object();
6
7 private static IThriftFactoryMonitor monitor = new ThriftFactoryMonitor();
8 }
transport_pools实现了对服务节点的管理,monitor 用来监控连接池的状态,如上一篇所讲,等待连接超时怎么通知连接池管理者,就用monitor来实现。这里monitor有个默认的实现,后面再讲。
初始化连接池集合:
代码语言:javascript复制 1 static ThriftFactory()
2 {
3 if (transport_pools == null || transport_pools.Count == 0)
4 {
5 lock (sync_obj)
6 {
7 if (transport_pools == null || transport_pools.Count == 0)
8 {
9 var services = ConfigHelper.GetServiceConfigs();
10 transport_pools = new List<ServiceTransportPool>(services.Count);
11 foreach (var service in services)
12 {
13 ServiceTransportPool stp = new ServiceTransportPool()
14 {
15 ServiceConfig = service,
16 TransportPool = new ConcurrentStack<TTransport>(),
17 ResetEvent = new AutoResetEvent(false),
18 ActivedTransportCount = 0
19 };
20 transport_pools.Add(stp);
21 }
22 }
23 }
24 }
25 }
如何向连接池借出一个Socket连接:
代码语言:javascript复制此代码由Java架构师必看网-架构君整理
1 public static TTransport BorrowInstance(string serviceName)
2 {
3 var transpool = (from tp in transport_pools where tp.ServiceConfig.Name.ToUpper() == serviceName.ToUpper() select tp).FirstOrDefault();
4 if (transpool == null)
5 {
6 throw new ThriftException(string.Format("There Is No Service Named "{0}"", serviceName));
7 }
8
9 TTransport transport;
10 lock (transpool.sync_obeject)
11 {
12 if (transpool.TransportPool.Count() == 0)
13 {
14 if (transpool.ActivedTransportCount == transpool.ServiceConfig.MaxActive)
15 {
16 bool result = transpool.ResetEvent.WaitOne(transpool.ServiceConfig.WaitingTimeout);
17 if (!result)
18 {
19 monitor.TimeoutNotify(transpool.ServiceConfig.Name, transpool.ServiceConfig.WaitingTimeout);
20 }
21 }
22 else
23 {
24 transpool.TransportPool.Push(CreateTransport(transpool.ServiceConfig));
25 }
26 }
27
28 if (!transpool.TransportPool.TryPop(out transport))
29 {
30 throw new ThriftException("Connection Pool Exception");
31 }
32
33 transpool.ActivedTransportCount ;
34
35 if (transpool.TransportPool.Count() < transpool.ServiceConfig.MinIdle && transpool.ActivedTransportCount < transpool.ServiceConfig.MaxActive)
36 {
37 transpool.TransportPool.Push(CreateTransport(transpool.ServiceConfig));
38 }
39 }
40 if (!transport.IsOpen)
41 {
42 transport.Open();
43 }
44 Monitor();
45 return transport;
46 }
当实际激活的连接数达到服务节点配置的最大激活连接数,获取Socket连接的请求就将处于等待状态,超过等待时间设置,使用监视器方法monitor.TimeoutNotify()去通知管理者。连接池空闲的连接小于最小空闲连接数设置,每次请求连接都会建立一个新的连接放到池子里。
归还连接:
代码语言:javascript复制 1 public static void ReturnInstance(string serviceName,TTransport transport)
2 {
3 var transpool = (from tp in transport_pools where tp.ServiceConfig.Name.ToUpper() == serviceName.ToUpper() select tp).FirstOrDefault();
4 if (transpool == null)
5 {
6 throw new ThriftException("Connection Pool Exception");
7 }
8 if (transpool.TransportPool.Count() == transpool.ServiceConfig.MaxIdle)
9 {
10 transport.Flush();
11 if (transport.IsOpen)
12 {
13 transport.Close();
14 }
15 transport.Dispose();
16 }
17 else
18 {
19 lock (transpool.sync_obeject)
20 {
21 if (transport.IsOpen)
22 {
23 transport.Close();
24 }
25 transpool.TransportPool.Push(transport);
26 transpool.ActivedTransportCount--;
27 transpool.ResetEvent.Set();
28 }
29 }
30 Monitor();
31 }
当连接池最大空闲连接达到了服务节点设置的最大空闲连接数时,归还的连接将被销毁。借出连接和归还连接两段代码里都有一个Monitor()方法,此方法监控连接池连接的使用情况:
代码语言:javascript复制 /// <summary>
/// 监控连接池状态
/// </summary>
private static void Monitor()
{
List<Tuple<string, int, int>> tuples = new List<Tuple<string, int, int>>(transport_pools.Count);
foreach(var transpool in transport_pools)
{
Tuple<string, int, int> tuple = new Tuple<string, int, int>(transpool.ServiceConfig.Name, transpool.TransportPool.Count(), transpool.ActivedTransportCount);
tuples.Add(tuple);
}
monitor.Monitor(tuples);
}
此方法将每个服务连接池的空闲连接数、和激活的连接数传给前面提到的监视器。在连接等待超时和拿到连接池的运行参数时,最终进行什么动作还是由开发者去实现的。继承下面的接口,开发者可以自定义监视器。
代码语言:javascript复制 1 public interface IThriftFactoryMonitor
2 {
3 /// <summary>
4 /// 监控连接池运行状态
5 /// </summary>
6 /// <param name="tuple">元组集合,第一个元素表示服务名称、第二个元素表示空闲连接数量、第三个元素表示激活连接数量</param>
7 void Monitor(List<Tuple<string,int,int>> tuples);
8
9 /// <summary>
10 /// 等待连接超时
11 /// </summary>
12 void TimeoutNotify(string serviceName,int timeOut);
13 }
默认的监视器只是将连接池的运行状态记录到控制台:
代码语言:javascript复制 1 /// <summary>
2 /// 默认连接池状态监控类
3 /// </summary>
4 public class ThriftFactoryMonitor : IThriftFactoryMonitor
5 {
6 public virtual void Monitor(List<Tuple<string, int, int>> tuples)
7 {
8 foreach (var t in tuples)
9 {
10 Console.WriteLine(string.Format("{0}连接池,空闲连接数量:{1},激活连接数量:{2}", t.Item1, t.Item2, t.Item3));
11 }
12 }
13
14 public virtual void TimeoutNotify(string serviceName, int timeOut)
15 {
16 Console.WriteLine(string.Format("{0}连接池等待连接超时{1}", serviceName, timeOut));
17 }
18 }
开发者自己实现的监视器,如何能被连接池使用,其实在一开始的连接池初始化里,还有一段使用反射来初始化开发者定义的监视器的代码。开发者只需在上一篇介绍的Thrift.config里配置MonitorType,其他的就交给框架处理:
代码语言:javascript复制 1 static ThriftFactory()
2 {
3 ......
4 if(!string.IsNullOrWhiteSpace(ConfigHelper.ThriftConfig.MonitorType))
5 {
6 monitor = Invoker.CreateInstance(Type.GetType(ConfigHelper.ThriftConfig.MonitorType)) as IThriftFactoryMonitor;
7 if (monitor == null)
8 {
9 throw new ThriftException(string.Format("There Is No Monitor Implement Which Type Is "{0}"", ConfigHelper.ThriftConfig.MonitorType));
10 }
11 }
12 }
通过连接池与服务节点建立了Socket连接,下一篇将介绍客户端如何使用建立的Socket连接与服务端通信。
Thrift微服务代码下载Thrift.Utility