组合式服务的封装

2021-01-20 16:00:57 浏览数 (1)

一、在service执行完之前,保证执行完所有服务

判断三个服务最慢的那个,使用同步,其他的使用异步 并发计数器CountDownLatch,可以等两个服务都执行完毕了再结束

二、测试代码

三、批量请求的工具类SyncBatchCallUtil

代码语言:javascript复制
package com.qf.talk.batch;

import ch.qos.logback.core.net.SyslogOutputStream;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 同步的批量请求的工具类
 */
public class SyncBatchCallUtil {
    private static ExecutorService executorPool;
    static{
        //线程池维护线程的最少数量。
        int corePoolSize=5;
        //线程池维护线程的最大数量
        int maximumPoolSize=10;

        //空闲时间是10秒
        long keepAliveTime=10;

        TimeUnit unit=TimeUnit.SECONDS;

        //队列
        BlockingQueue<Runnable> workQueue=new ArrayBlockingQueue<>(6);
        //工厂
        ThreadFactory threadFactory=new UserThreadFactory("batch");
        //handler
        RejectedExecutionHandler handler=new MyRejectPolicy();

        executorPool=new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);
    }



    private CountDownLatch countDownLatch;
    public boolean batch(Task... tasks){
        //拿着N个任务创建countDownLatch对象
        if(createPool(tasks)){
            return false;
        }
        return true;
    }

    private boolean createPool(Task [] tasks){
        //有多少任务,就tasks.length就是多少,那么就需要countDownLatch来控制。
        countDownLatch=new CountDownLatch(tasks.length);
        for(int i=0;i<tasks.length;i  ){
            tasks[i].setCountDownLatch(countDownLatch);
        }
        for(int i=0;i<tasks.length;i  ){
            executorPool.execute(tasks[i]);
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
            return true;
        }
        return false;
    }

    static abstract class Task implements Runnable{
        public abstract void exe();
        private CountDownLatch countDownLatch;
        public void setCountDownLatch( CountDownLatch countDownLatch){
            this.countDownLatch=countDownLatch;
        }

        @Override
        public void run() {
            exe();
            countDownLatch.countDown();
        }
    }


    static class UserThreadFactory implements ThreadFactory{

        private final String namePrefix;
        private final AtomicInteger nextId=new AtomicInteger(1);

        UserThreadFactory(String whatFeatureOfGroup){
            namePrefix="FROM UserThreadFactory's " whatFeatureOfGroup "-Work-";
        }

        @Override
        public Thread newThread(Runnable task) {
            String name=namePrefix nextId.getAndIncrement();

            Thread thread=new Thread(null,task,name);
            return thread;
        }
    }

    static class MyRejectPolicy implements RejectedExecutionHandler{

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println(r);
        }
    }
}

四、CountDownLatchTest

代码语言:javascript复制
package com.qf.talk.batch;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchTest {
    public static void main(String[] args) {
        //并发计数器,构造方法需要一个参数,描述了并发的总数量
        CountDownLatch countDownLatch=new CountDownLatch(2);

        //执行器  用这个东西执行线程
        ExecutorService es1= Executors.newSingleThreadExecutor();

        es1.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5000);
                    System.out.println("第一个请求执行完成了");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //让计数器知道一个完事了
                countDownLatch.countDown();
            }
        });

        es1.shutdown();

        //执行器  用这个东西执行线程
        ExecutorService es2= Executors.newSingleThreadExecutor();

        es2.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(6000);
                    System.out.println("第二个请求执行完成了");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //让计数器知道一个完事了
                countDownLatch.countDown();
            }
        });

        es2.shutdown();

        //等待两个线程都执行完毕
        System.out.println("等待两个线程都执行完成");
        try {
            //这个方法可以把当前线程阻塞
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //这里就确定两个都完事了
        System.out.println("两个都完事了。");
    }
}

五、Test

代码语言:javascript复制
package com.qf.talk.batch;
import com.qf.talk.batch.SyncBatchCallUtil.Task;

import java.util.Arrays;

public class Test {
    public static void main(String[] args) {
        String [] result={"","",""};
        Long time=System.currentTimeMillis();
        new SyncBatchCallUtil().batch(new Task() {
            @Override
            public void exe() {
                try {
                    Thread.sleep(5000);
                    result[0]="1";
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, new Task() {
            @Override
            public void exe() {
                try {
                    Thread.sleep(7000);
                    result[1]="2";
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, new Task() {
            @Override
            public void exe() {
                try {
                    Thread.sleep(6000);
                    result[2]="3";
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        //希望result里面每一个元素都有值,且总耗时以7秒为准。
        System.out.println(System.currentTimeMillis()-time);
        System.out.println(Arrays.toString(result));


    }
}

0 人点赞