JedisCluster 通过 Pipeline 实现两套数据轮换更新

2024-01-19 11:22:21 浏览数 (1)

前言

本文实现了通过定时任务来调用接口,使两套数据轮换更新。

因为要区分两套数据,所以 key 要设置前缀。

例如:一天数据一换,今天查的 A 开头的 key ,明天查 B 开头的 key ,后天又查 A 开头的 key 。今天查完后,明天更新 B 开头的 key ,但是 A 开头的 key 暂时不动,后天再查的时候,A开头的 key 要进行更新,先删再更新。


一、整体流程

1.1 大致流程

  1. 从数据库里查数据。
  2. 更新当前前缀。
  3. 往redis集群更新数据。

1.2 流程代码解释

代码语言:javascript复制
    @Override
    public R<String> updateCampToJedis() {
        R<String> r = new R<>();
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMM");
        String currentMonth = dateFormat.format(new Date());
        //1. 从数据库里查数据
        List<UserWideInfo> UserWideInfoList = UserWideInfoMapper.selectFromTable(currentMonth);
        if (UserWideInfoList.size() == 0) {
            r.setCode(R.ERROR_CODE);
            r.setMsg("没有数据");
            return r;
        }
        //2. 更新当前前缀
        updateCurrentPrefixIndex();
        r.setCode(R.SUCCESS_CODE);
        //3. 往redis集群存入数据
        insertToJedis(ZhmsUserWideInfoList);
        return r;
    }

二、从数据库里查数据

2.1 SQL语句

这里因为每个月查询的是不同月份的表,所有用到动态 sql 。

代码语言:javascript复制
    <select  id="selectFromTable" resultType="com.hopedata.zhmscloud.camp.entity.po.ZhmsUserWideInfo">
        SELECT * FROM USER_WIDE_INFO_M_${SysMonth}
    </select>

三、更新当前前缀

要做到更新当前前缀,需要有两套前缀不同的 key ,还需要一个能区分前缀的前缀索引 currentPrefixIndex 。

3.1 设置前缀常量

用 A 和 B 来区分两组 key 。

代码如下:

代码语言:javascript复制
    private static final String PREFIX_A = "A";
    private static final String PREFIX_B = "B";

3.2 初始化 currentPrefixIndex

向 redis集群中存入初始的 currentPrefixIndex 。

代码如下:

代码语言:javascript复制
    @GetMapping("/init")
    public String init() {
        return jedisCluster.set("currentPrefixIndex", "0");
    }

3.3 获取当日前缀

先取出当日的前缀索引 currentPrefixIndex ,与 2 取余数 ,来获取当日的前缀。

代码如下:

代码语言:javascript复制
     //获取当日前缀
    private String getKeyPrefix() {
        int currentPrefixIndex = Integer.parseInt(jedisCluster.get("currentPrefixIndex"));
        if (currentPrefixIndex % 2 == 0) {
            return PREFIX_A;
        } else {
            return PREFIX_B;
        }
    }

3.4 更新 currentPrefixIndex

每天需要更新前缀索引 currentPrefixIndex ,让 currentPrefixIndex 1 , 使区分读的数据。

代码如下:

代码语言:javascript复制
    // 重新设置currentPrefixIndex
    private void updateCurrentPrefixIndex() {
        String currentValue = jedisCluster.get("currentPrefixIndex");
        int newValue = Integer.parseInt(currentValue)   1;
        jedisCluster.set("currentPrefixIndex", String.valueOf(newValue));
    }

四、往redis集群更新数据

这其实是最重要的一步,因为同时存入大量的数据,所以要使用到 Pipeline 来实现。

4.1 大致流程

  1. 获取到当前前缀,查出相关的 key ,更新数据之前把旧数据删除。
  2. 把新数据解析后更新到 redis 集群。

注意:因为数据量大,为了减少网络性能消耗,删除和更新都要用 Pipeline 来操作。

代码如下:

代码语言:javascript复制
    private void insertToJedis(List<UserWideInfo> UserWideInfoList) {
        String keyPrefix = getKeyPrefix();
        List<String> keys = new ArrayList<>();
        Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
        for (JedisPool node : clusterNodes.values()) {
            try (Jedis jedis = node.getResource()) {
                Set<String> nodeKeys = jedis.keys(keyPrefix   "*");
                keys.addAll(nodeKeys);
            }
        }
        Map<JedisPool, List<String>> delKey = assignKey(keys, jedisCluster);
        //先删旧的
        for (JedisPool jedisPool : delKey.keySet()) {
            try (Jedis jedis = jedisPool.getResource()){
                Pipeline pipelined = jedis.pipelined();
                List<String> keysList = delKey.get(jedisPool);
                for (String key : keysList) {
                    pipelined.del(key);
                }
                pipelined.sync();
            }
        }
        List<String> keyList =new ArrayList<>();
        HashMap<String, String> map = new HashMap<>();
        //填充keyList和value
        for (UserWideInfo UserWideInfo : UserWideInfoList) {
            String key = keyPrefix   "_"   UserWideInfo.getBillNo();
            keyList.add(key);
            //构建value
            ...
            ...
            map.put(key, value);
        }
        Map<JedisPool, List<String>> result = assignKey(keyList, jedisCluster);
        for (JedisPool jedisPool : result.keySet()) {
            try (Jedis jedis = jedisPool.getResource()){
                Pipeline pipelined = jedis.pipelined();
                // 获取当前JedisPool对应的键列表
                List<String> keysList = result.get(jedisPool); 
                // 将命令添加到Pipeline中
                for (String key : keysList) {
                    String value = map.get(key);
                    pipelined.set(key, value);
                }
                // 执行Pipeline中的所有命令
                pipelined.sync();
            }
        }
    }

五、JedisCluster 实现 Pipeline 操作

5.1 实现过程

因为 JedisCluster 不支持 Pipeline 操作,所以需要自己来实现。

代码如下:

代码语言:javascript复制
@Slf4j
public class JedisPipelineUtil {

    /**
     * jedis集群下使用pipeline之前先将key分配管道
     * Map<String, List<String>> 键值为节点ip和端口号 192.168.1.1:6397 value为redis存入的key
     *
     * @param list         存redis的key
     * @param jedisCluster
     * @return
     */
    public static Map<String, List<String>> assignSlot(List<String> list, JedisCluster jedisCluster) {
        Map<String, List<String>> hostPhoneMap = new HashMap<>();
        Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
        Map.Entry<String, JedisPool> next = clusterNodes.entrySet().iterator().next();
        JedisPool jedisPool = next.getValue();
        Jedis jedis = jedisPool.getResource();
        Map<Integer, String> slots = discoverClusterSlots(jedis);
        for (String s : list) {
            String hostAndPort = slots.get(JedisClusterCRC16.getSlot(s));
            if (hostPhoneMap.containsKey(hostAndPort)) {
                hostPhoneMap.get(hostAndPort).add(s);
            } else {
                List<String> newList = new ArrayList<>();
                newList.add(s);
                hostPhoneMap.put(hostAndPort, newList);
            }
        }
        jedis.close();
        return hostPhoneMap;
    }

    /**
     * jedis集群下使用pipeline之前先将key分配管道
     * Map<JedisPool, List<String>> 键值为节JedisPool value为redis存入的key
     *
     * @param list         存redis的key
     * @param jedisCluster
     * @return
     */
    public static Map<JedisPool, List<String>> assignKey(List<String> list, JedisCluster jedisCluster) {
        Map<JedisPool, List<String>> map = new HashMap<>();
        Map<String, List<String>> var1 = assignSlot(list, jedisCluster);
        Iterator<Map.Entry<String, List<String>>> iterator = var1.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, List<String>> next = iterator.next();
            JedisPool jedisPool = jedisCluster.getClusterNodes().get(next.getKey());
            map.put(jedisPool, next.getValue());
        }
        return map;

    }

    private static Map<Integer, String> discoverClusterSlots(Jedis jedis) {
        Map<Integer, String> slotsMap = new HashMap<>();
        List<Object> slots = jedis.clusterSlots();
        Iterator var3 = slots.iterator();
        while (var3.hasNext()) {
            Object slotInfoObj = var3.next();
            List<Object> slotInfo = (List) slotInfoObj;
            if (slotInfo.size() > 2) {
                List<Integer> slotNums = getAssignedSlotArray(slotInfo);
                List<Object> hostInfos = (List) slotInfo.get(2);
                if (!hostInfos.isEmpty()) {
                    String targetNode = generateHostAndPort(hostInfos);
                    Iterator<Integer> var4 = slotNums.iterator();
                    while (var4.hasNext()) {
                        Integer slot = var4.next();
                        slotsMap.put(slot, targetNode);
                    }
                }
            }
        }
        return slotsMap;
    }

    private static List<Integer> getAssignedSlotArray(List<Object> slotInfo) {
        List<Integer> slotNums = new ArrayList<>();

        for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1)).intValue();   slot) {
            slotNums.add(slot);
        }

        return slotNums;
    }

    private static String generateHostAndPort(List<Object> hostInfos) {
        String host = SafeEncoder.encode((byte[]) hostInfos.get(0));
        int port = ((Long) hostInfos.get(1)).intValue();
        return host   ":"   port;
    }
}

使用 assignKey 方法就可以分配管道。

0 人点赞