多线程的几种创建方式

2022-10-25 15:48:14 浏览数 (3)

1. 继承Thread 重写run() 方法

代码语言:javascript复制
package com.xiepanpan.locks.lockstest.service;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

/**
 * @author: xiepanpan
 * @Date: 2020/2/27
 * @Description:
 */
@Slf4j
public class Thread01 extends Thread{
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            log.error("创建线程异常:{}",e);
        }
        log.info("Thread01-当前线程:{}", Thread.currentThread());
    }
}
代码语言:javascript复制
package com.xiepanpan.locks.lockstest.service;

import lombok.extern.slf4j.Slf4j;

/**
 * @author: xiepanpan
 * @Date: 2020/2/27
 * @Description: 多线程测试类
 */
@Slf4j
public class ThreadTest {

    public static void main(String[] args) {
        log.info("主线程开始。。。");
        Thread01 thread01 = new Thread01();
        //异步化
        new Thread(thread01).start();
        log.info("主线程结束......");
    }

}

运行结果:

2.实现Runable 重写run()方法

代码语言:javascript复制
package com.xiepanpan.locks.lockstest.service;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

/**
 * @author: xiepanpan
 * @Date: 2020/2/27
 * @Description:
 */
@Slf4j
public class Thread02 implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            log.error("创建线程异常:{}",e);
        }
        log.info("Thread02-当前线程:{}" Thread.currentThread());
    }
}
代码语言:javascript复制
package com.xiepanpan.locks.lockstest.service;

import lombok.extern.slf4j.Slf4j;

/**
 * @author: xiepanpan
 * @Date: 2020/2/27
 * @Description: 多线程测试类
 */
@Slf4j
public class ThreadTest {

    public static void main(String[] args) {
        log.info("主线程开始。。。");
        new Thread(new Thread02()).start();
        log.info("主线程结束......");
    }

}

运行结果:

3. 使用Callable

以上没法使用线程里面的返回值。而使用Callable可以

代码语言:javascript复制
package com.xiepanpan.locks.lockstest.service;

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

/**
 * @author: xiepanpan
 * @Date: 2020/2/27
 * @Description:
 */
public class Thread03 implements Callable<String> {
    @Override
    public String call() throws Exception {
        TimeUnit.SECONDS.sleep(3);
        return "OK";
    }
}
代码语言:javascript复制
package com.xiepanpan.locks.lockstest.service;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
 * @author: xiepanpan
 * @Date: 2020/2/27
 * @Description: 多线程测试类
 */
@Slf4j
public class ThreadTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        log.info("主线程开始。。。");
        FutureTask<String> task = new FutureTask<>(new Thread03());
        new Thread(task).start();
        String s = task.get();
        log.info("主线程哈哈哈。。。。");
        log.info("异步获取到的结果是:{}",s);
        log.info("主线程结束......");
    }

}
代码语言:javascript复制
    log.info("异步获取到的结果是:{}",s);

要获取s的结果 此时主线程处于等待状态 所以主线程结束 最后执行

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。 Future提供了三种功能:   1)判断任务是否完成;   2)能够中断任务;   3)能够获取任务执行结果。 FutureTask是Future和Runable的实现

代码语言:javascript复制
package java.util.concurrent;


public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

4. ExecutorService 使用线程池

为什么要使用线程池

假如不使用线程池

代码语言:javascript复制
package com.xiepanpan.locks.lockstest.service;

import lombok.extern.slf4j.Slf4j;

import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

/**
 * @author: xiepanpan
 * @Date: 2020/2/27
 * @Description: 多线程测试类
 */
@Slf4j
public class ThreadTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        log.info("主线程开始。。。");

        for (int i=0;i<10;i  ) {
            new Thread(()->{
                log.info("当前线程开始:{}",Thread.currentThread());
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    log.error("线程异常:{}",e);
                }
                log.info("当前线程结束: {}",Thread.currentThread());
            }).start();
        }
        log.info("主线程结束......");
    }

}

执行结果:

循环十次要创建十个线程

使用线程池

代码语言:javascript复制
package com.xiepanpan.locks.lockstest.service;

import lombok.extern.slf4j.Slf4j;

import java.util.UUID;
import java.util.concurrent.*;

/**
 * @author: xiepanpan
 * @Date: 2020/2/27
 * @Description: 多线程测试类
 */
@Slf4j
public class ThreadTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        log.info("主线程开始。。。");
        ExecutorService threadPool = Executors.newFixedThreadPool(2);
        for (int i=0;i<10;i  ) {
            Thread thread = new Thread(() -> {
                log.info("当前线程开始:{}", Thread.currentThread());
                try {
                    TimeUnit.SECONDS.sleep(5);
                    int  j= 5/0;
                } catch (InterruptedException e) {
                    log.error("线程异常:{}", e);
                }
                log.info("当前线程结束: {}", Thread.currentThread());
            });
            给线程池提交任务
            threadPool.submit(thread);
        }
        log.info("主线程结束......");
    }

}

执行结果:

可以看到 只用两个线程再执行 使用线程池控制系统资源,防止线程资源耗尽

线程池种类

ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); Java通过Executors提供四种线程池,分别为:

  • newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
  • newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  • newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
  • newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

默认的线程池里面的Queue是一个无界队列。 极限情况。线程全部放进队列。无界队列撑爆内存。 ThreadPool:拒绝策略。四种:自己总结;默认出异常。

使用线程池有个问题 我写了个int j= 5/0; 但不报错 任务交给线程池,出现异常无法感知。

5 CompletableFuture(异步编排)

Future接口可以构建异步应用,但依然有其局限性。它很难直接表述多个Future 结果之间的依赖性。实际开发中,我们经常需要达成以下目的:

  • 将多个异步计算的结果合并成一个
  • 等待Future集合中的所有任务都完成
  • Future完成事件(即,任务完成以后触发执行动作)
代码语言:javascript复制
package com.xiepanpan.locks.lockstest.service;

import lombok.extern.slf4j.Slf4j;

import java.util.UUID;
import java.util.concurrent.*;

/**
 * @author: xiepanpan
 * @Date: 2020/2/27
 * @Description: 多线程测试类
 */
@Slf4j
public class ThreadTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        log.info("主线程开始。。。");
        ExecutorService threadPool = Executors.newFixedThreadPool(2);

        CompletableFuture.supplyAsync(()->{
            log.info("当前线程开始: {}",Thread.currentThread());
            String uuid = UUID.randomUUID().toString();
//            uuid =10/0;
            log.info("当前线程结束:{}",Thread.currentThread());
            return uuid;
        },threadPool).thenApply((r)->{
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                log.error("线程睡眠异常:{}",e);
            }
            log.info("上一步执行结果:{}",r);
            int i = 10/0;
            return r 10/0;
        }).whenComplete((r,e)->{
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException ex) {
                log.error("线程睡眠异常:{}",ex);
            }
            log.info("最终结果:{}",r);
            log.error("异常信息:{}",e);
        });
        log.info("主线程结束......");

    }

}

执行结果:

有异常就输出了 实际使用:

代码语言:javascript复制
package com.xiepanpan.locks.lockstest.service;

import lombok.extern.slf4j.Slf4j;

import java.util.UUID;
import java.util.concurrent.*;

/**
 * @author: xiepanpan
 * @Date: 2020/2/27
 * @Description: 多线程测试类
 */
@Slf4j
public class ThreadTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        log.info("主线程开始。。。");

        ExecutorService threadPool = Executors.newFixedThreadPool(10);    
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            log.info("查询商品基本数据...");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                log.error("线程睡眠异常:{}",e);
            }
            return "华为";
        }, threadPool).whenComplete((r, e) -> {
            log.info("结果是:{}", r);
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            log.info("查询商品属性数据...");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                log.error("线程睡眠异常:{}",e);
            }
            return "金色";
        }, threadPool).whenComplete((r, e) -> {
            log.info("结果是:{}", r);
        });

        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            log.info("查询商品营销数据...");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                log.error("线程睡眠异常:{}",e);
            }
            return "满199减100";
        }, threadPool).whenComplete((r, e) -> {
            log.info("结果是:{}"   r);
        });
        CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
        //线程插队
        allOf.join();
        log.info("所有人都完事了:{}",future1.get() future2.get() future3.get());

    }

}

这个方法5秒完事 一个线程的话需要10秒

以后异步任务的编程模式: CompletableFuture.supplyAsync(()->{},pool).whenComplete()

CompletableFuture的方法:
thenApply

当前阶段正常完成以后执行,而且当前阶段的执行的结果会作为下一阶段的输入参数。thenApplyAsync默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。 thenApply相当于回调函数(callback)(如ajax的success,error等回调)

thenAccept与thenRun

thenAccept和thenRun都是无返回值的。

如果说thenApply是不停的输入输出的进行生产,那么thenAccept和thenRun就是在进行消耗。它们是整个计算的最后两个阶段。 同样是执行指定的动作,同样是消耗,二者也有区别:

  • thenAccept接收上一阶段的输出作为本阶段的输入
  • thenRun根本不关心前一阶段的输出,根本不不关心前一阶段的计算结果,因为它不需要输入参数
thenAccept与thenAccpetAsync
代码语言:javascript复制
1、thenAccept只接受上一步的结果

          thenAccept(r){
              r:上一步的结果
         }
2、	thenApply(r){
              r:把上一步的结果拿来进行修改再返回,
          }

3、thenAccpet(){} 上一步结果1s 本次处理2s=3s
         
4、thenAccpetAsync(){} 上一步1s 异步2s = 最多等2s
thenCombine整合两个计算结果
whenComplete
allOf()

0 人点赞