延迟任务多种实现姿势--下

2022-09-29 13:11:38 浏览数 (1)

延迟任务多种实现姿势--下

  • 基于Mq实现的延迟任务
    • 编码实现
    • 1.0版本缺陷
    • 延迟交换机进行优化
  • 小结

本文给出的只是核心代码,完整源码请fork源码仓库查看:

https://gitee.com/DaHuYuXiXi/deley-task

如果对mq不太了解的,建议先看一下我在源码仓库中提供的Rabbitmq小书:


基于Mq实现的延迟任务

基于mq来实现延迟任务方案,相信各位很容易就可以想到mq中的延迟队列,延时队列就是用来存放需要在指定时间被处理的元素的队列。

基于mq延迟队列实现的大体思路如下:

  • 消费者投放订单延迟任务到订单交换机中
  • 订单交换机将消息投递到订单队列中
  • 订单队列等待消息过期后,将订单交给订单死信交换机处理
  • 订单死信交换机将消息投递到订单死信队列中
  • 处理到期的订单任务

编码实现

  • 定义好相关常量
代码语言:javascript复制
package com.delayTask.rabbitmq;

/**
 * @author zdh
 */
public class RabbitmqConstants {

    //--------------------EXCHANGE--------------------------

    public static final String ORDER_EXCHANGE = "orderExchange";
    public static final String ORDER_DEAD_EXCHANGE = "orderDeadExchange";

    //--------------------QUEUE--------------------------

    public static final String ORDER_QUEUE = "orderQueue";
    public static final String ORDER_DEAD_QUEUE = "orderDeadQueue";

    //------------------ROUTE_KEY-----------------------------

    public static final String ORDER_ROUTE_KEY="orderKey";
    public static final String ORDER_DEAD_ROUTE_KEY="orderDeadKey";
}
  • mq延迟任务实现
代码语言:javascript复制
package com.delayTask.rabbitmq;

import com.delayTask.DelayTaskEvent;
import com.delayTask.DelayTaskQueue;
import com.delayTask.delayQueue.OrderDelayEvent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;

import static com.delayTask.rabbitmq.RabbitmqConstants.*;


/**
 * @author zdh
 */
@Data
@Builder
@Slf4j
@Component
@RequiredArgsConstructor
public class MqDelayQueue implements DelayTaskQueue<OrderDelayEvent,OrderDelayEvent> {
    private final RabbitTemplate rabbitTemplate;
    private final ObjectMapper objectMapper=new ObjectMapper();
    /**
     * 存放被取消的延迟任务集合
     */
    private final Set<Long> cancelDelayTask=new ConcurrentSkipListSet<>();

    /**
     * <p>
     * 生成一个延迟任务加入延迟队列中去
     * </p>
     *
     * @param delayTaskEvent
     * @return 可以定位此次延迟任务的标记
     */
    @Override
    public OrderDelayEvent produce(DelayTaskEvent delayTaskEvent) {
        try {
            rabbitTemplate.convertAndSend(ORDER_EXCHANGE,ORDER_ROUTE_KEY,objectMapper.writeValueAsString(delayTaskEvent),msg-> {
                msg.getMessageProperties().setExpiration(String.valueOf(delayTaskEvent.getDelay(TimeUnit.MILLISECONDS)));
                return msg;
            });
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
         return (OrderDelayEvent) delayTaskEvent;
    }

    /**
     * 处理到期的延迟任务
     *
     * @param taskId
     */
    @Override
    public void consume(OrderDelayEvent taskId) {}

    @RabbitListener(queues = ORDER_DEAD_QUEUE)
    public void consume(Message message, Channel channel) throws Exception {
        OrderDelayEvent orderDelayEvent = objectMapper.readValue(new String(message.getBody()), OrderDelayEvent.class);
        log.info("消息队列中接收到一条消息: {}",orderDelayEvent);
        //被取消的延迟任务,不再进行处理
        if(cancelDelayTask.contains(orderDelayEvent.getId())){
            cancelDelayTask.remove(orderDelayEvent.getId());
            log.info("当前任务已被客户提交: {}",orderDelayEvent);
            return;
        }
        orderDelayEvent.handleDelayEvent();
    }

    /**
     * <p>
     * 取消taskId对应的延迟任务
     * </p>
     *
     * @param taskId 延迟任务标记
     */
    @Override
    public void cancel(OrderDelayEvent taskId) {
         cancelDelayTask.add(taskId.getId());
         taskId.getOrder().submitOrder();
    }
}
  • 测试
代码语言:javascript复制
package com.dhy.mq;

import com.delayTask.DelayTaskMain;
import com.delayTask.delayQueue.OrderDelayEvent;
import com.delayTask.delayQueue.OrderDelayFactory;
import com.delayTask.rabbitmq.MqDelayQueue;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;


import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

/**
 * @author 大忽悠
 * @create 2022/9/19 9:51
 */
@SpringBootTest(classes = DelayTaskMain.class)
public class MqDelayQueueTest {
    @Resource
    private MqDelayQueue mqDelayQueue;

    @Test
    public void testMqDelayQueue() throws InterruptedException {
        OrderDelayEvent orderDelay =  OrderDelayFactory.newOrderDelay("大忽悠", "小风扇", 13.4, 10L);
        OrderDelayEvent orderDelay1 = OrderDelayFactory.newOrderDelay("小朋友", "冰箱", 3000.0, 15L);
        mqDelayQueue.produce(orderDelay);
        mqDelayQueue.produce(orderDelay1);

        Thread.sleep(TimeUnit.SECONDS.toMillis(5));
        mqDelayQueue.cancel(orderDelay1);

        Thread.sleep(TimeUnit.SECONDS.toMillis(30L));
    }
}

1.0版本缺陷

目前看起来似乎没什么问题,但是如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

如果不能实现在消息粒度上的 TTL,并使其在设置的 TTL 时间及时死亡,就无法设计成一个通用的延时队列。那如何解决呢,接下来我们就去解决该问题。

为了解决这个问题,我们可以使用mq官方提供的插件来实现,该插件实现的思路是利用交换机来控制延迟消息何时推送给对应的队列。

延迟交换机: 通过给每个消息指定延迟发送时间,延迟交换机拿到这些消息后,不会立刻将其路由到某个队列,而是先保存起来,然后等待消息的延迟时间结束后,再将消息发送到指定的队列中去。

  • 安装插件
代码语言:javascript复制
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.10.0/rabbitmq_delayed_message_exchange-3.10.0.ez
#如果下载速度比较慢,大家可以尝试使用下面的镜像连接进行下载
wget http://110.40.155.17/download/rabbitmq_delayed_message_exchange-3.10.0.ez
#解压
tar -zxvf 3.10.0.tar.gz
  • 将插件拷贝到docker容器内rabbitmq安装目录下的 plgins 目录下
代码语言:javascript复制
          #宿主机文件                                  #容器ID(可缩写)或者容器名:容器内插件目录
docker cp rabbitmq_delayed_message_exchange-3.10.0.ez cf:/plugins
  • 执行下面命令让该插件生效,然后重启 RabbitMQ
代码语言:javascript复制
docker exec -it cf rabbitmq-plugins enable rabbitmq_delayed_message_exchange

  • 重启rabbitmq服务
代码语言:javascript复制
docker restart cf

延迟交换机进行优化

使用延迟交换机之后,我们整体的设计如下:

  • 配置
代码语言:javascript复制
package com.delayTask.rabbitmq;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

import static com.delayTask.rabbitmq.RabbitmqConstants.*;

/**
 * @author zdh
 */
@Configuration
public class RabbitConfigAdvanced {

   //--------------------EXCHANGE--------------------------

   @Bean(ORDER_DELAYED_EXCHANGE)
   public CustomExchange orderExchange()
   {
      Map<String, Object> arguments = new HashMap<>();
      //自定义交换机的类型
      arguments.put("x-delayed-type","direct");
      /*
       * 1.交换机的名称
       * 2.交换机的类型
       * 3.是否需要持久化
       * 4.是否需要自动删除
       * 5.其他的参数
       * */
      return new CustomExchange(ORDER_DELAYED_EXCHANGE,
              //延迟交换机类型
              "x-delayed-message",
              false,false,arguments);
   }

   //--------------------QUEUE--------------------------

   @Bean(ORDER_DELAYED_QUEUE)
   public Queue orderQueue(){
      return QueueBuilder.nonDurable(ORDER_DELAYED_QUEUE).build();
   }

   //--------------------bind--------------------------

   @Bean
   public Binding orderDelayedBinding(@Qualifier(ORDER_DELAYED_QUEUE) Queue delayedQueue,
                                 @Qualifier(ORDER_DELAYED_EXCHANGE) CustomExchange orderDelayedExchange){
      return BindingBuilder.bind(delayedQueue).to(orderDelayedExchange).with(ORDER_DELAYED_ROUTE_KEY).noargs();
   }

}
  • mq延迟队列
代码语言:javascript复制
package com.delayTask.rabbitmq;

import com.delayTask.DelayTaskEvent;
import com.delayTask.DelayTaskQueue;
import com.delayTask.delayQueue.OrderDelayEvent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import lombok.Builder;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;

import static com.delayTask.rabbitmq.RabbitmqConstants.*;


/**
 * @author zdh
 */
@Data
@Builder
@Slf4j
@Component
@RequiredArgsConstructor
public class MqDelayQueueAdvanced implements DelayTaskQueue<OrderDelayEvent,OrderDelayEvent> {
    private final RabbitTemplate rabbitTemplate;
    private final ObjectMapper objectMapper=new ObjectMapper();
    /**
     * 存放被取消的延迟任务集合
     */
    private final Set<Long> cancelDelayTask=new ConcurrentSkipListSet<>();

    /**
     * <p>
     * 生成一个延迟任务加入延迟队列中去
     * </p>
     *
     * @param delayTaskEvent
     * @return 可以定位此次延迟任务的标记
     */
    @Override
    public OrderDelayEvent produce(DelayTaskEvent delayTaskEvent) {
        try {
            rabbitTemplate.convertAndSend(ORDER_DELAYED_EXCHANGE,ORDER_DELAYED_ROUTE_KEY,objectMapper.writeValueAsString(delayTaskEvent),msg-> {
                // 发送消息的时候 延迟时长 单位ms
                msg.getMessageProperties().setDelay((int) delayTaskEvent.getDelay(TimeUnit.MILLISECONDS));
                return msg;
            });
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
         return (OrderDelayEvent) delayTaskEvent;
    }

    /**
     * 处理到期的延迟任务
     *
     * @param taskId
     */
    @Override
    public void consume(OrderDelayEvent taskId) {}

    @RabbitListener(queues = ORDER_DELAYED_QUEUE)
    public void consume(Message message) throws Exception {
        OrderDelayEvent orderDelayEvent = objectMapper.readValue(new String(message.getBody()), OrderDelayEvent.class);
        log.info("消息队列中接收到一条消息: {}",orderDelayEvent);
        //被取消的延迟任务,不再进行处理
        if(cancelDelayTask.contains(orderDelayEvent.getId())){
            cancelDelayTask.remove(orderDelayEvent.getId());
            log.info("当前任务已被客户提交: {}",orderDelayEvent);
            return;
        }
        orderDelayEvent.handleDelayEvent();
    }

    /**
     * <p>
     * 取消taskId对应的延迟任务
     * </p>
     *
     * @param taskId 延迟任务标记
     */
    @Override
    public void cancel(OrderDelayEvent taskId) {
         cancelDelayTask.add(taskId.getId());
         taskId.getOrder().submitOrder();
    }
}
  • 测试
代码语言:javascript复制
package com.dhy.mq;

import com.delayTask.DelayTaskMain;
import com.delayTask.delayQueue.OrderDelayEvent;
import com.delayTask.delayQueue.OrderDelayFactory;
import com.delayTask.rabbitmq.MqDelayQueueAdvanced;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

/**
 * @author 大忽悠
 * @create 2022/9/19 10:34
 */
@SpringBootTest(classes = DelayTaskMain.class)
public class MqDelayQueueAdvancedTest {
    @Resource
    private MqDelayQueueAdvanced mqDelayQueueAdvanced;

    @Test
    public void test() throws InterruptedException {
        OrderDelayEvent orderDelay =  OrderDelayFactory.newOrderDelay("大忽悠", "小风扇", 13.4, 15L);
        OrderDelayEvent orderDelay1 = OrderDelayFactory.newOrderDelay("小朋友", "冰箱", 3000.0, 10L);
        mqDelayQueueAdvanced.produce(orderDelay);
        mqDelayQueueAdvanced.produce(orderDelay1);

        Thread.sleep(TimeUnit.SECONDS.toMillis(5));
        mqDelayQueueAdvanced.cancel(orderDelay1);

        Thread.sleep(TimeUnit.SECONDS.toMillis(30L));
    }
}

小结

本文只是简单的利用mq实现了一些延迟任务,代码细节上还存在诸多漏洞,希望能够给大家带来一些启发。

0 人点赞