前言
本文实现了通过定时任务来调用接口,使两套数据轮换更新。
因为要区分两套数据,所以 key 要设置前缀。
例如:一天数据一换,今天查的 A 开头的 key ,明天查 B 开头的 key ,后天又查 A 开头的 key 。今天查完后,明天更新 B 开头的 key ,但是 A 开头的 key 暂时不动,后天再查的时候,A开头的 key 要进行更新,先删再更新。
一、整体流程
1.1 大致流程
- 从数据库里查数据。
- 更新当前前缀。
- 往redis集群更新数据。
1.2 流程代码解释
代码语言:javascript复制 @Override
public R<String> updateCampToJedis() {
R<String> r = new R<>();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMM");
String currentMonth = dateFormat.format(new Date());
//1. 从数据库里查数据
List<UserWideInfo> UserWideInfoList = UserWideInfoMapper.selectFromTable(currentMonth);
if (UserWideInfoList.size() == 0) {
r.setCode(R.ERROR_CODE);
r.setMsg("没有数据");
return r;
}
//2. 更新当前前缀
updateCurrentPrefixIndex();
r.setCode(R.SUCCESS_CODE);
//3. 往redis集群存入数据
insertToJedis(ZhmsUserWideInfoList);
return r;
}
二、从数据库里查数据
2.1 SQL语句
这里因为每个月查询的是不同月份的表,所有用到动态 sql 。
代码语言:javascript复制 <select id="selectFromTable" resultType="com.hopedata.zhmscloud.camp.entity.po.ZhmsUserWideInfo">
SELECT * FROM USER_WIDE_INFO_M_${SysMonth}
</select>
三、更新当前前缀
要做到更新当前前缀,需要有两套前缀不同的 key ,还需要一个能区分前缀的前缀索引 currentPrefixIndex 。
3.1 设置前缀常量
用 A 和 B 来区分两组 key 。
代码如下:
代码语言:javascript复制 private static final String PREFIX_A = "A";
private static final String PREFIX_B = "B";
3.2 初始化 currentPrefixIndex
向 redis集群中存入初始的 currentPrefixIndex 。
代码如下:
代码语言:javascript复制 @GetMapping("/init")
public String init() {
return jedisCluster.set("currentPrefixIndex", "0");
}
3.3 获取当日前缀
先取出当日的前缀索引 currentPrefixIndex ,与 2 取余数 ,来获取当日的前缀。
代码如下:
代码语言:javascript复制 //获取当日前缀
private String getKeyPrefix() {
int currentPrefixIndex = Integer.parseInt(jedisCluster.get("currentPrefixIndex"));
if (currentPrefixIndex % 2 == 0) {
return PREFIX_A;
} else {
return PREFIX_B;
}
}
3.4 更新 currentPrefixIndex
每天需要更新前缀索引 currentPrefixIndex ,让 currentPrefixIndex 1 , 使区分读的数据。
代码如下:
代码语言:javascript复制 // 重新设置currentPrefixIndex
private void updateCurrentPrefixIndex() {
String currentValue = jedisCluster.get("currentPrefixIndex");
int newValue = Integer.parseInt(currentValue) 1;
jedisCluster.set("currentPrefixIndex", String.valueOf(newValue));
}
四、往redis集群更新数据
这其实是最重要的一步,因为同时存入大量的数据,所以要使用到 Pipeline 来实现。
4.1 大致流程
- 获取到当前前缀,查出相关的 key ,更新数据之前把旧数据删除。
- 把新数据解析后更新到 redis 集群。
注意:因为数据量大,为了减少网络性能消耗,删除和更新都要用 Pipeline 来操作。
代码如下:
代码语言:javascript复制 private void insertToJedis(List<UserWideInfo> UserWideInfoList) {
String keyPrefix = getKeyPrefix();
List<String> keys = new ArrayList<>();
Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
for (JedisPool node : clusterNodes.values()) {
try (Jedis jedis = node.getResource()) {
Set<String> nodeKeys = jedis.keys(keyPrefix "*");
keys.addAll(nodeKeys);
}
}
Map<JedisPool, List<String>> delKey = assignKey(keys, jedisCluster);
//先删旧的
for (JedisPool jedisPool : delKey.keySet()) {
try (Jedis jedis = jedisPool.getResource()){
Pipeline pipelined = jedis.pipelined();
List<String> keysList = delKey.get(jedisPool);
for (String key : keysList) {
pipelined.del(key);
}
pipelined.sync();
}
}
List<String> keyList =new ArrayList<>();
HashMap<String, String> map = new HashMap<>();
//填充keyList和value
for (UserWideInfo UserWideInfo : UserWideInfoList) {
String key = keyPrefix "_" UserWideInfo.getBillNo();
keyList.add(key);
//构建value
...
...
map.put(key, value);
}
Map<JedisPool, List<String>> result = assignKey(keyList, jedisCluster);
for (JedisPool jedisPool : result.keySet()) {
try (Jedis jedis = jedisPool.getResource()){
Pipeline pipelined = jedis.pipelined();
// 获取当前JedisPool对应的键列表
List<String> keysList = result.get(jedisPool);
// 将命令添加到Pipeline中
for (String key : keysList) {
String value = map.get(key);
pipelined.set(key, value);
}
// 执行Pipeline中的所有命令
pipelined.sync();
}
}
}
五、JedisCluster 实现 Pipeline 操作
5.1 实现过程
因为 JedisCluster 不支持 Pipeline 操作,所以需要自己来实现。
代码如下:
代码语言:javascript复制@Slf4j
public class JedisPipelineUtil {
/**
* jedis集群下使用pipeline之前先将key分配管道
* Map<String, List<String>> 键值为节点ip和端口号 192.168.1.1:6397 value为redis存入的key
*
* @param list 存redis的key
* @param jedisCluster
* @return
*/
public static Map<String, List<String>> assignSlot(List<String> list, JedisCluster jedisCluster) {
Map<String, List<String>> hostPhoneMap = new HashMap<>();
Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
Map.Entry<String, JedisPool> next = clusterNodes.entrySet().iterator().next();
JedisPool jedisPool = next.getValue();
Jedis jedis = jedisPool.getResource();
Map<Integer, String> slots = discoverClusterSlots(jedis);
for (String s : list) {
String hostAndPort = slots.get(JedisClusterCRC16.getSlot(s));
if (hostPhoneMap.containsKey(hostAndPort)) {
hostPhoneMap.get(hostAndPort).add(s);
} else {
List<String> newList = new ArrayList<>();
newList.add(s);
hostPhoneMap.put(hostAndPort, newList);
}
}
jedis.close();
return hostPhoneMap;
}
/**
* jedis集群下使用pipeline之前先将key分配管道
* Map<JedisPool, List<String>> 键值为节JedisPool value为redis存入的key
*
* @param list 存redis的key
* @param jedisCluster
* @return
*/
public static Map<JedisPool, List<String>> assignKey(List<String> list, JedisCluster jedisCluster) {
Map<JedisPool, List<String>> map = new HashMap<>();
Map<String, List<String>> var1 = assignSlot(list, jedisCluster);
Iterator<Map.Entry<String, List<String>>> iterator = var1.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, List<String>> next = iterator.next();
JedisPool jedisPool = jedisCluster.getClusterNodes().get(next.getKey());
map.put(jedisPool, next.getValue());
}
return map;
}
private static Map<Integer, String> discoverClusterSlots(Jedis jedis) {
Map<Integer, String> slotsMap = new HashMap<>();
List<Object> slots = jedis.clusterSlots();
Iterator var3 = slots.iterator();
while (var3.hasNext()) {
Object slotInfoObj = var3.next();
List<Object> slotInfo = (List) slotInfoObj;
if (slotInfo.size() > 2) {
List<Integer> slotNums = getAssignedSlotArray(slotInfo);
List<Object> hostInfos = (List) slotInfo.get(2);
if (!hostInfos.isEmpty()) {
String targetNode = generateHostAndPort(hostInfos);
Iterator<Integer> var4 = slotNums.iterator();
while (var4.hasNext()) {
Integer slot = var4.next();
slotsMap.put(slot, targetNode);
}
}
}
}
return slotsMap;
}
private static List<Integer> getAssignedSlotArray(List<Object> slotInfo) {
List<Integer> slotNums = new ArrayList<>();
for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1)).intValue(); slot) {
slotNums.add(slot);
}
return slotNums;
}
private static String generateHostAndPort(List<Object> hostInfos) {
String host = SafeEncoder.encode((byte[]) hostInfos.get(0));
int port = ((Long) hostInfos.get(1)).intValue();
return host ":" port;
}
}
使用 assignKey 方法就可以分配管道。