[JDK]了解多线程以及如何进行并发编程?
简介
本文主要介绍多线程基本知识,以及如何讲解进行并发编程?
硬件层面软件层面并发和并行JAVA中的线程线程的基础源码分析线程的启动线程的中断异步并发异步 Future异步Callback异步编排 CompletableFuture小结更多
手机用户请
横屏
获取最佳阅读体验,REFERENCES
中是本文参考的链接,如需要链接和更多资源,可以扫码加入『知识星球』(文末)获取长期知识分享服务。
多线程
多线程意味着您在同一应用程序中具有多个执行线程。线程就像一个执行应用程序的独立CPU。因此,多线程应用程序就像具有多个CPU同时执行代码的不同部分的应用程序。
为什么要使用多线程?
- 更好地利用单个CPU。
最常见的原因之一是能够更好地利用计算机中的资源。例如,如果一个线程正在等待对通过网络发送的请求的响应,则另一线程可以同时使用CPU来执行其他操作。此外,如果计算机具有多个CPU,或者CPU具有多个执行核心,则多线程还可以帮助您的应用程序利用这些额外的CPU核心。
- 更好地利用多个CPU或CPU内核。
如果计算机包含多个CPU或CPU包含多个执行核心,则您需要为应用程序使用多个线程才能使用所有CPU或CPU核心。单个线程最多只能使用一个CPU,如上所述,有时甚至不能完全利用单个CPU。
- 在响应性上提供更好的用户体验。
使用多线程的另一个原因是为了提供更好的用户体验。例如,如果您单击GUI中的按钮,并导致通过网络发送请求,那么哪个线程执行此请求就很重要。如果使用的线程也正在更新GUI,则在GUI线程等待请求响应时,用户可能会遇到GUI“挂起”的情况。取而代之的是,这样的请求可以由背景线程执行,因此GUI线程可以自由地同时响应其他用户请求。
- 在公平性上提供更好的用户体验。
是在用户之间更公平地共享计算机资源。例如,假设一台服务器接收来自客户端的请求,并且只有一个线程来执行这些请求。如果客户端发送的请求需要很长时间才能处理,则所有其他客户端的请求都必须等待,直到一个请求完成。通过使每个客户端的请求都由其自己的线程执行,则没有一个任务可以完全垄断CPU。
硬件层面
硬件资源
CPU、内存、硬盘、网络
软件层面
最大化利用硬件资源
线程数量、JVM内存分配大小、网络通信机制(NIOAIOBIO)、磁盘IO
进程和线程
- 进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位.
- 线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源.
线程数量提升服务端的并发数量
Tomecat并发处理请求线程、线程池的线程数目配置。
.
事实上,一个线程并不等于一个CPU。通常,一个CPU会在多个线程之间共享它的执行时间,在给定的时间内执行每个线程之间进行切换。也可以让应用程序的线程由不同的cpu执行。
并发和并行
- 并行:多个事件同一时刻发生;
- 并发:多个事件在同一时间间隔内发生。
JAVA中的线程
实现方式
- Thread 类
新建一个类继承Thread后覆盖父类的run方法。
代码语言:javascript复制public class ThreadDemo extends Thread {
@Override
public void run() {
try {
Thread.sleep(2000);
System.out.println("ThreadDemo run end!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Test {
public static void main(String[] args) {
Thread thread = new ThreadDemo();
thread.start();
}
}
- Runnable 接口
本身Thread
也实现了Runnable
接口
public class Thread implements Runnable {
//...
}
代码语言:javascript复制public class RunnableDemo implements Runnable {
@Override
public void run() {
try {
Thread.sleep(2000);
System.out.println("RunnableDemo run END!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Test {
public static void main(String[] args) {
new Thread(new RunnableDemo()).start();
}
}
- Callable/Future 接口
我们知道Thread
或是Runnable
的使用,其run
方法是没有返回值的,对于异步执行的线程,当我们需要返回值时可以使用Callable/Future
接口来实现。
public class CallableDemo implements Callable<CallableResult> {
@Override
public CallableResult call() throws Exception {
Thread.sleep(10000);
return new CallableResult("CallableDemo");
}
}
public class CallableResult{
private String name;
public CallableResult() {
}
public CallableResult(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "CallableResult{"
"name='" name '''
'}';
}
}
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CallableDemo callableDemo = new CallableDemo();
Future<CallableResult> submit = Executors.newFixedThreadPool(2).submit(callableDemo);
System.out.println(submit.get());//阻塞等待返回结果
}
}
使用场景
- 网络请求分发
- 文件导入
- 短信发送等异步场景
线程的基础
线程的生命周期
.
代码语言:javascript复制线程的状态
public class Thread implements Runnable {
public enum State {
NEW, // 初始化状态
RUNNABLE, // 可运行/运行状态
BLOCKED, // 阻塞状态
WAITING, // 无时限等待
TIMED_WAITING, // 有时限等待
TERMINATED; // 终止状态
}
//...
}
阻塞状态:
- WAITING
- TIME_WAITING
- BLOCKED
- IO阻塞
需要注意的是,Thread中定义了6种状态码,但是一个线程真实存在的状态(操作系统层面 ),只有5种:
- ready : 表示线程已经被创建,正在等待系统调度分配CPU使用权;
- running : 表示线程获得了CPU使用权,正在进行运算;
- waiting : 表示线程休眠、等待、挂起,让出CPU资源给其他线程使用。运行状态的线程,如果调用一个阻塞的API或者等待某个事件,那么线程的状态就会转换到休眠状态。一般有以下几种情况:
- 同步阻塞:锁被其它线程占用
- 主动阻塞:调用Thread的某些方法,主动让出CPU执行权,比如:sleep()、join()等方法
- 等待阻塞:执行了wait()方法
- 新建:是线程被创建且未启动的状态。这里的创建,仅仅是在JAVA的这种编程语言层面被创建,而在操作系统层面,真正的线程还没有被创建;
- 死亡:线程执行完( run()方法执行结束)或者出现异常就会进入终止状态;
interrupt作用
- 优雅的Thread.interrupt()方法中断线程,
interrupt()
方法仅仅是通知线程,线程有机会执行一些后续操作,同时也可以无视这个通知,这个方法通过修改了调用线程的中断状态来告知那个线程,说它被中断了,线程可以通过isInterrupted()
方法,检测是不是自己被中断。 - 当线程被阻塞的时候,比如被Object.wait, Thread.join和Thread.sleep三种方法之一阻塞时;调用它的
interrput()
方法,会产生InterruptedException异常。
waiting状态(BLOCKED、WAITING、TIMED_WAITING)与RUNNING状态的转换
- RUNNING状态与BLOCKED状态的转换
- 线程等待 synchronized 的隐式锁,RUNNING —> BLOCKED
- 线程获得 synchronized 的隐式锁,BLOCKED —> RUNNING
- RUNNING状态与WAITING状态的转换
- 获得 synchronized 隐式锁的线程,调用无参数的Object.wait()方法
- 调用无参数Thread.join()方法
- 调用LockSupport.park()方法,线程阻塞切换到WAITING状态
- 调用notify()、notifyAll()、unpark()方法,可唤醒线程,从WAITING状态切换到RUNNING状态
- RUNNING状态与TIMED_WAITING状态的转换
- 调用带超时参数的 Thread..sleep(long millis)方法
- 获得 synchronized 隐式锁的线程,调用带超时参数的Object.wait(long timeout)方法
- 调用带超时参数的Thread.join(long millis)方法
- 调用带超时参数的LockSupport.parkNanos(Object blocker,long deadline)方法
- 调用带超时参数的LockSupport.parkUntil(long deadline)方法
- 调用notify()、notifyAll()、unpark()方法,可唤醒线程,从TIMED_WAITING状态切换到RUNNING状态
代码语言:javascript复制线程阻塞
public class ThreadStateDemo {
public static void main(String[] args) {
//Sleep阻塞
new Thread(() -> {
while(true){
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"STATUS_DEMO_O1").start();
//类锁WAIT阻塞
new Thread(()->{
while (true){
try {
synchronized (ThreadStateDemo.class){
ThreadStateDemo.class.wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"STATUS_DEMO_02").start();
//锁争抢阻塞
new Thread(new BlockDemo(),"STATUS_DEMO_03").start();
new Thread(new BlockDemo(),"STATUS_DEMO_04").start();
}
static class BlockDemo extends Thread {
@Override
public void run() {
synchronized (BlockDemo.class){
//让抢到锁的线程一直持有类锁
while (true){
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
打印线程栈信息:
jps
& jstack [pid] >> block.txt
"STATUS_DEMO_04" #26 prio=5 os_prio=31 tid=0x00007fdc89093000 nid=0x6603 waiting for monitor entry [0x00007000083a7000]
//抢占锁失败进入阻塞状态
java.lang.Thread.State: BLOCKED (on object monitor)
at com.yiyuery.sample.states.ThreadStateDemo$BlockDemo.run(ThreadStateDemo.java:60)
- waiting to lock <0x0000000715c640b0> (a java.lang.Class for com.yiyuery.sample.states.ThreadStateDemo$BlockDemo)
at java.lang.Thread.run(Thread.java:748)
"STATUS_DEMO_03" #24 prio=5 os_prio=31 tid=0x00007fdc68973800 nid=0x9503 waiting on condition [0x00007000082a4000]
//带有超时时间阻塞
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:340)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
at com.yiyuery.sample.states.ThreadStateDemo$BlockDemo.run(ThreadStateDemo.java:60)
- locked <0x0000000715c640b0> (a java.lang.Class for com.yiyuery.sample.states.ThreadStateDemo$BlockDemo)
at java.lang.Thread.run(Thread.java:748)
"STATUS_DEMO_02" #22 prio=5 os_prio=31 tid=0x00007fdc89879000 nid=0x6403 in Object.wait() [0x00007000081a1000]
//wait 阻塞等待状态
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x000000071576a1e0> (a java.lang.Class for com.yiyuery.sample.states.ThreadStateDemo)
at java.lang.Object.wait(Object.java:502)
at com.yiyuery.sample.states.ThreadStateDemo.lambda$main$1(ThreadStateDemo.java:39)
- locked <0x000000071576a1e0> (a java.lang.Class for com.yiyuery.sample.states.ThreadStateDemo)
at com.yiyuery.sample.states.ThreadStateDemo$$Lambda$2/1164175787.run(Unknown Source)
at java.lang.Thread.run(Thread.java:748)
"STATUS_DEMO_O1" #21 prio=5 os_prio=31 tid=0x00007fdc89092000 nid=0x6203 waiting on condition [0x000070000809e000]
//带有超时时间阻塞
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:340)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
at com.yiyuery.sample.states.ThreadStateDemo.lambda$main$0(ThreadStateDemo.java:27)
at com.yiyuery.sample.states.ThreadStateDemo$$Lambda$1/1012570586.run(Unknown Source)
at java.lang.Thread.run(Thread.java:748)
源码分析
线程的启动
线程调度
.
源码
- JDK
public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
//调用JVM的本地方法
private native void start0();
- JVM
//hotspot-87ee5ee27509/src/os/linux/vm/os_linux.cpp
MemNotifyThread::MemNotifyThread(int fd): Thread() {
assert(memnotify_thread() == NULL, "we can only allocate one MemNotifyThread");
_fd = fd;
//创建线程
if (os::create_thread(this, os::os_thread)) {
_memnotify_thread = this;
//设置优先级
os::set_priority(this, NearMaxPriority);
//启动线程
os::start_thread(this);
}
}
bool os::create_thread(Thread* thread, ThreadType thr_type, size_t stack_size) {
assert(thread->osthread() == NULL, "caller responsible");
// Allocate the OSThread object
OSThread* osthread = new OSThread(NULL, NULL);
if (osthread == NULL) {
return false;
}
// set the correct thread state
osthread->set_thread_type(thr_type);
// Initial state is ALLOCATED but not INITIALIZED
osthread->set_state(ALLOCATED);
thread->set_osthread(osthread);
// init thread attributes
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
// stack size
if (os::Linux::supports_variable_stack_size()) {
// calculate stack size if it's not specified by caller
if (stack_size == 0) {
stack_size = os::Linux::default_stack_size(thr_type);
switch (thr_type) {
case os::java_thread:
// Java threads use ThreadStackSize which default value can be
// changed with the flag -Xss
assert (JavaThread::stack_size_at_create() > 0, "this should be set");
stack_size = JavaThread::stack_size_at_create();
break;
case os::compiler_thread:
if (CompilerThreadStackSize > 0) {
stack_size = (size_t)(CompilerThreadStackSize * K);
break;
} // else fall through:
// use VMThreadStackSize if CompilerThreadStackSize is not defined
case os::vm_thread:
case os::pgc_thread:
case os::cgc_thread:
case os::watcher_thread:
if (VMThreadStackSize > 0) stack_size = (size_t)(VMThreadStackSize * K);
break;
}
}
stack_size = MAX2(stack_size, os::Linux::min_stack_allowed);
pthread_attr_setstacksize(&attr, stack_size);
} else {
// let pthread_create() pick the default value.
}
// glibc guard page
pthread_attr_setguardsize(&attr, os::Linux::default_guard_size(thr_type));
ThreadState state;
{
// Serialize thread creation if we are running with fixed stack LinuxThreads
bool lock = os::Linux::is_LinuxThreads() && !os::Linux::is_floating_stack();
if (lock) {
os::Linux::createThread_lock()->lock_without_safepoint_check();
}
pthread_t tid;
int ret = pthread_create(&tid, &attr, (void* (*)(void*)) java_start, thread);
pthread_attr_destroy(&attr);
if (ret != 0) {
if (PrintMiscellaneous && (Verbose || WizardMode)) {
perror("pthread_create()");
}
// Need to clean up stuff we've allocated so far
thread->set_osthread(NULL);
delete osthread;
if (lock) os::Linux::createThread_lock()->unlock();
return false;
}
// Store pthread info into the OSThread
osthread->set_pthread_id(tid);
// Wait until child thread is either initialized or aborted
{
Monitor* sync_with_child = osthread->startThread_lock();
MutexLockerEx ml(sync_with_child, Mutex::_no_safepoint_check_flag);
while ((state = osthread->get_state()) == ALLOCATED) {
sync_with_child->wait(Mutex::_no_safepoint_check_flag);
}
}
if (lock) {
os::Linux::createThread_lock()->unlock();
}
}
// Aborted due to thread limit being reached
if (state == ZOMBIE) {
thread->set_osthread(NULL);
delete osthread;
return false;
}
// The thread is returned suspended (in state INITIALIZED),
// and is started higher up in the call chain
assert(state == INITIALIZED, "race condition");
return true;
}
线程的中断
中断方式
- 设置一个共享变量,来通过修改变量来实现while循环的结束
- 通过Interrupt机制唤醒阻塞状态下的线程
代码语言:javascript复制示例代码
public class Test extends Thread{
private static int count = 0;
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
System.out.println(">>>" count );
}
System.out.println("运行结束:interrupt Flag" Thread.currentThread().isInterrupted());
}
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new Test());
thread.start();
TimeUnit.SECONDS.sleep(3);
thread.interrupt();
}
}
/*
...
>>>1137794
>>>1137795
运行结束:interrupt Flagtrue
*/
代码语言:javascript复制线程中断复位
public class Test2 extends Thread{
private static int count = 0;
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
TimeUnit.SECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("Sleep 状态下被中断:interrupt Flag " Thread.currentThread().isInterrupted());
}
}
System.out.println("运行结束:interrupt Flag" Thread.currentThread().isInterrupted());
}
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new Test2());
thread.start();
TimeUnit.SECONDS.sleep(3);
thread.interrupt();
}
}
/*
Connected to the target VM, address: '127.0.0.1:53223', transport: 'socket'
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:340)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
at com.yiyuery.sample.interrupt.Test2.run(Test2.java:28)
at java.lang.Thread.run(Thread.java:748)
Sleep 状态下被中断:interrupt Flag false
*/
此时线程不会停止,睡眠状态下的线程被中断后,需要考虑是否处理中断的通知信息。
代码语言:javascript复制 @Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
TimeUnit.SECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("Sleep 状态下被中断:interrupt Flag" Thread.currentThread().isInterrupted());
//响应中断
Thread.currentThread().interrupt();
}
}
System.out.println("运行结束:interrupt Flag" Thread.currentThread().isInterrupted());
}
/*
Connected to the target VM, address: '127.0.0.1:53882', transport: 'socket'
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:340)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
at com.yiyuery.sample.interrupt.Test2.run(Test2.java:28)
at java.lang.Thread.run(Thread.java:748)
Sleep 状态下被中断:interrupt Flagfalse
运行结束:interrupt Flagtrue
Disconnected from the target VM, address: '127.0.0.1:53882', transport: 'socket'
*/
源码
- JDK
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}
//...
private native void interrupt0();
- JVM
//jvm.cpp
//JVM_Interrupt 宏定义
JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_Interrupt");
// Ensure that the C Thread and OSThread structures aren't freed before we operate
oop java_thread = JNIHandles::resolve_non_null(jthread);
MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
// We need to re-resolve the java_thread, since a GC might have happened during the
// acquire of the lock
JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
if (thr != NULL) {
Thread::interrupt(thr);
}
JVM_END
//JVM_Sleep 宏定义
JVM_ENTRY(void, JVM_Sleep(JNIEnv* env, jclass threadClass, jlong millis))
JVMWrapper("JVM_Sleep");
if (millis < 0) {
THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
}
//中断抛出异常
if (Thread::is_interrupted (THREAD, true) && !HAS_PENDING_EXCEPTION) {
THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
}
// Save current thread state and restore it at the end of this block.
// And set new thread state to SLEEPING.
JavaThreadSleepState jtss(thread);
#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__sleep__begin, millis);
#else /* USDT2 */
HOTSPOT_THREAD_SLEEP_BEGIN(
millis);
#endif /* USDT2 */
EventThreadSleep event;
if (millis == 0) {
// When ConvertSleepToYield is on, this matches the classic VM implementation of
// JVM_Sleep. Critical for similar threading behaviour (Win32)
// It appears that in certain GUI contexts, it may be beneficial to do a short sleep
// for SOLARIS
if (ConvertSleepToYield) {
os::yield();
} else {
ThreadState old_state = thread->osthread()->get_state();
thread->osthread()->set_state(SLEEPING);
os::sleep(thread, MinSleepInterval, false);
thread->osthread()->set_state(old_state);
}
} else {
ThreadState old_state = thread->osthread()->get_state();
thread->osthread()->set_state(SLEEPING);
if (os::sleep(thread, millis, true) == OS_INTRPT) {
// An asynchronous exception (e.g., ThreadDeathException) could have been thrown on
// us while we were sleeping. We do not overwrite those.
if (!HAS_PENDING_EXCEPTION) {
if (event.should_commit()) {
event.set_time(millis);
event.commit();
}
#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__sleep__end,1);
#else /* USDT2 */
HOTSPOT_THREAD_SLEEP_END(
1);
#endif /* USDT2 */
// TODO-FIXME: THROW_MSG returns which means we will not call set_state()
// to properly restore the thread state. That's likely wrong.
THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
}
}
thread->osthread()->set_state(old_state);
}
if (event.should_commit()) {
event.set_time(millis);
event.commit();
}
#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__sleep__end,0);
#else /* USDT2 */
HOTSPOT_THREAD_SLEEP_END(
0);
#endif /* USDT2 */
JVM_END
.
JVM 在不同的操作系统层面会有不同的实现。
代码语言:javascript复制//os_linux.cpp
void os::interrupt(Thread* thread) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
OSThread* osthread = thread->osthread();
if (!osthread->interrupted()) {
//设置一个中断状态
osthread->set_interrupted(true);
// More than one thread can get here with the same value of osthread,
// resulting in multiple notifications. We do, however, want the store
// to interrupted() to be visible to other threads before we execute unpark().
OrderAccess::fence();
//如果是Sleep状态,进行唤醒,并抛出一个InterruptException
ParkEvent * const slp = thread->_SleepEvent ;
if (slp != NULL) slp->unpark() ;
}
// For JSR166. Unpark even if interrupt status already was set
if (thread->is_Java_thread())
((JavaThread*)thread)->parker()->unpark();
ParkEvent * ev = thread->_ParkEvent ;
if (ev != NULL) ev->unpark() ;
}
异步并发
异步 Future
代码语言:javascript复制public class Test {
final static ExecutorService executorService = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
long start = System.currentTimeMillis();
try {
List<String> result = doBusiness();
System.out.println(StringUtils.join(result.toArray(new String[2]), ","));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
System.out.println("Total use " (System.currentTimeMillis() - start));
}
private static List<String> doBusiness() throws InterruptedException, ExecutionException, TimeoutException {
Callable<String> task1 = new Callable<String>() {
@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(2);
return "T1 use 2 s";
}
};
Future<String> r1 = executorService.submit(task1);
Callable<String> task2 = new Callable<String>() {
@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(5);
return "T2 use 5 s";
}
};
Future<String> r2 = executorService.submit(task2);
List<String> result = new ArrayList();
result.add(r1.get(3, TimeUnit.SECONDS));
result.add(r2.get(6, TimeUnit.SECONDS));
return result;
}
}
/*
Connected to the target VM, address: '127.0.0.1:61608', transport: 'socket'
T1 use 2 s,T2 use 5 s
Total use 5036
*/
业务开发中,通过并发请求,然后等待最慢的一个返回。这样的方式,可以大大提升响应速度。
异步Callback
代码语言:javascript复制public class CallbackTest {
static CloseableHttpAsyncClient httpAsyncClient = HttpAsyncClients.createDefault();
static {
httpAsyncClient.start();
}
public static void main(String[] args) throws ExecutionException, InterruptedException, UnsupportedEncodingException {
//利用回调机制,先发起网络请求,当网络返回后回调相关方法
HttpPost httpPost = new HttpPost("http://httpbin.org/post");
httpPost.setEntity(new StringEntity("{'key':'value'}"));
CompletableFuture<String> future = getHttpData(httpPost);
String result = future.get();
System.out.println("GET RESULT: " result);
}
private static CompletableFuture<String> getHttpData(HttpPost httpPost) {
CompletableFuture asyncFuture = new CompletableFuture<>();
HttpAsyncRequestProducer producer = HttpAsyncMethods.create(httpPost);
BasicAsyncResponseConsumer consumer = new BasicAsyncResponseConsumer();
FutureCallback callback = new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse o) {
try {
asyncFuture.complete(EntityUtils.toString(o.getEntity()));
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Exception e) {
asyncFuture.completeExceptionally(e);
}
@Override
public void cancelled() {
asyncFuture.cancel(true);
}
};
httpAsyncClient.execute(producer, consumer, callback);
return asyncFuture;
}
}
/*
GET RESULT: {
"args": {},
"data": "{'key':'value'}",
"files": {},
"form": {},
"headers": {
"Content-Length": "15",
"Content-Type": "text/plain; charset=ISO-8859-1",
"Host": "httpbin.org",
"User-Agent": "Apache-HttpAsyncClient/4.1.2 (Java/1.8.0_181)",
"X-Amzn-Trace-Id": "Root=1-5f01b05d-a0dfd33a333cdd3cc79e66fa"
},
"json": null,
"origin": "115.205.151.52",
"url": "http://httpbin.org/post"
}
*/
HttpAsyncClient
使用基于NIO的异步I/O
模型实现,它采用Reactor
模式,摒弃了阻塞I/O
模型。采用线程池来分发请求的响应回调事件通知,从而有效的支撑大量并发连接,这种方式虽然不能提升性能,但是却可以支持大量的并发连接或提升吞吐量(因为响应的并发回调,而不再是单个线程占用一个连接),这种异步实现可以配合CompletableFuture
实现半异步。
异步编排 CompletableFuture
场景1:多个服务异步并发调用,然后处理结果合并返回,(不)阻塞主线程
代码语言:javascript复制public class CompletableFutureTest {
final static ExecutorService executorService = Executors.newFixedThreadPool(2);
public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
CompletableFuture<String> t1 = doTask1();
CompletableFuture<String> t2 = doTask2();
CompletableFuture<String> t3 = doTask3();
CompletableFuture
.allOf(t1, t2, t3)
//内部使用ForkJoinPool实现
.thenApplyAsync((Void) -> {
String s1 = null;
try {
s1 = t1.get();
String s2 = t2.get();
String s3 = t3.get();
System.out.println(s1 s2 s3);
System.out.println("总耗时 AT 1: " (System.currentTimeMillis() -start));
return s1 s2 s3;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return "";
}).exceptionally(e -> {
System.out.println("捕获到异常!");
return "ERROR";
});
System.out.println("总耗时 AT 2: " (System.currentTimeMillis() -start));
Thread.currentThread().join();
}
private static CompletableFuture<String> doTask1() {
CompletableFuture<String> result = new CompletableFuture<>();
executorService.execute(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("T1 use 3s");
result.complete("T1 use 3s ");
});
return result;
}
private static CompletableFuture<String> doTask2() {
CompletableFuture<String> result = new CompletableFuture<>();
executorService.execute(() -> {
try {
TimeUnit.SECONDS.sleep(7);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("T2 use 7s");
result.complete("T2 use 7s ");
});
return result;
}
private static CompletableFuture<String> doTask3() {
CompletableFuture<String> result = new CompletableFuture<>();
executorService.execute(() -> {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("T3 use 10s");
result.complete("T3 use 10s ");
});
return result;
}
}
/*
总耗时 AT 2:4
T1 use 3s
T1 use 7s
T1 use 10s
T1 use 3s T1 use 7s T1 use 10s
总耗时 AT 1:13009
*/
增加线程池大小
final static ExecutorService executorService = Executors.newFixedThreadPool(3);
总耗时 AT 2:3
T1 use 3s
T2 use 7s
T3 use 10s
T1 use 3s T2 use 7s T3 use 10s
总耗时 AT 1:10005
分析两次打印的结果可以得出的结论:
CompletableFuture.allOf(t1, t2, t3).thenApplyAsync
方法不会阻塞主线程- 线程池的数目会影响多个任务同时执行的总耗时
- 任务执行的最小时间大于等于耗时最大的任务的耗时时间。
代码语言:javascript复制同步阻塞等待结果
public class CompletableFutureTest2 {
final static ExecutorService executorService = Executors.newFixedThreadPool(3);
public static void main(String[] args) throws InterruptedException, ExecutionException {
long start = System.currentTimeMillis();
CompletableFuture<String> t1 = doTask1();
CompletableFuture<String> t2 = doTask2();
CompletableFuture<String> t3 = doTask3();
CompletableFuture<List<?>> result = CompletableFuture
.allOf(t1, t2, t3)
.thenApply((Void) -> {
try {
String s1 = t1.get();
String s2 = t2.get();
String s3 = t3.get();
System.out.println(s1 s2 s3);
System.out.println("总耗时 AT 1: " (System.currentTimeMillis() -start));
return Arrays.asList(s1 ,s2 ,s3);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return Collections.emptyList();
}).exceptionally(e -> {
System.out.println("捕获到异常!");
return Collections.emptyList();
});
System.out.println(StringUtils.join(result.get().toArray(new String[2]), ","));
System.out.println("总耗时 AT 2: " (System.currentTimeMillis() -start));
Thread.currentThread().join();
}
private static CompletableFuture<String> doTask1() {
CompletableFuture<String> result = new CompletableFuture<>();
executorService.execute(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("T1 use 3s");
result.complete("T1 use 3s ");
});
return result;
}
private static CompletableFuture<String> doTask2() {
CompletableFuture<String> result = new CompletableFuture<>();
executorService.execute(() -> {
try {
TimeUnit.SECONDS.sleep(7);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("T1 use 7s");
result.complete("T1 use 7s ");
});
return result;
}
private static CompletableFuture<String> doTask3() {
CompletableFuture<String> result = new CompletableFuture<>();
executorService.execute(() -> {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("T3 use 10s");
result.complete("T3 use 10s ");
});
return result;
}
}
/*
T1 use 3s
T2 use 7s
T3 use 10s
T1 use 3s T2 use 7s T3 use 10s
总耗时 AT 1:10005
T1 use 3s ,T2 use 7s ,T3 use 10s
总耗时 AT 2:10011
*/
场景2:多个服务并发调用,消费结果,不阻塞主线程
代码语言:javascript复制public class CompletableFutureTest3 {
final static ExecutorService executorService = Executors.newFixedThreadPool(3);
public static void main(String[] args) throws InterruptedException, ExecutionException {
long start = System.currentTimeMillis();
CompletableFuture<String> t1 = doTask1();
CompletableFuture<String> t2 = doTask2();
CompletableFuture<String> t3 = doTask3();
//异步调用t2,同时处理t1、t2结果
t1.thenAcceptBothAsync(t2, (t1Result, t2Result) -> {
System.out.println("t1Result: " t1Result);
System.out.println("t2Result: " t2Result);
System.out.println("总耗时 AT 3: " (System.currentTimeMillis() -start));
}).exceptionally(e -> {
System.out.println("捕获到异常!");
return null;
});
//异步调用t3,同时处理t2、t3结果
t2.thenAcceptBothAsync(t3,(t2Result,t3Result)->{
System.out.println("t2Result: " t2Result);
System.out.println("t3Result: " t3Result);
System.out.println("总耗时 AT 2: " (System.currentTimeMillis() -start));
}).exceptionally(e -> {
System.out.println("捕获到异常!");
return null;
});
System.out.println("总耗时 AT 1: " (System.currentTimeMillis() -start));
Thread.currentThread().join();
}
private static CompletableFuture<String> doTask1() {
CompletableFuture<String> result = new CompletableFuture<>();
executorService.execute(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("T1 use 3s");
result.complete("T1 use 3s ");
});
return result;
}
private static CompletableFuture<String> doTask2() {
CompletableFuture<String> result = new CompletableFuture<>();
executorService.execute(() -> {
try {
TimeUnit.SECONDS.sleep(7);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("T2 use 7s");
result.complete("T2 use 7s ");
});
return result;
}
private static CompletableFuture<String> doTask3() {
CompletableFuture<String> result = new CompletableFuture<>();
executorService.execute(() -> {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("T3 use 10s");
result.complete("T3 use 10s ");
});
return result;
}
}
/*
总耗时 AT 1:4
T1 use 3s
T2 use 7s
t1Result: T1 use 3s
t2Result: T2 use 7s
总耗时 AT 3:7005
T3 use 10s
t2Result: T2 use 7s
t3Result: T3 use 10s
总耗时 AT 2:10004
*/
场景3:T1执行完,并发执行T2和T3,然后消费结果,不阻塞主线程
代码语言:javascript复制v) -> {
try {
TimeUnit.SECONDS.sleep(7);
} catch (InterruptedException e) {
e.printStackTrace();
}
String result = "T2 use 7s";
System.out.println(result);
return result;
});
CompletableFuture<String> t3 = doTask3();
t2.thenCombineAsync(t3, (t2Result, t3Result) -> {
System.out.println("t2Result: " t2Result);
System.out.println("t3Result: " t3Result);
System.out.println("总耗时 AT 2: " (System.currentTimeMillis() -start));
return null;
});
System.out.println("总耗时 AT 1: " (System.currentTimeMillis() -start));
Thread.currentThread().join();
}
private static CompletableFuture<String> doTask1() {
//注意 T1不再是线程池中异步执行
CompletableFuture<String> result = new CompletableFuture<>();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("T1 use 3s");
result.complete("T1 use 3s ");
return result;
}
private static CompletableFuture<String> doTask2() {
CompletableFuture<String> result = new CompletableFuture<>();
executorService.execute(() -> {
try {
TimeUnit.SECONDS.sleep(7);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("T2 use 7s");
result.complete("T2 use 7s ");
});
return result;
}
private static CompletableFuture<String> doTask3() {
CompletableFuture<String> result = new CompletableFuture<>();
executorService.execute(() -> {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("T3 use 10s");
result.complete("T3 use 10s ");
});
return result;
}
}
/*
T1 use 3s
总耗时 AT 1:3063
T2 use 7s
T3 use 10s
t2Result: T2 use 7s
t3Result: T3 use 10s
总耗时 AT 2:13064
*/
小结
- Java 中线程有6种状态,操作系统层面只有5种。
- 创建一个线程的方式有3种,
Thread
、Runnable
、Callable
。 - 需要返回值时可以使用
Callable Future
。 - 异步编排
CompletableFuture
可以提升服务的响应速度。 - 并发编程是为了充分利用硬件和软件资源。