在之前文章Java自定义异步功能实践中,我仿造Go语言中的go
定义了fun
作为Groovy/Java异步执行的关键字。通过一个定长的线程池执行异步任务。
经过一段时间的使用,效果非常好,既能满足当下的需求,在实现的过程中也锻炼了自己对线程池的理解。通常的使用场景分为:异步上报数据、大量任务需要多线程执行、做简单的并发测试。
但是来了一个新活儿,需要控制请求的QPS,而非通过线程池的大小控制并发的压力。这样第一能够更加准确,第二能适配不同的耗时的任务。
说来就来,经过查询资料,很多限流框架或者组件都比较好地实现了这个功能。但是功能设计相对我的需求来说,太复杂,太强大了。有点大材小用的感觉。所以查阅了之前文章如何mock固定QPS的接口中用到的java.util.concurrent.Semaphore
流量控制类。
看这个类的包路径就知道这是Java并发用到的,实际上在JDK自带并发相关功能类中,java.util.concurrent.Semaphore
使用的范围还是挺广的。这里就不多说了。这个类就是在线程安全的场景中,实现流量控制的。主要的API也没几个。
API简介
- 首先是构造方法:
java.util.concurrent.Semaphore#Semaphore(int)
这里参数可以理解为许可的总量。 - 获取许可:
java.util.concurrent.Semaphore#acquire()
由这个方法引申出来还有几个增加时间参数的API,这里也不多说了。 - 释放许可:
java.util.concurrent.Semaphore#release()
默认释放一个许可,最常用的也是这个。 - 获取和释放都支持多个许可,使用场景不多,不过多介绍了。
实现
思路,我还是通过异步线程池来实现,然后每一次获取一个许可,我就把任务当做一个简单的异步任务去执行,然后休眠1s之后再释放许可。
由于工具类com.funtester.frame.SourceCode
是个Java类,基本了不少工具函数,改成Groovy
的话面临兼容性的风险。所以功能实现还是用的Java语言,这里借助了Groovy closure 与Java function转换提供的兼容性方案解决Groovy和Java闭包的不兼容问题,语法上看着有点乱。
/**
* 已固定QPS,异步执行,默认16,调整方法{@link SourceCode#setMaxQps(int)}
*
* @param f
*/
public static void funner(Closure f) {
if (!ThreadPoolUtil.acquire()) return;
fun(JToG.toClosure(() -> {
sleep(1.0);
fun(JToG.toClosure(() -> {
f.call();
ThreadPoolUtil.release();
return null;
}));
return null;
}));
}
/**
* 设置异步执行任务最大QPS
*
* @param i
*/
public static void setMaxQps(int i) {
ThreadPoolUtil.setSemaphore(new Semaphore(i));
}
流量设置:
代码语言:javascript复制 /**
* 全局流量控制
*/
static Semaphore semaphore = new Semaphore(16)
static long AcquireTimeout = 8888
/**
* 获取许可
* @return
*/
static boolean acquire() {
semaphore.tryAcquire(AcquireTimeout, TimeUnit.MILLISECONDS)
}
/**
* 释放许可
* @return
*/
static def release() {
semaphore.release()
}
测试用例
思路就是设置一个小的QPS,然后循环添加异步任务,异步任务完成打印时间。
代码语言:javascript复制 static void main(String[] args) {
setMaxQps(2)
10.times {
funner {
output(Time.getDate())
}
}
}
控制台输出:
代码语言:javascript复制 ###### # # # # ####### ###### ##### ####### ###### #####
# # # ## # # # # # # # #
#### # # # # # # #### ##### # #### #####
# # # # # # # # # # # # #
# ##### # # # ###### ##### # ###### # #
17:50:07.675 Deamon 守护线程开启!
17:50:08.695 F-3 2022-10-08 17:50:08
17:50:08.695 F-4 2022-10-08 17:50:08
17:50:09.700 F-7 2022-10-08 17:50:09
17:50:09.700 F-7 2022-10-08 17:50:09
17:50:10.706 F-3 2022-10-08 17:50:10
17:50:10.706 F-1 2022-10-08 17:50:10
17:50:11.711 F-6 2022-10-08 17:50:11
17:50:11.711 F-5 2022-10-08 17:50:11
17:50:12.715 F-4 2022-10-08 17:50:12
17:50:12.716 F-2 2022-10-08 17:50:12
17:50:12.757 Deamon 异步线程池等待执行1次耗时:1,028 ms
17:50:12.759 Deamon 异步线程池关闭!
进程已结束,退出代码0
这里有个问题就是线程池满了之后会导致任务积压,这里我考虑了一下现实情况,暂时没有处理,后续遇到再说。