Java多线程案例

2023-10-25 14:43:17 浏览数 (1)

Java多线程案例

单例模式

单例模式能保证某个类在程序中只存在唯一一份实例, 而不会创建出多个实例

饿汉模式:类加载的同时, 创建实例

代码语言:javascript复制
public class Singleton {
    private static Singleton instance = new Singleton();//类加载时实例化
    private Singleton(){}//构造私有化
    public static Singleton getInstance(){
        return instance;
    }
}

懒汉模式:第一次使用的时候才创建实例

代码语言:javascript复制
public class Singleton {
    private volatile static Singleton instance = null;//volatile保证内存可见性
    private Singleton(){}//构造私有化
    public static Singleton getInstance(){
        if(instance == null) {//预判断 降低锁竞争
            synchronized (Singleton.class) {
                if(instance == null) {//获取锁后判断 保证数据原子性
                    instance = new Singleton();
                }
            }
        }
        return instance;
    }
}

阻塞式队列

阻塞队列是一种特殊的队列. 也遵守 “先进先出” 的原则

阻塞队列能是一种线程安全的数据结构, 并且具有以下特性:

  1. 当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素
  2. 当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素

阻塞队列的一个典型应用场景就是 “生产者消费者模型”. 这是一种非常典型的开发模型

生产者消费者模式:

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题

生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等 待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取

  1. 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力
  2. 阻塞队列也能使生产者和消费者之间 解耦

标准库中的阻塞队列:

  1. BlockingQueue 是一个接口,真正实现的类是 LinkedBlockingQueue
  2. put 方法用于阻塞式的入队列, take 用于阻塞式的出队列
  3. BlockingQueue 也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性
代码语言:javascript复制
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 入队列
queue.put("abc");
// 出队列. 如果没有 put 直接 take, 就会阻塞.
String elem = queue.take();

阻塞队列实现:

  1. 通过 “循环队列” 的方式来实现,使用 synchronized 进行加锁控制
  2. put 插入元素的时候, 判定如果队列满了, 就进行 wait;take 取出元素的时候, 判定如果队列为空, 就进行 wait
  3. 注意被唤醒时不一条件不一定还满足, 因为同时可能是唤醒了多个线程,需要循环判断
代码语言:javascript复制
public class BlockingQueue {
    private int[] items = new int[1000];
    private volatile int head = 0;
    private volatile int tail = 0;

    public void put(int value) throws InterruptedException {
        synchronized (this) {
            while((tail 1)%items.length == head) {
                wait();
            }
            items[tail  ] = value;
            tail %= items.length;
            notifyAll();
        }
    }
    public int take() throws InterruptedException {
        int ret;
        synchronized (this) {
            while(head == tail) {
                wait();
            }
            ret = items[head  ];
            head %= items.length;
            notifyAll();
        }
        return ret;
    }
}

定时器

定时器也是软件开发中的一个重要组件. 类似于一个 “闹钟”. 达到一个设定的时间之后, 就执行某个指定好的代码

标准库中的定时器:

标准库中提供了一个 Timer 类. Timer 类的核心方法为 schedule schedule 包含两个参数:第一个参数指定即将要执行的任务代码, 第二个参数指定多长时间之后执行 (单位为毫秒)

代码语言:javascript复制
Timer timer = new Timer();
timer.schedule(new TimerTask() {
        @Override
        public void run() {
            System.out.println("hello");
        }
    }, 3000);

定时器的构成:

  1. 一个带优先级的阻塞队列(阻塞队列中的任务都有各自的执行时刻 (delay). 最先执行的任务一定是 delay 最小的. 使用带优先级的队列就可以高效的把这个 delay 最小的任务找出来)
  2. 队列中的每个元素是一个 Task 对象,Task 中带有一个时间属性, 队首元素就是即将执行的任务
  3. 有一个 worker 线程一直扫描队首元素, 看队首元素是否需要执行
  4. 插入新执行任务需要唤醒查看队头是否更新
代码语言:javascript复制
public class Timer {
    static class Task implements Comparable<Task> {
        private Runnable task;//执行的任务
        private long time;//多久执行
        public Task(Runnable task, long time) {
            this.task = task;
            this.time = System.currentTimeMillis()   time;//绝对时间
        }

        public void run() {
            task.run();//调用执行
        }
        @Override
        public int compareTo(Task o) {
            return (int)(time - o.time);//绝对时间小先执行
        }
    }
    private PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<Task>();//储存任务列表
    private Object lock = new Object();

    class worker extends Thread {//worker线程
        @Override
        public void run() {
            while(true) {
                try {
                    Task task = queue.take();
                    long curTime = System.currentTimeMillis();//当前时间
                    if(task.time > curTime) {//不可执行
                        queue.put(task);
                        synchronized (lock) {
                            lock.wait(task.time -curTime);//等待间隔时长
                        }
                    } else {
                        task.run();//可执行任务
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    break;
                }
            }
        }
    }
    public Timer() {
        worker worker = new worker();//创建启动工作线程
        worker.start();
    }
    public void schedule(Runnable comm, long after) {
        Task task = new Task(comm, after);
        queue.put(task);
        synchronized (lock) {
            lock.notify();//唤醒等待线程
        }
    }
}

线程池

线程池使用池化技术,将预先创建好批量线程,等任务到达进行获取执行。

线程池最大的好处就是减少每次启动、销毁线程的损耗。

标准库中的线程池:

  1. 使用 Executors.newFixedThreadPool(10) 能创建出固定包含 10 个线程的线程池
  2. 返回值类型为 ExecutorService
  3. 通过 ExecutorService.submit 可以注册一个任务到线程池中
代码语言:javascript复制
ExecutorService pool = Executors.newFixedThreadPool(10);
pool.submit(new Runnable() {
    @Override
    public void run() {
        System.out.println("hello");
    }
});

Executors 创建线程池:

  1. newFixedThreadPool: 创建固定线程数的线程池
  2. newCachedThreadPool: 创建线程数目动态增长的线程池
  3. newSingleThreadExecutor: 创建只包含单个线程的线程池
  4. newScheduledThreadPool: 设定延迟时间后执行命令,或者定期执行命令,是进阶版的 Timer

Executors 本质上是 ThreadPoolExecutor 类的封装,ThreadPoolExecutor 提供了更多的可选参数, 可以进一步细化线程池行为的设定

实现线程池:

  1. 核心操作为 submit, 将任务加入线程池中
  2. 使用 Worker 类描述一个工作线程,使用 Runnable 描述一个任务
  3. 使用一个 BlockingQueue 组织所有的任务,作为生产消费场所
  4. 每个 worker 线程要做的事情: 不停的从 BlockingQueue 中取任务并执行
代码语言:javascript复制
public class ThreadPool {
    private List<Thread> workers = new ArrayList<>();
    private LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    class Worker extends Thread {
        @Override
        public void run() {
            try {
                while(!Thread.interrupted()){
                    Runnable runnable = queue.take();
                    runnable.run();
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
    public ThreadPool(int threadNums) {
        for (int i = 0; i < threadNums; i  ) {
            Worker worker = new Worker();
            worker.start();
            workers.add(worker);
        }
    }
    public void submit(Runnable comm) {
        try {
            queue.put(comm);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

0 人点赞