一、在service执行完之前,保证执行完所有服务
可以先判断三个服务中最慢的那个,对它使用同步调用,其他的使用异步调用;也可以使用并发计数器CountDownLatch,等到各个服务都执行完毕了再结束。
二、测试代码
三、批量请求的工具类SyncBatchCallUtil
代码语言:java
package com.qf.talk.batch;
import ch.qos.logback.core.net.SyslogOutputStream;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 同步的批量请求的工具类
*/
/**
 * Synchronous batch-call helper.
 *
 * <p>{@link #batch(Task...)} submits every task to a shared thread pool and blocks the
 * calling thread until all of them have finished, so the total wall-clock time is roughly
 * that of the slowest task rather than the sum of all tasks.
 *
 * <p>Each call to {@code batch} uses its own {@link CountDownLatch}, so one instance may be
 * used from several threads at the same time.
 */
public class SyncBatchCallUtil {

    /** Worker pool shared by every batch; built once when the class is initialised. */
    private static final ExecutorService executorPool;

    static {
        // Minimum number of threads the pool keeps alive.
        int corePoolSize = 5;
        // Maximum number of threads the pool may grow to.
        int maximumPoolSize = 10;
        // Threads above the core size are reclaimed after 10 seconds of idleness.
        long keepAliveTime = 10;
        TimeUnit unit = TimeUnit.SECONDS;
        // Bounded queue: once it is full and the pool is at its maximum size,
        // further submissions are handed to the rejection handler.
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(6);
        // Names the worker threads.
        ThreadFactory threadFactory = new UserThreadFactory("batch");
        // Invoked for tasks the pool cannot accept.
        RejectedExecutionHandler handler = new MyRejectPolicy();
        executorPool = new ThreadPoolExecutor(
                corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * Runs all given tasks concurrently and waits until every one of them has finished.
     *
     * @param tasks the tasks to run; {@code null} or empty means there is nothing to do
     * @return {@code true} if every task ran to completion; {@code false} if the calling
     *         thread was interrupted while waiting, or if any task was rejected by the pool
     *         or terminated with an exception
     */
    public boolean batch(Task... tasks) {
        if (tasks == null || tasks.length == 0) {
            return true;
        }

        // One latch per call (not a shared field) so concurrent batches cannot
        // overwrite each other's counter.
        CountDownLatch latch = new CountDownLatch(tasks.length);
        for (Task task : tasks) {
            task.setCountDownLatch(latch);
        }
        for (Task task : tasks) {
            executorPool.execute(task);
        }

        try {
            latch.await();
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            return false;
        }

        // countDown() happens-before a successful await(), so the flags written by the
        // worker threads are visible here without extra synchronisation.
        for (Task task : tasks) {
            if (!task.completed) {
                return false;
            }
        }
        return true;
    }

    /**
     * Unit of work for {@link #batch(Task...)}. Subclasses put their logic in {@link #exe()}.
     */
    static abstract class Task implements Runnable {

        /** Latch of the batch this task currently belongs to; {@code null} outside a batch. */
        private CountDownLatch countDownLatch;

        /** Set once {@link #exe()} has returned normally. */
        private boolean completed;

        /** The actual work to perform. */
        public abstract void exe();

        /**
         * Binds this task to the latch of a batch and clears the completion flag of any
         * previous run.
         *
         * @param countDownLatch latch to count down when this task finishes
         */
        public void setCountDownLatch(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
            this.completed = false;
        }

        @Override
        public void run() {
            try {
                exe();
                completed = true;
            } finally {
                // Always release the latch, even when exe() throws; otherwise the
                // thread waiting in batch() would block forever.
                release();
            }
        }

        /** Counts the bound latch down once; does nothing if no latch is bound. */
        private void release() {
            CountDownLatch latch = countDownLatch;
            if (latch != null) {
                latch.countDown();
            }
        }
    }

    /**
     * Thread factory that gives worker threads a recognisable, numbered name.
     */
    static class UserThreadFactory implements ThreadFactory {
        private final String namePrefix;
        private final AtomicInteger nextId = new AtomicInteger(1);

        /**
         * @param whatFeatureOfGroup label embedded in the name of every thread created
         */
        UserThreadFactory(String whatFeatureOfGroup) {
            namePrefix = "FROM UserThreadFactory's " + whatFeatureOfGroup + "-Work-";
        }

        @Override
        public Thread newThread(Runnable task) {
            String name = namePrefix + nextId.getAndIncrement();
            Thread thread = new Thread(null, task, name);
            // The shared pool is never shut down; daemon workers keep idle core
            // threads from preventing JVM exit after the application's main returns.
            thread.setDaemon(true);
            return thread;
        }
    }

    /**
     * Rejection handler: logs the rejected runnable and, for a {@link Task}, releases its
     * latch slot so the batch waiting on it does not hang. The task itself is not executed,
     * which {@link #batch(Task...)} reports by returning {@code false}.
     */
    static class MyRejectPolicy implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println(r);
            if (r instanceof Task) {
                ((Task) r).release();
            }
        }
    }
}
四、CountDownLatchTest
代码语言:java
package com.qf.talk.batch;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Small demonstration of {@link CountDownLatch}: two simulated requests run on separate
 * single-thread executors while the main thread waits for both to report completion.
 */
public class CountDownLatchTest {

    public static void main(String[] args) {
        // The latch is sized to the number of concurrent requests being waited for.
        final CountDownLatch pending = new CountDownLatch(2);

        launch(pending, 5000, "第一个请求执行完成了");
        launch(pending, 6000, "第二个请求执行完成了");

        System.out.println("等待两个线程都执行完成");
        try {
            // Blocks the current thread until the latch has been counted down to zero.
            pending.await();
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
        // Reached once the wait is over.
        System.out.println("两个都完事了。");
    }

    /**
     * Starts one simulated request on its own single-thread executor. The request sleeps
     * for the given time, prints its message, and then counts the latch down once.
     *
     * @param pending     latch to count down when the request is done
     * @param delayMillis how long the simulated request takes, in milliseconds
     * @param doneMessage text printed when the sleep finishes normally
     */
    private static void launch(CountDownLatch pending, long delayMillis, String doneMessage) {
        ExecutorService runner = Executors.newSingleThreadExecutor();
        runner.execute(() -> {
            try {
                Thread.sleep(delayMillis);
                System.out.println(doneMessage);
            } catch (InterruptedException interrupted) {
                interrupted.printStackTrace();
            }
            // Tell the latch that this request is finished.
            pending.countDown();
        });
        // No further work is submitted; the already queued request still runs.
        runner.shutdown();
    }
}
五、Test
代码语言:java
package com.qf.talk.batch;
import com.qf.talk.batch.SyncBatchCallUtil.Task;
import java.util.Arrays;
/**
 * Demo driver for {@code SyncBatchCallUtil}: runs three simulated calls of 5, 7 and 6
 * seconds as one batch, then prints the elapsed time and the collected results.
 */
public class Test {

    public static void main(String[] args) {
        final String[] slots = {"", "", ""};
        final long startedAt = System.currentTimeMillis();

        Task first = sleepThenStore(slots, 0, 5000, "1");
        Task second = sleepThenStore(slots, 1, 7000, "2");
        Task third = sleepThenStore(slots, 2, 6000, "3");
        new SyncBatchCallUtil().batch(first, second, third);

        // Expectation: every slot is filled and the total time is about 7 seconds,
        // i.e. the duration of the slowest task.
        System.out.println(System.currentTimeMillis() - startedAt);
        System.out.println(Arrays.toString(slots));
    }

    /**
     * Builds a task that sleeps and then writes a value into one slot of the target array.
     *
     * @param target array that receives the value
     * @param index  slot to write
     * @param millis how long to sleep before writing, in milliseconds
     * @param value  value stored once the sleep finishes normally
     * @return the task, ready to be passed to a batch
     */
    private static Task sleepThenStore(final String[] target, final int index,
                                       final long millis, final String value) {
        return new Task() {
            @Override
            public void exe() {
                try {
                    Thread.sleep(millis);
                    target[index] = value;
                } catch (InterruptedException interrupted) {
                    interrupted.printStackTrace();
                }
            }
        };
    }
}