高并发编程-自定义简易的线程池(1),体会原理

2021-08-17 10:30:22 浏览数 (1)

概述

我们工作中,并发编程必然离不开jdk提供的j.u.c中的线程池 ,假设让我们自己去设计一个线程池,该从哪几个方面来着手考虑呢?

首先: 既然是线程池 , 那必然 有个初始化的线程数量 和 最大数量 ----> 两个属性 : init 和 max

其次:当线程池中的线程达到了 init 数量,但还没到max 数量的时候,将任务放入任务队列中,而不是直接开辟新的线程,是不是一种更好的设计? 所以 引入 任务队列,存放任务

最后:当线程池中的线程 把 任务队列也塞满了,也达到了Max的值,新的线程过来,我们改如何处理呢? 是不是指定拒绝策略更合适呢? 比如 直接拒绝,抛出异常,放入临时队列等等。所以 拒绝策略也是需要的

总结一下

  1. init (coreSize) / max , 线程池初始化大小和最大线程数
  2. 任务队列
  3. 拒绝策略
  4. 。。。

示例

逐步实现该需求, 先把基本的流程调通,这里我们先实现定义线程池数量的功能 ,代码如下

代码语言:javascript复制
package com.artisan.test;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.IntStream;

public class SimpleThreadPool {

    // 线程数
    private int size;

    // 默认线程数
    private static int DEFAULT_SIZE = 10;

    // 任务队列
    private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList();

    // 工作线程集合
    private final static List<WorkTask> THREAD_QUEUE = new ArrayList<>();

    private static volatile int seq = 0;
    private static final ThreadGroup GROUP = new ThreadGroup("Pool_Group");
    private static final String THREAD_PREFIX = "SIMPLE_THREAD_POOL-";

    /**
     * 默认构造函数
     * 默认10个线程
     */
    public SimpleThreadPool() {
        this(DEFAULT_SIZE);
    }

    /**
     * 线程池中的线程数量
     *
     * @param size
     */
    public SimpleThreadPool(int size) {
        this.size = size;
        // 初始化
        init();
    }


    /**
     * 初始化
     */
    private void init() {
        for (int i = 0; i < size; i  ) {
            createWorkTask();
        }
    }

    /**
     * 创建工作线程
     */
    private void createWorkTask() {
        // 创建工作线程
        WorkTask workTask = new WorkTask(GROUP, THREAD_PREFIX   (seq  ));
        // 启动
        workTask.start();
        // 加入到工作线程集合
        THREAD_QUEUE.add(workTask);
    }


    /**
     * 将任务runnable 提交到 TASK_QUEUE
     *
     * @param runnable
     */
    public void submit(Runnable runnable) {
        synchronized (TASK_QUEUE) {
            // 添加到队列的最后面
            TASK_QUEUE.addLast(runnable);
            // 唤醒所有正在等待的线程
            TASK_QUEUE.notifyAll();
        }
    }

    /**
     * 线程状态
     */
    enum TaskState {
        FREE,
        RUNNING,
        BLOCKED,
        DEAD
    }

    /**
     * private 私有类
     * 继承Thread 的 工作线程
     */
    private static class WorkTask extends Thread {
        // 默认状态  FREE
        private volatile TaskState state = TaskState.FREE;

        public WorkTask(ThreadGroup threadGroup, String name) {
            super(threadGroup, name);
        }

        /**
         * 获取线程状态
         *
         * @return
         */
        public TaskState getTaskState() {
            return this.state;
        }

        /**
         * 重写run方法
         */
        @Override
        public void run() {
            // 跳出while的标识
            OUTER:
            // 只要不是DEAD 状态,一直从队列中获取消息
            while (state != TaskState.DEAD) {

                Runnable runnable;

                // 对队列进行加锁操作
                synchronized (TASK_QUEUE) {
                    // 如果队列为空,wait
                    while (TASK_QUEUE.isEmpty()) {
                        try {
                            // 更改状态
                            state = TaskState.BLOCKED;
                            // wait 放弃该锁
                            TASK_QUEUE.wait();
                        } catch (InterruptedException e) {
                            // 如果有个线程在wait,终止该线程会被InterruptedException异常捕获,
                            // break OUTER 跳出while循环,从而达到终止这个线程
                            break OUTER;
                        }
                    }
                    // TASK_QUEUE不为空 从队列中获取一个Runnable对象
                    // Removes and returns the first element from this list
                    runnable = TASK_QUEUE.removeFirst();
                }

                if (runnable != null) {
                    // 运行状态
                    state = TaskState.RUNNING;
                    // 运行
                    runnable.run();
                    // 运行结束后 置为FREE
                    state = TaskState.FREE;
                }
            }
        }

        /**
         * 关闭线程
         */
        public void close() {
            this.state = TaskState.DEAD;
        }
    }

    /**
     * 测试
     *
     * @param args
     */
    public static void main(String[] args) {
        SimpleThreadPool simpleThreadPool = new SimpleThreadPool();
//        for (int i = 0; i < 50; i  ) {
//            simpleThreadPool.submit(()->{
//                try {
//                    System.out.println(Thread.currentThread().getName()   " is running" );
//                    Thread.sleep(1_000);
//                    System.out.println(Thread.currentThread().getName()   " finished" );
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
//            });
//        }

        IntStream.range(0, 50)
                .forEach(i ->
                        simpleThreadPool.submit(() -> {
                            try {
                                System.out.println(Thread.currentThread().getName()    "____"   i    " start");
                                Thread.sleep(1_000);
                                System.out.println(Thread.currentThread().getName()   "____"   i  " finished");
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }));

    }
}

运行:

代码语言:javascript复制
SIMPLE_THREAD_POOL-9____0 start
SIMPLE_THREAD_POOL-4____5 start
SIMPLE_THREAD_POOL-0____9 start
SIMPLE_THREAD_POOL-5____4 start
SIMPLE_THREAD_POOL-6____3 start
SIMPLE_THREAD_POOL-8____1 start
SIMPLE_THREAD_POOL-7____2 start
SIMPLE_THREAD_POOL-1____8 start
SIMPLE_THREAD_POOL-3____6 start
SIMPLE_THREAD_POOL-2____7 start
SIMPLE_THREAD_POOL-4____5 finished
SIMPLE_THREAD_POOL-6____3 finished
SIMPLE_THREAD_POOL-5____4 finished
SIMPLE_THREAD_POOL-3____6 finished
SIMPLE_THREAD_POOL-0____9 finished
SIMPLE_THREAD_POOL-9____0 finished
SIMPLE_THREAD_POOL-0____13 start
SIMPLE_THREAD_POOL-2____7 finished
SIMPLE_THREAD_POOL-5____12 start
SIMPLE_THREAD_POOL-1____8 finished
SIMPLE_THREAD_POOL-6____11 start
SIMPLE_THREAD_POOL-7____2 finished
SIMPLE_THREAD_POOL-4____10 start
SIMPLE_THREAD_POOL-8____1 finished
SIMPLE_THREAD_POOL-7____17 start
SIMPLE_THREAD_POOL-1____16 start
SIMPLE_THREAD_POOL-2____15 start
SIMPLE_THREAD_POOL-9____14 start
SIMPLE_THREAD_POOL-8____18 start
SIMPLE_THREAD_POOL-3____19 start
SIMPLE_THREAD_POOL-0____13 finished
SIMPLE_THREAD_POOL-0____20 start
SIMPLE_THREAD_POOL-5____12 finished
SIMPLE_THREAD_POOL-5____21 start
SIMPLE_THREAD_POOL-6____11 finished
SIMPLE_THREAD_POOL-6____22 start
SIMPLE_THREAD_POOL-4____10 finished
SIMPLE_THREAD_POOL-4____23 start
SIMPLE_THREAD_POOL-7____17 finished
SIMPLE_THREAD_POOL-7____24 start
SIMPLE_THREAD_POOL-1____16 finished
SIMPLE_THREAD_POOL-1____25 start
SIMPLE_THREAD_POOL-2____15 finished
SIMPLE_THREAD_POOL-2____26 start
SIMPLE_THREAD_POOL-9____14 finished
SIMPLE_THREAD_POOL-9____27 start
SIMPLE_THREAD_POOL-8____18 finished
SIMPLE_THREAD_POOL-8____28 start
SIMPLE_THREAD_POOL-3____19 finished
SIMPLE_THREAD_POOL-3____29 start
SIMPLE_THREAD_POOL-5____21 finished
SIMPLE_THREAD_POOL-2____26 finished
SIMPLE_THREAD_POOL-1____25 finished
SIMPLE_THREAD_POOL-7____24 finished
SIMPLE_THREAD_POOL-4____23 finished
SIMPLE_THREAD_POOL-0____20 finished
SIMPLE_THREAD_POOL-6____22 finished
SIMPLE_THREAD_POOL-0____35 start
SIMPLE_THREAD_POOL-4____34 start
SIMPLE_THREAD_POOL-7____33 start
SIMPLE_THREAD_POOL-1____32 start
SIMPLE_THREAD_POOL-8____28 finished
SIMPLE_THREAD_POOL-3____29 finished
SIMPLE_THREAD_POOL-2____31 start
SIMPLE_THREAD_POOL-9____27 finished
SIMPLE_THREAD_POOL-5____30 start
SIMPLE_THREAD_POOL-9____39 start
SIMPLE_THREAD_POOL-3____38 start
SIMPLE_THREAD_POOL-8____37 start
SIMPLE_THREAD_POOL-6____36 start
SIMPLE_THREAD_POOL-8____37 finished
SIMPLE_THREAD_POOL-2____31 finished
SIMPLE_THREAD_POOL-5____30 finished
SIMPLE_THREAD_POOL-3____38 finished
SIMPLE_THREAD_POOL-9____39 finished
SIMPLE_THREAD_POOL-6____36 finished
SIMPLE_THREAD_POOL-9____44 start
SIMPLE_THREAD_POOL-0____35 finished
SIMPLE_THREAD_POOL-4____34 finished
SIMPLE_THREAD_POOL-3____43 start
SIMPLE_THREAD_POOL-4____47 start
SIMPLE_THREAD_POOL-7____33 finished
SIMPLE_THREAD_POOL-7____48 start
SIMPLE_THREAD_POOL-5____42 start
SIMPLE_THREAD_POOL-1____32 finished
SIMPLE_THREAD_POOL-2____41 start
SIMPLE_THREAD_POOL-8____40 start
SIMPLE_THREAD_POOL-1____49 start
SIMPLE_THREAD_POOL-0____46 start
SIMPLE_THREAD_POOL-6____45 start
SIMPLE_THREAD_POOL-3____43 finished
SIMPLE_THREAD_POOL-5____42 finished
SIMPLE_THREAD_POOL-7____48 finished
SIMPLE_THREAD_POOL-4____47 finished
SIMPLE_THREAD_POOL-9____44 finished
SIMPLE_THREAD_POOL-6____45 finished
SIMPLE_THREAD_POOL-0____46 finished
SIMPLE_THREAD_POOL-1____49 finished
SIMPLE_THREAD_POOL-8____40 finished
SIMPLE_THREAD_POOL-2____41 finished

可知: 虽然定义了50个任务,但是根据日志来看,是由10个线程来完成的,符合预期。

0 人点赞