一 为什么需要线程池
- 官方解答
是维护的数据库连接的缓存,以便在将来需要对数据库发出请求时可以重用连接。 连接池用于提高在数据库上执行命令的性能。为每个用户打开和维护数据库连接,尤其是对动态数据库驱动的网站应用程序发出的请求,既昂贵又浪费资源。在连接池中,创建连接之后,将连接放在池中并再次使用,这样就不必创建新的连接。如果所有连接都正在使用,则创建一个新连接并将其添加到池中。连接池还减少了用户必须等待创建与数据库的连接的时间。
ClickHouse 原生ConnectionPool 缺点
- ClickHouse 官方 对于Connnection的实现过于死板,ConnectionPool 只能适用于ClickHouse TCP Connenction
class ConnectionPool : public IConnectionPool, private PoolBase<Connection>
{
public:
using Entry = IConnectionPool::Entry;
using Base = PoolBase<Connection>;
ConnectionPool(unsigned max_connections_,
const String & host_,
UInt16 port_,
const String & default_database_,
const String & user_,
const String & password_,
const String & cluster_,
const String & cluster_secret_,
const String & client_name_,
Protocol::Compression compression_,
Protocol::Secure secure_,
Int64 priority_ = 1)
: Base(max_connections_,
&Poco::Logger::get("ConnectionPool (" host_ ":" toString(port_) ")")),
host(host_),
port(port_),
default_database(default_database_),
user(user_),
password(password_),
cluster(cluster_),
cluster_secret(cluster_secret_),
client_name(client_name_),
compression(compression_),
secure(secure_),
priority(priority_)
{
}
2. PoolBase 构造函数需要继承自类ConnnectionPool 重新实现
代码语言:javascript复制 PoolBase(unsigned max_items_, Poco::Logger * log_)
: max_items(max_items_), log(log_)
{
items.reserve(max_items);
}
/** Creates a new object to put into the pool. */
virtual ObjectPtr allocObject() = 0;
- 目的
- 实现模版类,更加轻量化的实现
二 准备工作
基本C 概念
代码语言:javascript复制std::mutex 锁
std::unique_lock 唯一锁
std::lock_guard
std::shared_ptr 指针,带引用计数器 use_count
std::vector 数据
class 类
template class 模版类
网线限制参数
代码语言:javascript复制connection_timeout
send_timeout
receive_timeout
tcp_keep_alive_timeout
http_keep_alive_timeout
secure_connection_timeout
handshake_timeout
包参数
代码语言:javascript复制secure 安全模式 http?https
compression 数据传输是否压缩
INodeInfo 节点信息
代码语言:javascript复制ip 节点ip
role 节点的角色
IClusterInfo 集群信息
代码语言:javascript复制http/tcp port 集群访问的端口
user 用户名
password 密码
std::vector<NodeInfo> 集群节点
xxx 其他
网络限制参数
网络传输参数
Connection
代码语言:javascript复制IClusterInfo
socket/client Server 放提供的链接方式的client 的封装
ConnectionPool
三 类
3.1 集群信息
- 1 NodeInfo
struct NodeInfo
{
explicit NodeInfo(std::string host_, std::string role_ = "follower") : host(host_), role(role_) { }
std::string host;
std::string role;
};
using NodeInfoPtr = std::shared_ptr<NodeInfo>;
using NodeInfoPtrs = std::vector<NodeInfoPtr>;
- 2 ClusterInfo
struct ClusterInfo
{
enum class Compression
{
Disable = 0,
Enable = 1,
};
enum class Secure
{
Disable = 0,
Enable = 1,
};
explicit ClusterInfo(const Poco::Util::AbstractConfiguration & config, std::string config_name);
//to do ,memory ClusterInfo(xxx);
NodeInfoPtrs node_info_ptrs;
std::int32_t port;
std::string user = "root";
std::string password;
Secure security = Secure::Disable;
Compression compression = Compression::Enable;
ConnectionTimeouts timeouts;
};
using ClusterInfoPtr = std::shared_ptr<ClusterInfo>;
ConnectionParameters::ConnectionParameters(const Poco::Util::AbstractConfiguration & config)
{
bool is_secure = config.getBool("secure", false);
security = is_secure ? Protocol::Secure::Enable : Protocol::Secure::Disable;
host = config.getString("host", "localhost");
port = config.getInt(
"port", config.getInt(is_secure ? "tcp_port_secure" : "tcp_port", is_secure ? DBMS_DEFAULT_SECURE_PORT : DBMS_DEFAULT_PORT));
default_database = config.getString("database", "");
/// changed the default value to "default" to fix the issue when the user in the prompt is blank
user = config.getString("user", "default");
bool password_prompt = false;
if (config.getBool("ask-password", false))
{
if (config.has("password"))
throw Exception("Specified both --password and --ask-password. Remove one of them", ErrorCodes::BAD_ARGUMENTS);
password_prompt = true;
}
else
{
password = config.getString("password", "");
/// if the value of --password is omitted, the password will be set implicitly to "n"
if (password == "n")
password_prompt = true;
}
if (password_prompt)
{
#if !defined(ARCADIA_BUILD)
std::string prompt{"Password for user (" user "): "};
char buf[1000] = {};
if (auto * result = readpassphrase(prompt.c_str(), buf, sizeof(buf), 0))
password = result;
#endif
}
/// By default compression is disabled if address looks like localhost.
compression = config.getBool("compression", !isLocalAddress(DNSResolver::instance().resolveHost(host)))
? Protocol::Compression::Enable : Protocol::Compression::Disable;
timeouts = ConnectionTimeouts(
Poco::Timespan(config.getInt("connect_timeout", DBMS_DEFAULT_CONNECT_TIMEOUT_SEC), 0),
Poco::Timespan(config.getInt("send_timeout", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0),
Poco::Timespan(config.getInt("receive_timeout", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0),
Poco::Timespan(config.getInt("tcp_keep_alive_timeout", 0), 0));
}
- 3 ConnectionCache to do 数据缓存
class ConnectionCache
{
}
3.2 Connection
代码语言:javascript复制class IConnection
{
public:
IConnection() = default;
virtual ~IConnection() = default;
virtual void connect() = 0;
virtual void close() = 0;
virtual void heartbeat() = 0;
virtual void isLeader() = 0;
virtual void getleader() = 0;
virtual void getCluster() = 0;
virtual void send() = 0;
virtual void receive() = 0;
virtual bool inUse() = 0;
};
xxxx
3.2 ClickHouse ConnectionPool
- 优化
template<typename TObject>
class ConnectionPool {
public:
using ObjectPtr = std::shared_ptr<TObject>;
using ObjectPtrs = std::vector<ObjectPtr>;
ObjectPtr get() {
std::unique_lock<std::mutex> lock(object_mutex);
// 通过shared_ptr 直接简化封装,和use_count 判断object 是否在使用
while (true) {
for (auto object_ptr : object_ptrs)
if (object_ptr.use_count() == 2)
return object_ptr;
auto object_index = object_ptrs.size();
if (object_index < max_pool_size) {
alloObject(object_index);
continue;
}
condition_variable.wait_for(lock, std::chrono::milliseconds(10));
}
}
ConnectionPool(ClusterInfoPtr cluster_info_ptr_, size_t min_pool_size_, size_t max_pool_size_)
: cluster_info_ptr(std::move(cluster_info_ptr_)), min_pool_size(min_pool_size_),
max_pool_size(max_pool_size_) {
reserve();
}
void reserve() {
std::lock_guard<std::mutex> lock(object_mutex);
for (size_t object_index = object_ptrs.size(); object_index < min_pool_size; object_index )
alloObject(object_index);
}
void alloObject(size_t object_index) {
ObjectPtr object_ptr
= std::make_shared<TObject>(
cluster_info_ptr->node_info_ptrs[object_index % cluster_info_ptr->node_info_ptrs.size()]);
object_ptrs.emplace_back(object_ptr);
}
//to do 集群信息更新,更新ClusterInfo
~ConnectionPool() = default;
private:
ClusterInfoPtr cluster_info_ptr;
ObjectPtrs object_ptrs;
size_t min_pool_size;
size_t max_pool_size;
std::mutex object_mutex;
std::condition_variable condition_variable;
};
3.2 ClickHouse ConnectionPool
ClickHouse Object 管理
- 使用Entry 绑定Object 作代码调用的返回值
- Entry 来实际操作 Object
- 用户在使用的时候 必须接受Entry 的返回值
- 使用shared_ptr 来完成最终object 自我销毁
/** What is given to the user. */
class Entry
{
public:
friend class PoolBase<Object>;
Entry() = default; /// For deferred initialization.
/** The `Entry` object protects the resource from being used by another thread.
* The following methods are forbidden for `rvalue`, so you can not write a similar to
*
* auto q = pool.get()->query("SELECT .."); // Oops, after this line Entry was destroyed
* q.execute (); // Someone else can use this Connection
*/
Object * operator->() && = delete;
const Object * operator->() const && = delete;
Object & operator*() && = delete;
const Object & operator*() const && = delete;
Object * operator->() & { return &*data->data.object; }
const Object * operator->() const & { return &*data->data.object; }
Object & operator*() & { return *data->data.object; }
const Object & operator*() const & { return *data->data.object; }
bool isNull() const { return data == nullptr; }
PoolBase * getPool() const
{
if (!data)
throw DB::Exception("Attempt to get pool from uninitialized entry", DB::ErrorCodes::LOGICAL_ERROR);
return &data->data.pool;
}
private:
std::shared_ptr<PoolEntryHelper> data;
explicit Entry(PooledObject & object) : data(std::make_shared<PoolEntryHelper>(object)) { }
};
virtual ~PoolBase() = default;
/** Allocates the object. Wait for free object in pool for 'timeout'. With 'timeout' < 0, the timeout is infinite. */
Entry get(Poco::Timespan::TimeDiff timeout)
{
std::unique_lock lock(mutex);
while (true)
{
for (auto & item : items)
if (!item->in_use)
return Entry(*item);
if (items.size() < max_items)
{
ObjectPtr object = allocObject();
items.emplace_back(std::make_shared<PooledObject>(object, *this));
return Entry(*items.back());
}
LOG_INFO(log, "No free connections in pool. Waiting.");
if (timeout < 0)
available.wait(lock);
else
available.wait_for(lock, std::chrono::microseconds(timeout));
}
}
四 如何使用
eg: Redis
1 创建 RedisConnection
代码语言:javascript复制
class RedisConnection : public IConnection
{
public:
RedisConnection(NodeInfoPtr nodeInfoPtr);
RedisConnection(ClusterInfoPtr clusterInfoPtr);
~RedisConnection();
//redis client object
xxxxx
};
2 ConnectionPool 模版 Client 实现
代码语言:javascript复制class RedisClient : public SConnectionPool<RedisConnection>
{
public:
RedisClient(ClusterInfoPtr clusterInfoPtr_, size_t min_pool_size_, size_t max_pool_size_)
: ConnectionPool(clusterInfoPtr_, min_pool_size_, max_pool_size_), cluster_info_ptr(clusterInfoPtr_)
{
}
~RedisClient() = default;
};
想较 ClickHouse Connection Pool 更加轻量化
demo 后面放到个人github
代码语言:javascript复制people1 method5
xxxxxxx 1
people1 method1
xxxxxxx 2
people1 method2
xxxxxxx 4
people1 method4
xxxxxxx 3
people1 method3
xxxxxxx 1
people1 method1
xxxxxxx 5
people1 method5
xxxxxxx 4
people1 method4
xxxxxxx 5
people1 method5
xxxxxxx 1
people1 method1
xxxxxxx 2
people1 method2
xxxxxxx 3
people1 method3
xxxxxxx 1
people1 method1
xxxxxxx 5
people1 method5
xxxxxxx 3
people1 method3
xxxxxxx 4
people1 method4
xxxxxxx 4
people1 method4
xxxxxxx 3
people1 method3
xxxxxxx 1
people1 method1
xxxxxxx 2
people1 method2
xxxxxxx 5
people1 method5
xxxxxxx 4
people1 method4
感谢阅读!