一次性下发100w的优惠券/短信/二维码,兼顾线程池参数可配置

2024-08-05 10:56:38 浏览数 (4)

1、场景需求分析

针对6.18,11.11这种场景,平台一次性发布500w张优惠券,或者对于锁单用户统一发下100w张确认信息,同时我们平时有抢购茅台的场景,京东一次性发布10w个验证码,主要是针对高并发多线程大数据批处理任务的场景,一般用于二维码、优惠券、邮件、短信等场景。

2、思路分析

3、线程池参数可配置的技术选型

1、JUC

java.util.concurrent.ThreadPoolExecutor 是 Java 中的一个线程池执行器,它允许你管理一组工作线程来执行异步任务。线程池是并发编程中的一个重要概念,它可以有效地管理线程资源,避免频繁创建和销毁线程所带来的开销。

具体参数详解,我之前有文章写到过:

  1. 详解 ThreadPoolExecutor 的参数含义及源码执行流程?
  2. 面试官:你用过线程池吗?在哪些场景下用到

2、Spring自带的线程池

Spring框架自带的线程池,注意和JUC里面原生的做对比

Spring:org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor

ThreadPoolTaskExecutor是 Spring 提供的一个方便的线程池实现,用于异步执行任务或处理并发请求。在使用 ThreadPoolTaskExecutor作为 Spring Bean 注册到容器中后,Spring 会负责在应用程序关闭时自动关闭所有注册的线程池,所以不需要手动关闭。

这样不仅可以确保线程池中的线程正确地停止,还可以防止资源泄露和潜在的并发问题。

美团技术上有一篇还写的挺好的,大家也可参考学习一下:[Java线程池实现原理及其在美团业务中的实践](https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html)

4、案例代码

1、 线程池的配置类

1、编写config配置文件类

代码语言:javascript复制
@Data
@Configuration
@ConfigurationProperties(prefix = "thread.pool")
public class ThreadPoolProperties {

    /**
     * 核心线程池大小
     */
    private int corePoolSize;

    /**
     * 最大可创建的线程数
     */
    private int maxPoolSize;

    /**
     * 队列最大长度
     */
    private int queueCapacity;

    /**
     * 线程池维护线程所允许的空闲时间
     */
    private int keepAliveSeconds;
}

2、读取配置返回线程池

代码语言:javascript复制
@Configuration
public class ThreadPoolConfig
{
    /*
    @Value("${thread.pool.corePoolSize}")
    private String corePoolSize;

    @Value("${thread.pool.maxPoolSize}")
    private String maxPoolSize;

    @Value("${thread.pool.queueCapacity}")
    private String queueCapacity;

    @Value("${thread.pool.keepAliveSeconds}")
    private String keepAliveSeconds;
    */

    //线程池配置
    @Resource
    private ThreadPoolProperties threadPoolProperties;

    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor()
    {
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();

        // 核心线程池大小
        threadPool.setCorePoolSize(threadPoolProperties.getCorePoolSize());
        // 最大可创建的线程数
        threadPool.setMaxPoolSize(threadPoolProperties.getMaxPoolSize());
        // 等待队列最大长度
        threadPool.setQueueCapacity(threadPoolProperties.getQueueCapacity());
        // 线程池维护线程所允许的空闲时间
        threadPool.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());
        //异步方法内部线程名称
        threadPool.setThreadNamePrefix("spring默认线程池-");
        // 线程池对拒绝任务(无线程可用)的处理策略
        threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 任务都完成再关闭线程池
        threadPool.setWaitForTasksToCompleteOnShutdown(true);
        // 任务初始化
        threadPool.initialize();

        return threadPool;
    }
}

2、编写yml

代码语言:javascript复制
thread.pool.corePoolSize=16
thread.pool.maxPoolSize=32
thread.pool.queueCapacity=50
thread.pool.keepAliveSeconds=2

注意配置文件的前缀@ConfigurationProperties(prefix ="thread.pool")

3、主启动类

代码语言:javascript复制
SpringBootApplication
//@EnableDiscoveryClient
@MapperScan("com.atguigu.interview2.mapper") //import tk.mybatis.spring.annotation.MapperScan;
public class Interview2Application
{

    @Resource
    private ThreadPoolTaskExecutor threadPool;

    @PostConstruct
    public void getThreadPoolConfig()
    {
        System.out.println("*******测试threadPool getCorePoolSize: " threadPool.getCorePoolSize());
        System.out.println("*******测试threadPool getMaxPoolSize: " threadPool.getMaxPoolSize());
        System.out.println("*******测试threadPool getQueueCapacity: " threadPool.getQueueCapacity());
        System.out.println("*******测试threadPool getKeepAliveSeconds: " threadPool.getKeepAliveSeconds());
    }

    public static void main(String[] args)
    {
        SpringApplication.run(Interview2Application.class, args);
    }
}

这时候可以启动 项目,看配置文件是否读取成功

4、业务编写

这里主要是针对优惠券的业务进行编写

1、优惠券接口CouponService

代码语言:javascript复制
public interface CouponService
{
    public void batchTaskAction();
}

2、接口实现类CouponServiceImpl

代码语言:javascript复制
@Service
public class CouponServiceImpl implements CouponService
{
    //下发优惠卷数量
    public  static final Integer COUPON_NUMBER = 50;

    @Resource
    private ThreadPoolTaskExecutor threadPool;

    /**
     * 下发50条优惠卷
     */
    @Override
    public void batchTaskAction()
    {
        //1 模拟要下发的50条优惠卷,上游系统给我们的下发优惠卷源头
        List<String> coupons = new ArrayList<>(COUPON_NUMBER);
        for (int i = 1; i <= COUPON_NUMBER; i  )
        {
            coupons.add("优惠卷--" i);
        }

        //2 创建CountDownLatch,构造器参数为任务数量
        CountDownLatch countDownLatch = new CountDownLatch(coupons.size());

        long startTime = System.currentTimeMillis();
        try
        {
            //3 将优惠卷集合逐条发送进线程池高并发处理
            for (String coupon : coupons)
            {
                threadPool.execute(() -> {
                    try
                    {
                        //4 交个线程池处理的下发业务逻辑,可以提出成一个方法
                        System.out.println(String.format("【%s】发送成功", coupon));
                    }finally {
                        //5 发送一个少一个任务,计数减少一个
                        countDownLatch.countDown();
                    }
                });
            }
            //6 阻塞当前发送完毕后,方法才能继续向下走
            countDownLatch.await();
        }catch (Exception e){
            e.printStackTrace();
        }

        long endTime = System.currentTimeMillis();
        System.out.println("----任务处理完毕costTime: " (endTime - startTime)  " 毫秒");
    }
}

3、访问业务接口

代码语言:javascript复制
@RestController
@Slf4j
public class CouponController
{
    @Resource
    private CouponService couponService;

    //http://localhost:24618/coupon/sendv1
    @GetMapping(value = "/coupon/sendv1")
    public void sendv1()
    {
        couponService.batchTaskAction();
    }
}

4、结果

5、总结

本次其实就针对优惠券进行简单的打印,速度相比较来说还是挺快的,实际运用场景中会比这复杂的多,性能肯定也会有所下降的。

5、优化和改进

前面我们已经提到了其他的场景,比如二维码、优惠券、短信、邮件、理财产品收益等场景,那我们怎么才会坐到通用呢?能否做到通用的设计或工具类,给团队赋能,一开始我们肯定是一次性编写或思考不到,考虑不周的,需要我们先针对某个场景进行编写之后,后续再做到更完美!!!

1、编写高并发批处理下发的通用工具类TaskBatchSendUtils

代码语言:javascript复制
public class TaskBatchSendUtils
{
    public static <T> void send(List<T> taskList, Executor threadPool, Consumer<? super T> consumer) throws InterruptedException
    {
        if (taskList == null || taskList.size() == 0)
        {
            return;
        }

        if(Objects.isNull(consumer))
        {
            return;
        }

        CountDownLatch countDownLatch = new CountDownLatch(taskList.size());
        //遍历消息、邮件等列表
        for (T couponOrShortMsg : taskList)
        {
            threadPool.execute(() ->
            {
                try
                {
                    consumer.accept(couponOrShortMsg);
                } finally {
                    //数量减一
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
    }

    public static void disposeTask(String task)
    {
        System.out.println(String.format("【%s】disposeTask下发优惠卷或短信成功", task));
    }

    public static void disposeTaskV2(String task)
    {
        System.out.println(String.format("【%s】disposeTask下发邮件成功", task));
    }

}

2、业务代码V2的版本

1、业务接口类

代码语言:javascript复制
public interface CouponServiceV2
{
    public void batchTaskActionV2();
}

2、业务接口实现类

代码语言:javascript复制
@Service
public class CouponServiceImplV2 implements CouponServiceV2
{
    //下发优惠卷数量
    public  static final Integer COUPON_NUMBER = 50;

    @Resource
    private ThreadPoolTaskExecutor threadPool;

    @SneakyThrows
    @Override
    public void batchTaskActionV2()
    {
        //1 模拟要下发的50条优惠卷,上游系统给我们的下发优惠卷源头
        List<String> coupons = getCoupons();

        long startTime = System.currentTimeMillis();

        //2 调用工具类批处理任务,这些优惠卷coupons,放入线程池threadPool,做什么业务disposeTask下发
        TaskBatchSendUtils.send(coupons,threadPool,TaskBatchSendUtils::disposeTask);
        //TaskBatchSendUtils.send(coupons,threadPool,TaskBatchSendUtils::disposeTaskV2);

        long endTime = System.currentTimeMillis();
        System.out.println("----costTime: " (endTime - startTime)  " 毫秒");

    }

    private static List<String> getCoupons()
    {
        List<String> coupons = new ArrayList<>(COUPON_NUMBER);
        for (int i = 1; i <= COUPON_NUMBER; i  )
        {
            coupons.add("优惠卷--" i);
        }
        return coupons;
    }
}

3、业务访问接口

代码语言:javascript复制
@RestController
@Slf4j
public class CouponController
{
    @Resource
    private CouponService couponService;

    //http://localhost:24618/coupon/sendv1
    @GetMapping(value = "/coupon/sendv1")
    public void sendv1()
    {
        couponService.batchTaskAction();
    }

    @Resource
    private CouponServiceV2 couponServiceV2;
    //http://localhost:24618/coupon/sendv2
    @GetMapping(value = "/coupon/sendv2")
    public void sendv2()
    {
        couponServiceV2.batchTaskActionV2();
    }
}

0 人点赞