延迟消息处理

2021-12-28 12:37:48 浏览数 (1)

之前有这样一个需求:运营在后台配置一条系统消息或营销活动等类型的消息,到了指定的推送时间后,系统自动将消息推送到用户的 APP 端显示。一开始采用的是任务调度(定时器)的方式,通过轮询扫表来实现。由于推送时间既没有固定的频率,也没有固定的时间点,为了避免消息到期后不能及时推送到 APP 端,只能每分钟扫一次表,过于频繁,因此并不合适(定时器更适合按固定频率或固定时间段执行的任务)。

因此这里选取了几种延迟发送的方式:

1.rabbitMQ

2.redis

3.DelayQueue(慎用)

代码部分(发送端):

代码语言:javascript复制
/**
 * Common contract implemented by every delayed system-message sender.
 */
public interface ISysMessageDelayProcessor {

    /** Five minutes, expressed in milliseconds. */
    long FIVE_MINUTES = 5L * 60L * 1000L;

    /**
     * Hands a message over for delivery at the given time.
     *
     * @param msg      the payload to deliver; callers wrap it as they see fit
     * @param pushDate the moment at which the message should be pushed
     */
    void sendMessage(Object msg, LocalDateTime pushDate);
}
代码语言:javascript复制
/**
 * 基于RabbitMQ的实现方式(需要下载rabbitMQ插件)
 * 
 * 
 */
@Slf4j
@EnableBinding(SysMessageSink.class)
public class SysMessageRabbitMQDelayProcessorImpl implements ISysMessageDelayProcessor {

    @Autowired
    private BinderAwareChannelResolver resolver;

    @Override
    public void sendMessage(Object msg LocalDateTime pushDate) {
        resolver.resolveDestination(MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC_PRODUCER)
                .send(MessageBuilder.withPayload(msg)
                        .setHeader("x-delay",
                                Duration.between(LocalDateTime.now(), pushDate)
                                        .toMillis())
                        .build());
    }

}
代码语言:javascript复制
# Configure delayed sending of system messages.
# Producer binding: publishes to the "your-topic" destination and flags its exchange as delayed
# (NOTE(review): presumably requires the RabbitMQ delayed-message plugin on the broker - confirm).
spring.cloud.stream.bindings.your-topic-producer.destination=your-topic
spring.cloud.stream.rabbit.bindings.your-topic-producer.producer.delayed-exchange=true
# Consumer binding: reads from the same "your-topic" destination, also flagged as delayed,
# as part of the consumer group "your-topic-group".
spring.cloud.stream.bindings.your-topic-consumer.destination=your-topic
spring.cloud.stream.rabbit.bindings.your-topic-consumer.consumer.delayed-exchange=true
spring.cloud.stream.bindings.your-topic-consumer.group=your-topic-group
代码语言:javascript复制
/**
 * Redis-backed implementation of {@link ISysMessageDelayProcessor}.
 *
 * <p>Each message is added to a sorted set whose score is the push time in epoch
 * milliseconds (the {@link LocalDateTime} is interpreted at offset {@code +8}).
 */
public class SysMessageRedisDelayProcessorImpl implements ISysMessageDelayProcessor {
    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * Stores {@code msg} in the delayed-send sorted set, scored by {@code pushDate}.
     *
     * @param msg      the payload to store
     * @param pushDate the moment at which the message becomes due
     */
    @Override
    public void sendMessage(Object msg, LocalDateTime pushDate) {
        // Written to the SEND topic: that is the key SysMessageRedisDelayedProcessorListener polls.
        // The offset must be "+8"; ZoneOffset.of rejects an id without a sign.
        redisTemplate.opsForZSet().add(MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC, msg,
                pushDate.toInstant(ZoneOffset.of("+8")).toEpochMilli());
    }

}
代码语言:javascript复制
/**
 * In-memory fallback implementation, for deployments where neither Redis nor RabbitMQ
 * is available.
 *
 * <p>Pending messages live only in a JVM-local {@link DelayQueue}: a crash or restart
 * loses them. Use with caution.
 *
 * <p>The class plays two roles: the no-arg instance is the processor that accepts messages,
 * and the two-arg instances are the {@link Delayed} entries stored in the queue.
 */
public class SysMessageDelayedQueueProcessorImpl implements ISysMessageDelayProcessor, Delayed {
    // Shared queue of entries waiting for their push time.
    private static final DelayQueue<SysMessageDelayedQueueProcessorImpl> sendDelayQueue =
            new DelayQueue<>();

    private LocalDateTime executeTime;
    private Object data;

    /**
     * Creates the processor and starts a thread running the listener that drains the queue.
     */
    public SysMessageDelayedQueueProcessorImpl() {
        // The listener's constructor takes the send queue only.
        new Thread(new SysMessageDelayedQueueProcessorListener(sendDelayQueue)).start();
    }

    /**
     * Creates a queue entry.
     *
     * @param executeTime the moment at which the entry becomes due
     * @param data        the payload to deliver
     */
    public SysMessageDelayedQueueProcessorImpl(LocalDateTime executeTime, Object data) {
        this.executeTime = executeTime;
        this.data = data;
    }

    /**
     * @return the payload carried by this queue entry
     */
    public Object getData() {
        return data;
    }

    @Override
    public void sendMessage(Object msg, LocalDateTime pushDate) {
        sendDelayQueue.offer(new SysMessageDelayedQueueProcessorImpl(pushDate, msg));
    }

    /**
     * @param unit the unit of the returned value
     * @return the time remaining until {@code executeTime}; zero or negative once due
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(Duration.between(LocalDateTime.now(), executeTime).toMillis(),
                TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        if (o instanceof SysMessageDelayedQueueProcessorImpl) {
            // Compare the fixed due times directly instead of two separately sampled delays.
            return executeTime.compareTo(((SysMessageDelayedQueueProcessorImpl) o).executeTime);
        }
        // Long.compare avoids the overflow of casting a long difference to int.
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }
}

接收端:

代码语言:javascript复制
/**
 * Base class of the delayed-message listeners. Running an instance simply invokes
 * {@link #sendProcessor()}.
 */
public abstract class ISysMessageDelayedListener implements Runnable {

    /** Bounded hand-off queue (capacity 1000) shared by all listeners. */
    protected static final LinkedBlockingQueue<SysMessageVO> SEND_QUEUE =
            new LinkedBlockingQueue<>(1000);

    /**
     * Listens for messages to send; implemented by each concrete listener.
     */
    public abstract void sendProcessor();

    /** Delegates to {@link #sendProcessor()}. */
    @Override
    public void run() {
        sendProcessor();
    }
}
代码语言:javascript复制
/**
 * 只用来监听MQ延迟队列推送过来的数据包,及转发数据包
 * 不做其他业务处理
 *
 */
@Component
@EnableBinding(SysMessageSink.class)
@Slf4j
public class SysMessageRabbitMQDelayedProcessorListener extends ISysMessageDelayedListener {
    @Autowired
    private SysMessageQueryProcessor sysMessageQueryProcessor;

    private static final LinkedBlockingQueue<SysMessageVO> SEND_QUEUE =
            new LinkedBlockingQueue(1000);


    /**
     * 接受发送的数据
     *
     * @param 
     */
    @StreamListener(MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC_CONSUMER)
    public void onSendHandle(Object msg) {
       
        try {
            // put
            SEND_QUEUE.put(sysMessage);
        } catch (InterruptedException e) {
            log.error("caught onSendHandle invoke fail,e:", e);
        }

    }

    @Override
    public void sendProcessor() {

    }
}
代码语言:javascript复制
/**
 * redis监听处理
 */
@Slf4j
public class SysMessageRedisDelayedProcessorListener extends ISysMessageDelayedListener {
    private static final String setNX = "lock:sysmessage:delay";
    public static final int LOCK_EXPIRE = 300; // ms

    @Autowired
    private RedisTemplate redisTemplate;

    public SysMessageRedisDelayedProcessorListener() {
        new Thread(new SysMessagePushWork(SEND_QUEUE)).start();
    }

    /**
     * 监听是否有到期的数据
     */
    private void monitorSendQueue() {
        while (true) {
            if (lock()) {
                Set<ZSetOperations.TypedTuple<Object>> set =
                        redisTemplate.opsForZSet().rangeWithScores(MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC, 0, 0);
                Iterator<ZSetOperations.TypedTuple<Object>> iterator = set.iterator();
                while (iterator.hasNext()) {
                    ZSetOperations.TypedTuple<Object> next = iterator.next();
                    consumer(MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC, next);
                }
            }
        }
    }

    /**
     * 获取分布式琐
     *
     * @return
     */
    private boolean lock() {
        try {
            long expireAt = System.currentTimeMillis()   LOCK_EXPIRE   1;
            return (Boolean) this.redisTemplate.execute((RedisCallback) connection -> {
                Boolean acquire = connection.setNX(setNX.getBytes(), String.valueOf(expireAt).getBytes());
                if (acquire) {
                    return true;
                }
                byte[] value = connection.get(setNX.getBytes());
                if (Objects.isNull(value) || value.length <= 0) {
                    return false;
                }
                long expireTime = Long.parseLong(new String(value));
                if (expireTime < System.currentTimeMillis()) {
                    // 如果锁已经过期
                    byte[] oldValue = connection.getSet(setNX.getBytes(),
                            String.valueOf(System.currentTimeMillis()   LOCK_EXPIRE   1).getBytes());
                    // 防止死锁
                    return Long.parseLong(new String(oldValue)) < System.currentTimeMillis();
                }
                return true;
            });
        } catch (Exception e) {
            log.error("obtain lock option fail, caught exception:", e);
            return false;
        }
    }

    /**
     * 删除过期的数据
     *
     * @param value
     */
    private void removeDataByExpireTime(String key, Object value) {
        redisTemplate.opsForZSet().remove(key, value);
    }

    /**
     * 消费
     *
     * @param next
     */
    private void consumer(String key, ZSetOperations.TypedTuple<Object> next) {
        // processor and remove
        if (!ifExpire(next.getScore())) {
            return;
        }
       if (MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC.equals(key)) {
            // in queue
            SEND_QUEUE.offer(sysMessage);
        }
        // remove
        removeDataByExpireTime(key, next.getValue());


    }

    /**
     * 过期判断
     *
     * @param expireTime
     * @return
     */
    private boolean ifExpire(Double expireTime) {
        return (expireTime.longValue()   1000) <= LocalDateTime.now().toInstant(ZoneOffset.of(" 8")).toEpochMilli();
    }

    @Override
    public void sendProcessor() {
        // 监听发送队列的变化
        monitorSendQueue();
    }
}
代码语言:javascript复制
/**
 *
 */
@Slf4j
public class SysMessageDelayedQueueProcessorListener extends ISysMessageDelayedListener {
    private DelayQueue<SysMessageDelayedQueueProcessorImpl> sendDelayQueue;

    public SysMessageDelayedQueueProcessorListener(DelayQueue<SysMessageDelayedQueueProcessorImpl> sendDelayQueue) {
        this.sendDelayQueue = sendDelayQueue;
        new Thread(new SysMessagePushWork(SEND_QUEUE)).start();
    }

  

    @Override
    public void sendProcessor() {
        CompletableFuture.runAsync(() -> {
            while (true) {
                try {
                    // processor
                    SysMessageDelayedQueueProcessorImpl queue = sendDelayQueue.take();
                    if (Objects.isNull(queue)) {
                        continue;
                    }
                   
                    // execute
                    SEND_QUEUE.offer(queue.getData());
                } catch (InterruptedException e) {
                    // 
                }
            }
        });

    }
}
代码语言:javascript复制
/**
 * Spring configuration that registers the delayed-message processor bean according to
 * which classes are present on the classpath.
 *
 * NOTE(review): with the Redis bean commented out below, a classpath that has RedisTemplate
 * but not BinderAwareChannelResolver appears to get no processor bean at all - confirm.
 */
@Configuration
public class SysMessageConfiguration {

    /**
     * RabbitMQ-based delayed processing; registered as the primary bean when the
     * BinderAwareChannelResolver class is on the classpath.
     * @return the RabbitMQ-backed processor
     */
    @Primary
    @ConditionalOnClass(name = "org.springframework.cloud.stream.binding.BinderAwareChannelResolver")
    @Bean
    public SysMessageRabbitMQDelayProcessorImpl createSysMessageRabbitMQDelayProcessor() {

        return new SysMessageRabbitMQDelayProcessorImpl();
    }

//    /**
//     * Redis-based delayed processing (currently disabled)
//     * @return
//     */
//    @ConditionalOnClass(RedisTemplate.class)
//    @ConditionalOnMissingClass("org.springframework.cloud.stream.binding.BinderAwareChannelResolver")
//    @Bean
//    public SysMessageRedisDelayProcessorImpl createSysMessageRedisDelayProcessor() {
//        return new SysMessageRedisDelayProcessorImpl();
//    }

    /**
     * In-memory delayed processing; registered only when both the BinderAwareChannelResolver
     * and RedisTemplate classes are missing from the classpath.
     * @return the in-memory processor
     */
    @ConditionalOnMissingClass({"org.springframework.cloud.stream.binding.BinderAwareChannelResolver",
            "org.springframework.data.redis.core.RedisTemplate"})
    @Bean
    public SysMessageDelayedQueueProcessorImpl createSysMessageDelayedQueueProcessor() {
        return new SysMessageDelayedQueueProcessorImpl();
    }
}
代码语言:javascript复制
   // Usage sketch: the processor is received through constructor injection.
   // "xxxx" is a placeholder for the name of the class that sends the messages.
   private ISysMessageDelayProcessor sysMessageDelayProcessor;

    /**
     * Stores the injected processor for later use.
     *
     * @param sysMessageDelayProcessor the delayed-message processor to use
     */
    @Autowired
    public xxxx(ISysMessageDelayProcessor sysMessageDelayProcessor) {
        this.sysMessageDelayProcessor = sysMessageDelayProcessor;
    }

其他部分业务代码按需处理即可

0 人点赞