jediscluster 关闭 连接池_Redis——JedisCluster

2022-11-18 15:17:35 浏览数 (1)

smart客户端

实现原理(追求性能,不使用代理)

从集群中选一个可运行节点,使用cluster slots初始化槽和节点映射。

将cluster slots的结果映射到本地,为每个节点创建JedisPool。

执行命令

执行命令

执行命令的过程简单来说,就是通过CRC16计算出key的槽,根据节点映射直接访问目标节点,如果出错,就随机挑选一个节点,通过moved重定向访问目标节点,并且重新初始化节点映射。

好吧,直接上源码

JedisClusterCommand.java

//命令的执行过程

public T run(String key) {

if (key == null) {

throw new JedisClusterException(“No way to dispatch this command to Redis Cluster.”);

}

//这里是真正的执行函数,参数分别为UTF-8编码的key二进制数组,重定向的次数,是否尝试连接随机节点,是否ask重定向

return runWithRetries(SafeEncoder.encode(key), this.redirections, false, false);

}

private T runWithRetries(byte[] key, int redirections, boolean tryRandomNode, boolean asking) {

if (redirections <= 0) {

//对尝试连接目标节点的次数做判断,超过预设次数抛出异常

throw new JedisClusterMaxRedirectionsException(“Too many Cluster redirections?”);

}

Jedis connection = null;

try {

if (asking) {

//是否ask重定向

// TODO: Pipeline asking with the original command to make it

// faster….

connection = askConnection.get();

connection.asking();

// if asking success, reset asking flag

asking = false;

} else {

if (tryRandomNode) {

//是否尝试连接随机节点

connection = connectionHandler.getConnection();

} else {

//计算出key的槽位置,然后从本地缓存中获取目标主机的信息

connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));

}

}

//执行命令

return execute(connection);

} catch (JedisConnectionException jce) {

//连接出错,是否尝试随机节点

if (tryRandomNode) {

// maybe all connection is down

throw jce;

}

// release current connection before recursion释放当前连接

releaseConnection(connection);

connection = null;

//重新尝试连接,redirections -1

// retry with random connection

return runWithRetries(key, redirections – 1, true, asking);

} catch (JedisRedirectionException jre) {

// if MOVED redirection occurred,

if (jre instanceof JedisMovedDataException) {

// it rebuilds cluster’s slot cache

// recommended by Redis cluster specification

this.connectionHandler.renewSlotCache(connection);

}

// release current connection before recursion or renewing

releaseConnection(connection);

connection = null;

if (jre instanceof JedisAskDataException) {

asking = true;

askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));

} else if (jre instanceof JedisMovedDataException) {

} else {

throw new JedisClusterException(jre);

}

return runWithRetries(key, redirections – 1, false, asking);

} finally {

releaseConnection(connection);

}

}

JedisClusterConnectionHandler:连接持有者,实际上Handler内部维护了一个JedisClusterInfoCache ,也就是节点和槽信息映射,通过这些信息来获取连接池,换句话说,内置了所有节点的连接池

JedisClusterInfoCache .java

//集群节点信息转换器

public static final ClusterNodeInformationParser nodeInfoParser = new ClusterNodeInformationParser();

//节点–连接池映射 每个节点都分配了一个连接池

private Map nodes = new HashMap();

//槽–连接池映射 每个槽也分配了一个连接池

private Map slots = new HashMap();

//通过读写锁来分离对两个映射Map的访问,保证了集群信息的正确性

private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

private final Lock r = rwl.readLock();

private final Lock w = rwl.writeLock();

自己动手写一个客户端连接工具测试一下,有个小bug,使用jedis2.8的时候会报host转换的异常,所以使用了2.9:

public final class ClusterUtil {

private ClusterUtil() {

}

public static JedisCluster getJedisCluster() {

return RedisClusterPoolHolder.getInstance();

}

private static final class RedisClusterPoolHolder {

//使用pool单例

private static final ClusterPool CLUSTER_POOL = new ClusterPool();

private RedisClusterPoolHolder() {

}

private static JedisCluster getInstance() {

return CLUSTER_POOL.getJedisCluster();

}

}

private static class ClusterPool {

/**

* redis-Cluster节点地址

*/

private static final HostAndPort CLUSTER_NODE_1 = new HostAndPort(“120..151.31”, 6379);

private static final HostAndPort CLUSTER_NODE_2 = new HostAndPort(“120..151.31”, 6380);

private static final HostAndPort CLUSTER_NODE_3 = new HostAndPort(“120..151.31”, 6381);

private static final HostAndPort CLUSTER_NODE_4 = new HostAndPort(“122..201.233”, 6379);

private static final HostAndPort CLUSTER_NODE_5 = new HostAndPort(“122..233”, 6380);

private static final HostAndPort CLUSTER_NODE_6 = new HostAndPort(“122..201.233”, 6381);

//Cluster节点地址集合

private static final Set NODES = new HashSet() {

{

add(CLUSTER_NODE_1);

add(CLUSTER_NODE_2);

add(CLUSTER_NODE_3);

add(CLUSTER_NODE_4);

add(CLUSTER_NODE_5);

add(CLUSTER_NODE_6);

}

};

/**

* 访问密码

*/

private static final String AUTH = “woshishei”;

private static final String HOST = “120.79.151.31”;

/**

* 可用连接实例的最大数目,默认值为8; 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。

*/

private static final int MAX_ACTIVE = 1024;

/**

* 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。

*/

private static final int MAX_IDLE = 200;

/**

* 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException;

*/

private static final int MAX_WAIT = 10000;

private static final int TIMEOUT = 10000;

/**

* 在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的;

*/

private static final boolean TEST_ON_BORROW = true;

/**

* JedisCluster

*/

private static JedisCluster JEDIS_CLUSTER = null;

ClusterPool() {

/**

* 初始化Redis-Cluster连接池.

*/

try {

// maxActive ==> maxTotal

// maxWait ==> maxWaitMillisl

/*

* 配置JedisPool*/

JedisPoolConfig CONFIG = new JedisPoolConfig();

CONFIG.setMaxTotal(MAX_ACTIVE);

CONFIG.setMaxIdle(MAX_IDLE);

CONFIG.setMaxWaitMillis(MAX_WAIT);

CONFIG.setTestOnBorrow(TEST_ON_BORROW);

JEDIS_CLUSTER = new JedisCluster(NODES, TIMEOUT, CONFIG);

} catch (Exception e) {

e.getMessage();

e.printStackTrace();

}

}

private JedisCluster getJedisCluster() {

return JEDIS_CLUSTER;

}

}

}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/183350.html原文链接:https://javaforall.cn

0 人点赞