redis zset 延迟合并任务处理
代码语言:javascript复制@Autowired
public RedisTemplate redisTemplate;
##1.发送端:在接口中收集任务ID,累计时间段之后,合并处理。
##redis zset 主键,任务ID(不重复),时间戳
String key = "任务分组名称";
//延时
redisTemplate.opsForZSet().add(key,taskId,System.currentTimeMillis());
System.out.println("加入任务, taskId: " taskId ", exeTime: " System.currentTimeMillis() ", 当前时间:" LocalDateTime.now() ",key = " key);
##2.接收端
@Configuration
public class ExecutorServiceConf {
@Bean
public ExecutorService executorService() {
return new ThreadPoolExecutor(
20,
40,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
new ThreadFactoryBuilder().setNameFormat("myMergeTask-pool-%d").build(),
new ThreadPoolExecutor.AbortPolicy());
}
}
@Service
public class ConsumerTask {
@Autowired
public RedisTemplate redisTemplate;
@Autowired
ExecutorService executorService;
@Value("${delayTime:2}")
private Integer delayTime;
//服务启动的时候加载
@PostConstruct
public void consumer() {
//启动线程池
executorService.submit(new Runnable() {
@Override
public void run() {
while (true) {
String key = "任务分组名称";
//rangeByScore 主键,0,当前时间,0,偏移量1000
Set<Integer> taskIdSet = redisTemplate.opsForZSet().rangeByScore(key, 0, System.currentTimeMillis(), 0, 1000);
if (taskIdSet == null || taskIdSet.isEmpty()) {
logger.info("没有任务");
} else {
//1.查询任务ID关联的记录
//2.任务合并处理
//3.处理成功后,清理redis
taskIdSet.forEach(id -> {
//4.根据taskId更新任务状态
//5.根据taskId删除redis
long result = redisTemplate.opsForZSet().remove(key, id);
if (result == 1L) {
logger.info("从延时队列中获取到任务,taskId:" id ",当前时间:" LocalDateTime.now());
}
});
}
try {
//间隔2分钟时间
TimeUnit.MINUTES.sleep(delayTime);
} catch (InterruptedException e) {
logger.error("定时扫描exception:",e);
}
}
}
});
}
}