概述
为何要池化RabbitMq的连接?这就涉及到了两个基本的RabbitMq概念:Connection
和Channel
。
Connection
Connection对象,就是一个TCP连接对象。
Channel
虚拟连接。虚拟连接建立在上面Connection对象的TCP连接中。数据流动都是在Channel中进行的。每个Connection对象的虚拟连接也是有限的,如果单个Connnection的Channel对象超出指定范围了,也会有性能问题,另外一个TCP连接上的多个虚拟连接,实际在传输数据时,传输数据的虚拟连接还是独占了TCP连接,其它虚拟连接在排队等待。
池化设计
参考线程池、DB连接池的设计,同样抽象一个池化对象来管理Connection以及Channel两个对象,回收复用连接。
实现落地
ObjectPool实现
其实是基于微软官方实现的池扩展库来做的,描述请见 ObjectPool对象池设计模式。
具体实现见 Publishing RabbitMQ Message In ASP.NET Core
代码请见 https://github.com/anehir/PooledRabbitClient
ABP中的IConnectionPool
在ABP的Volo.Abp.RabbitMQ库中也实现了IConnection以及IChannel两者的池化管理。
在ABP中,通过一个简单的并发字典来缓存已有的RabbitMq连接,如果连接有的话,就直接返回否则就创建。
GetOrAdd方法并不是线程安全的,但如果是基于lazy来实现则是线程安全。
代码语言:javascript复制var lazyConnection = Connections.GetOrAdd(
connectionName, () => new Lazy<IConnection>(() =>
{
var connection = Options.Connections.GetOrDefault(connectionName);
var hostnames = connection.HostName.TrimEnd(';').Split(';');
// Handle Rabbit MQ Cluster.
return hostnames.Length == 1 ? connection.CreateConnection() : connection.CreateConnection(hostnames);
})
);
return lazyConnection.Value;