线程的基本操作及原理

2023-01-09 15:59:37 浏览数 (2)

线程的基本操作及原理

Thread.join的使用及原理

经典案例

代码语言:javascript复制
public class Main {  
    static int x = 0;  
    static int i = 0;  
      
    public static void main(String[] args) throws InterruptedException {  
        Thread t1 = new Thread(() -> {  
            i = 1;  
            x = 2;  
        });  
        Thread t2 = new Thread(() -> i = x   2);
        t1.start();
        t2.start();
        Thread.sleep(1000);
        System.out.println("result: "   i);
    }  
}

result: 4 或 result: 1 ,该结果产生的原因是因为线程乱序执行导致的,解决方法:

代码语言:javascript复制
t1.start();  
t1.join(); // t1线程的执行结果对t2线程可见 t1先执行阻塞主线程
t2.start();

源码分析

分析 java.base.lang.Thread 源码:

代码语言:javascript复制
public final synchronized void join(final long millis)  
throws InterruptedException {  
    if (millis > 0) {  
        if (isAlive()) {  
            final long startTime = System.nanoTime();  
            long delay = millis;  
            do {  
                wait(delay);  
            } while (isAlive() && (delay = millis -  
                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)) > 0);  
        }  
    } else if (millis == 0) {  
        while (isAlive()) {
	        // wait(0) 实现了线程的阻塞
            wait(0);  
        }  
    } else {  
        throw new IllegalArgumentException("timeout value is negative");  
    }  
}

分析 jdk12 hotspot 中 src/hotspot/share/runtime/thread.cpp 源码:

代码语言:javascript复制
void JavaThread::exit(bool destroy_vm, ExitType exit_type) {
	assert(this == JavaThread::current(), "thread consistency check");
	......
	// 第 1954 行
	ensure_join(this); // 加入并阻塞主线程
	assert(!this->has_pending_exception(), "ensure_join should have cleared");
	......
}

static void ensure_join(JavaThread* thread) {
	// We do not need to grab the Threads_lock, since we are operating on ourself.
	Handle threadObj(thread, thread->threadObj());
	assert(threadObj.not_null(), "java thread object must exist");
	ObjectLocker lock(threadObj, thread);
	// Ignore pending exception (ThreadDeath), since we are exiting anyway
	thread->clear_pending_exception();
	// Thread is exiting. So set thread_status field in  java.lang.Thread class to TERMINATED.
	java_lang_Thread::set_thread_status(threadObj(), java_lang_Thread::TERMINATED);
	// Clear the native thread instance - this makes isAlive return false and allows the join()
	// to complete once we've done the notify_all below
	java_lang_Thread::set_thread(threadObj(), NULL);
	// 唤醒处于阻塞状态下的主线程
	lock.notify_all(thread);
	// Ignore pending exception (ThreadDeath), since we are exiting anyway
	thread->clear_pending_exception();
}

Thread.sleep的作用

使线程暂停执行一段时间,知道等待的时间结束才恢复执行或在这段时间内被中断执行。

代码语言:javascript复制
public class Main extends Thread {  
    public static void main(String[] args) throws InterruptedException {  
        new Main().start();  
	}  
	  
    @Override  
    public void run() {  
        System.out.println("begin: "   System.currentTimeMillis());  
        try {  
            Thread.sleep(3000); // 线程睡眠3秒  
            System.out.println("end: "   System.currentTimeMillis());  
        } catch (InterruptedException e) {  
            throw new RuntimeException(e);  
        }  
    }  
}

Thread.sleep的工作流程

  • 挂起线程并修改其运行状态
  • 用 sleep() 提供的参数来设置一个定时器
  • 当时间结束,定时器会触发,内核收到中断后修改线程的运行状态。例如线程会被标记为就绪而进入就绪队列等待调度

线程的调度算法

在操作系统中,CPU 竞争有很多种策略。Unix 系统使用的是时间片算法,而 Windows 则属于抢占式的。

问题思考

  • 假设现在是 2019-11-18 12:00:00.000 如果调用 Thread.sleep(1000) 在 2019-11-18 12:00:01.000 的时候这个线程会不会被唤醒?

该方法的本意是在未来的 1 秒内该线程不想去参与 CPU 的任何竞争,那么 1000 毫秒后如果还有线程在占用占用 CPU 操作系统是不会重分配线程的,直到那个线程结束或挂起。如果恰巧轮到操作系统分配 CPU,当前线程也不一定是优先级最高的那一个。

代码语言:javascript复制
begin: 1672976279782
end: 1672976282792
  • Thread.sleep(0) 的意义是什么?

类似于 Thread.yield() 和第一个问题一样,操作系统会接受该 0 毫秒的请求,并重新计算所有线程的优先级再重新分配 CPU 资源。所以 Thread.sleep(0) 的意义是立刻触发操作系统重新进行一次 CPU 的竞争,那么竞争的结果可能是当前线程仍然拥有当前 CPU 的控制权,或者是其他线程拿到优先控制权。

wait和notify的使用

如何实现:一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行响应操作(线程可见)

代码语言:javascript复制
public class Main {  
    public static void main(String[] args) {  
        Queue<String> queue = new LinkedList<>();  
        int size = 10;  
        Producer producer = new Producer(queue, size);  
        Consumer consumer = new Consumer(queue, size);  
		
        Thread t1 = new Thread(producer);  
        Thread t2 = new Thread(consumer);  
        t1.start();  
        t2.start();  
    }  
}  
  
class Producer implements Runnable {  
    private Queue<String> bags;  
    private int size;  
	  
    public Producer(Queue<String> bags, int size) {  
        this.bags = bags;  
        this.size = size;  
    }  
	  
    @Override  
    public void run() {  
        int i = 0;  
        while (true) {  
            i  ;  
            synchronized (bags) {  
                while (bags.size() == size) {  
                    System.out.println("bags已经满了");  
                    // 满了以后 阻塞  
                    try {  
                        bags.wait();  
                    } catch (InterruptedException e) {  
                        throw new RuntimeException(e);  
                    }  
                }  
                try {  
                    Thread.sleep(1000);  
                } catch (InterruptedException e) {  
                    throw new RuntimeException(e);  
                }  
                System.out.println("生产者 生产 bag : "   i);  
                bags.add("bag"   i);  
                // 生产完成 唤醒处于阻塞状态下的消费者  
                bags.notifyAll();  
            }  
        }  
    }  
}  
  
class Consumer implements Runnable {  
    private Queue<String> bags;  
    private int size;  
	  
    public Consumer(Queue<String> bags, int size) {  
        this.bags = bags;  
        this.size = size;  
    }  
	  
    @Override  
    public void run() {  
        while (true) {  
            synchronized (bags) {  
                while (bags.isEmpty()) {  
                    System.out.println("bags为空");  
                    // 阻塞  
                    try {  
                        bags.wait();  
                    } catch (InterruptedException e) {  
                        throw new RuntimeException(e);  
                    }  
                }  
                // 每秒消费一次  
                try {  
                    Thread.sleep(1000);  
                } catch (InterruptedException e) {  
                    throw new RuntimeException(e);  
                }  
                String bag = bags.remove();  
                System.out.println("消费者 消费 bag : "   bag);  
                // 肯定不会满 唤醒处于阻塞状态下的生产者  
                bags.notifyAll();  
            }  
        }  
    }  
}

运行结果:

代码语言:javascript复制
生产者 生产 bag : 1
生产者 生产 bag : 2
生产者 生产 bag : 3
生产者 生产 bag : 4
消费者 消费 bag : bag1
消费者 消费 bag : bag2
生产者 生产 bag : 5
消费者 消费 bag : bag3
消费者 消费 bag : bag4
消费者 消费 bag : bag5
bags为空
生产者 生产 bag : 6

Process finished with exit code 130
  • 为什么 wait() 和 notify() / notifyAll() 需要使用 synchronized () {} 同步锁来修饰?
  1. notify() 实质上是一种条件竞争,是基于 bags 的一个条件竞争是条件互斥的,所以每次都需要使用 synchronized(bags) 来获得 bags 的锁进行竞争。
  2. wait() 和 notify() 是用于实现多个线程之间的通信,而通信必然会存在一个通信的载体。wait 和 notify 就是基于 synchronized 来实现通信的,也就是两者必须要在同一个频道也就是同一个锁的范围内。

Thread.interrupted和Thread.interrupt

如何正确终止一个线程

为什么不使用stop()

代码语言:javascript复制
@Deprecated(since="1.2")  
public final void stop() {  
    @SuppressWarnings("removal")  
    SecurityManager security = System.getSecurityManager();  
    if (security != null) {  
        checkAccess();  
        if (this != Thread.currentThread()) {  
            security.checkPermission(SecurityConstants.STOP_THREAD_PERMISSION);  
        }  
    }  
    // A zero status value corresponds to "NEW", it can't change to  
    // not-NEW because we hold the lock.    if (threadStatus != 0) {  
        resume(); // Wake up thread if it was suspended; no-op otherwise  
    }  
	  
    // The VM can handle all thread states  
    stop0(new ThreadDeath());  
}

private native void stop0(Object o);

stop() 是调用了 stop0() 方法进行暂停线程的,stop0() 是一个 native 底层 JVM 方法,也就是从系统层面去停止一个线程。这种停止线程的方法是有损害的,如果当前线程未执行完就 stop() 了,会导致结果的不准确性。

使用共享变量终止

代码语言:javascript复制
public class Main {  
    static volatile boolean stop = false;  
	  
    public static void main(String[] args) throws InterruptedException {  
        Thread t1 = new Thread(new StopThread());  
        t1.start();  
        TimeUnit.SECONDS.sleep(2);  
        stop = true;  
    }  
	  
    static class StopThread implements Runnable {  
        @Override  
        public void run() {  
            while (!stop) {  
                System.out.println("持续运行中");  
                try {  
                    Thread.sleep(1000);  
                } catch (InterruptedException e) {  
                    throw new RuntimeException(e);  
                }  
            }  
        }  
    }  
}

通过共享变量来终止线程的好处是将终止的全权限交给了线程自己,该 while 循环会在结束后暂停,这样就不会导致结果不准确性的发生。

interrupt()方法

当其他线程通过调用当前线程的 interrupt() 方法,表示向当前线程打个招呼,告诉他可以中断线程的执行了,至于什么时候中断,取决于当前线程自己,是一个非常友好的中断线程方式。

代码语言:javascript复制
public class Main {  
    static int i;  
	// interrupt 是从 jvm 层面执行的
    public static void main(String[] args) throws InterruptedException {  
        Thread thread = new Thread(() -> {  
            // 需要判断中断标志  
            while (Thread.currentThread().isInterrupted()) {  
                i  ;  
            }  
        });  
        thread.start();  
        TimeUnit.SECONDS.sleep(3);
        // interrupt 这个属性由 false 变为 true
        thread.interrupt(); // 中断(友好的)  
    }  
}

Thread.currentThread().isInterrupt() 默认是 false。其中可以使用 interrupt 的情况为:

  1. while 循环是一种使用中断条件的情况
  2. 线程处于阻塞状态情况下通过 try catch 抛出 InterruptedException 异常来中断
    1. Thread.sleep(10000) 午休被突然叫醒干活
    2. wait() 线程等待阻塞方法是中断条件
    3. thread.join() 阻塞方法是中断条件

Thread.interrupted()

Thread.interrupted() 对设置中断标志的线程进行复位,并返回当前的中断状态。

代码语言:javascript复制
public class Main {  
    public static void main(String[] args) throws InterruptedException {  
        Thread thread = new Thread(() -> {
            while (true) {
                // true 表示被中断过
                if (Thread.currentThread().isInterrupted()) {
                    System.out.println("before: "   Thread.currentThread().isInterrupted()); // true
                    Thread.interrupted(); // 复位
                    System.out.println("after: "   Thread.currentThread().isInterrupted()); // false
                }  
            }  
        });
        thread.start();
        TimeUnit.SECONDS.sleep(1);
        thread.interrupt();
    }  
}

0 人点赞