Virtual Thread 发布于

2023-10-21 11:42:46 浏览数 (2)

Abstract

In traditional Java high-concurrency projects, context switching of threads brings huge memory overhead. The emergence of thread pools solved these problems to some extent, but Java threads still could not escape the control of the OS. Thus, virtual threads came into being.

We all know that blocking a platform thread is expensive. And this is why so many coders try to avoid that at all cost, and why you end up writing reactive code, based on callbacks. Reactive code works great, but it makes your code hard to read and hard to test, this is clearly not an ideal situation.

Now, with the JDK 19 and virtual threads, blocking a thread becomes so cheap, that coders do not need to write reactive code anymore. Our good old blocking synchronous code is just fine. Today we are going to learn more about how all this is working, and why we should not fear about blocking a thread anymore.

How to create a virtual thread?

Super simple in fact, we can use Thread.ofVirtual() factory method to create a virtual thread. And from then on, we can create either a started thread, directly, or an unstarted thread.

代码语言:javascript复制
static void VirtualThreadCase() throws InterruptedException {
	// Thread -> Thread-0
	var pthread =
			Thread.ofPlatform()
					.unstarted(() -> System.out.println(Thread.currentThread()));
	pthread.start();
	pthread.join();
	
	/*
	 * virtual threads are running on top of platform threads
	 */
	// VirtualThread -> runnable@ForkJoinPool-1-worker-1
	var vthread =
			Thread.ofVirtual()
					.unstarted(() -> System.out.println(Thread.currentThread()));
	vthread.start();
	vthread.join();
}

Then, we can call start() to run this virtual thread, because it is an unstarted thread. And if you want to see a result in the console you'd better call join() for instance, to make sure that the main thread will not die, and with it the application dies, before this task had a chance to run.

Using a fork/join pool to run virtual thread

Virtual thread is based on ForkJoinPool, thry are part of the fork/join framework, added to the JDK 7 in 2011. In fact, the parallel Stream from the JDK 8 are working on top of this framework.

A fork join pool is basically a pool of threads. We already have pools of threads in the JDK called the executors, or the executor services, so why we need another one?

Well, fork/join pools do not work the same as executor services. An executor service has one single wait list to store their tasks, in which threads are going to take them. On the other hand, a fork/join pool has one wait list per thread. When a task, ran by a thread, generates another task, this task is added to the wait list of this thread.

This is what happens when a big task forks two smaller tasks when you are running a parallel stream. To prevent the so called "thread starvation" problem, that happens when a thread has no more tasks in its wait list, the fork/join pool also implements another pattern, called "Work Stealing".

代码语言:javascript复制
static void ForkJoinPoolCase() throws InterruptedException {
	// Thread -> ForkJoinPool.commonPool-worker-1
	var task =
			ForkJoinPool.commonPool()
					.submit(() -> System.out.println(Thread.currentThread()));
	task.join();
}

Making a virtual thread jump from one platform thread to another

You may think there is no performance in virtual thread processing. Now, let's make something wonderful.

Here we create 10 virtual, unstarted threads, with this stream pattern. And the task that these threads are running is just: print the current thread, then we put then to sleep for 10 milliseconds, and then print the name of the thread again.

代码语言:javascript复制
static void VirtualCalcCase() throws InterruptedException {  
    var threads = IntStream.range(0, 10)  
            .mapToObj(  
                    index -> Thread.ofVirtual().unstarted(() -> {  
                        if (index == 0) {  
                            System.out.println(Thread.currentThread());  
                        }  
                        try {  
                            Thread.sleep(10);  
                        } catch (InterruptedException e) {  
                            throw new RuntimeException(e);  
                        }  
                        if (index == 0) {  
                            System.out.println(Thread.currentThread());  
                        }  
                    })  
            )  
            .toList();  
    // make sure that everything can be seen in console  
    threads.forEach(Thread::start);  
    for (Thread thread : threads) {  
        thread.join();  
    }  
}

This case may print in console like below:

The same virtual thread is running first on top of the thread 1 of this fork/join pool, and then, when it comes back from sleeping, it is running on the top of the thread 8 instead of thread 1.

What happened in this virtual thread case?

So in fact, it could jump from one platform thread to the other. How can a virtual thread jump from one thread to the other? Well, at the core of this, there is an object called a Continuation. Let's see what is taking place in this sleep() method.

代码语言:javascript复制
if (currentThread() instanceof VirtualThread vthread) {  
    long nanos = MILLISECONDS.toNanos(millis);  
    vthread.sleepNanos(nanos);  
    return;  
}

We can see that there is a special processing if this thread is virtual or if it is not virtual. If the current thread is not virtual it will go inside the else part below:

代码语言:javascript复制
if (ThreadSleepEvent.isTurnedOn()) {  
    ThreadSleepEvent event = new ThreadSleepEvent();  
    try {  
        event.time = MILLISECONDS.toNanos(millis);  
        event.begin();  
        sleep0(millis);  
    } finally {  
        event.commit();  
    }  
} else {  
    sleep0(millis);  
}
Sleep Nanos

Then it will call a native sleep0() method like a normal thread calls sleep() method. But if the current thread is in fact a virtual thread, then the method sleepNanos() is going to be called in VirtualThread class. Let's check it out:

代码语言:javascript复制
void sleepNanos(long nanos) throws InterruptedException {
	assert Thread.currentThread() == this;
	if (nanos >= 0) {
		if (ThreadSleepEvent.isTurnedOn()) {
			ThreadSleepEvent event = new ThreadSleepEvent();
			try {
				event.time = nanos;
				event.begin();
				doSleepNanos(nanos);
			} finally {
				event.commit();
			}
		} else {
			doSleepNanos(nanos);
		}
	}
}

Inside this method, we can see that this method has a call to doSleepNanos(), so let's visit it:

代码语言:javascript复制
private void doSleepNanos(long nanos) throws InterruptedException {
	assert nanos >= 0;
	if (getAndClearInterrupt())
		throw new InterruptedException();
	if (nanos == 0) {
		tryYield();
	} else {
		// park for the sleep time
		try {
			long remainingNanos = nanos;
			long startNanos = System.nanoTime();
			while (remainingNanos > 0) {
				parkNanos(remainingNanos);
				if (getAndClearInterrupt()) {
					throw new InterruptedException();
				}
				remainingNanos = nanos - (System.nanoTime() - startNanos);
			}
		} finally {
			// may have been unparked while sleeping
			setParkPermit(true);
		}
	}
}
Yield

When nanos is 0, this method will have a call to tryYield() method, try to attempt to yield. Then this method is going to have call to yieldContinuation() method.

代码语言:javascript复制
void tryYield() {  
    assert Thread.currentThread() == this;  
    setState(YIELDING);  
    try {  
        yieldContinuation();  
    } finally {  
        assert Thread.currentThread() == this;  
        if (state() != RUNNING) {  
            assert state() == YIELDING;  
            setState(RUNNING);  
        }  
    }  
}

@ChangesCurrentThread  
private boolean yieldContinuation() {  
    boolean notifyJvmti = notifyJvmtiEvents;  
	
    // unmount  
    if (notifyJvmti) notifyJvmtiUnmountBegin(false);  
    unmount();  
    try {  
        return Continuation.yield(VTHREAD_SCOPE);
    } finally {  
        // re-mount  
        mount();  
        if (notifyJvmti) notifyJvmtiMountEnd(false);  
    }  
}

Now we can see Continuation Continuation.yield(VTHREAD_SCOPE) which we metioned at the beginning. This is the real reason why the virtual thread switches after calling the sleep() method.

Continuation

This continuation object is at the core of it. We can have a simple processing to show how this continuation object is working. To use Continuation class, we have to add an vm option to our project --add-exports java.base/jdk.internal.vm=ALL-UNNAMED.

代码语言:javascript复制
static void ContinuationExample() {  
    ContinuationScope scope = new ContinuationScope("scope");  
    Continuation continuation =  
            new Continuation(  
                    scope,  
                    () -> System.out.println("Running"));  
    System.out.println("Start");  
    continuation.run();  
    System.out.println("Done");  
}

A continuation works with an instance of ContinuationScope, so we created one here, and pass it to Continuation. The second argument is just a Runnable.

When we run this code with the proper options for the JVM, we can see that calling in fact the run() method on this continuation object just executes the Runnable this continuation is built on, which is not really exciting.

But now, let's call Continuation.yield() and pass this scope as an argument:

代码语言:javascript复制
Continuation continuation =  
        new Continuation(  
                scope,  
                () -> {  
                    System.out.println("Running");  
                    Continuation.yield(scope);    
					System.out.println("Still running");
                });

And now if we run this code again, we can see that this "Still running" message is not displayed on the console.

So, in fact, uielding a continuation suspends its execution. We can print something at the end of this method, and call run() again on this same continuation object.

代码语言:javascript复制
System.out.println("Start");  
continuation.run();  
System.out.println("Back");  
continuation.run();  
System.out.println("Done");

And now we can see something that is really unexpected. The execution of this Runnable continued after the call to yield().

Now we understand that, this yield() can suspend the execution of a task, that can be continued if you call run() again.

How is it working under the hood?

In fact, when a continuation yields its task, the corresponding stack is moved from the platform thread it is running on to the heap memory.

So now this platform thread is free to run another virtual thread. When this task gets the signal that it can continue running, then its stack is moved back from the heap to a platform thread, but not necessarily the same one.

So this is the price of blocking a virtual thread: moving the stack of this virtual thread to the main memory and back. Blocking a virtual thread is not free, nothing is ever free, we all know that, but it is much much cheaper than blocking a platform thread.

The nice thing is: all the blocking operations of the JDK have been refactored to leverage that. That includes the I/O operations, synchronization, and Thread.sleep().

How many platform threads can satisfy virtual threads?

How many platform threads do we need to run our virtual threads? We can have a test on it. We create virtual threads, and gather all the corresponding platform thread names. The code above is basically launching 5 virtual threads, and then it extracts the pool name and the platform thread name using some fancy regular expressions. And then, at the end of the day it just prints the different statics, the time it takes to run this code, the number of cores we have on our CPU, the number of thread pools and the number of platform threads.

代码语言:javascript复制
public class VirtualThreadCase {
    private static final Pattern POOL_PATTERN = Pattern.compile("ForkJoinPool");
    private static final Pattern WORKER_PATTERN = Pattern.compile("ForkJoinPool-\d -worker-\d ");
	
    public static void main(String[] args) throws InterruptedException {
        PerformanceTest();
    }
	
    static void PerformanceTest() throws InterruptedException {
        Set<String> poolNames = ConcurrentHashMap.newKeySet();
        Set<String> pThreadsName = ConcurrentHashMap.newKeySet();
		
        var threads = IntStream.range(0, 5)
                .mapToObj(i -> Thread.ofVirtual()
                        .unstarted(() -> {
                            String poolName = readPoolName();
                            poolNames.add(poolName);
                            String workerName = readWorkerName();
                            pThreadsName.add(workerName);
                        }))
                .toList();
        Instant begin = Instant.now();
        threads.forEach(Thread::start);
        for (Thread thread : threads) {
            thread.join();
        }
        Instant end = Instant.now();
        System.out.println("Time = "   Duration.between(begin, end).toMillis()   "ms");
        System.out.println("# Cores = "   Runtime.getRuntime().availableProcessors());
        System.out.println("# Pools = "   poolNames.size());
        System.out.println("# Platform threads = "   pThreadsName.size());
    }
	
    static String readWorkerName() {
        String name = Thread.currentThread().toString();
        Matcher workerMatcher = WORKER_PATTERN.matcher(name);
        if (workerMatcher.find()) {
            return workerMatcher.group();
        }
        return "not found";
    }
	
    static String readPoolName() {
        String name = Thread.currentThread().toString();
        Matcher poolMatcher = POOL_PATTERN.matcher(name);
        if (poolMatcher.find()) {
            return poolMatcher.group();
        }
        return "pool not found";
    }
}

For 5 threads, it uses about 3 or 4 platform threads.

Let's use 10 now, and run it again. 10 threads is still 3 or 4 platform threads.

Let's use 100. Now it's 10 platform threads.

Maybe we can also use 1000, 10_000 or 100_000. Still the same number of platform threads between 10 and 12.

It sounds like we have a pattern here. And by the way, even if these threads are not doing much, just some regular expressions and adding elements in a concurrent set, you can see that it takes only 35ms to run all these threads. Let's go up to 1_000_000 threads.

Less than one second, and still 12 platform threads. How about 10 milion threads?

It just takes a little less 12 seconds. This is just amazing. And this is what Java Loom is doing.

When a virtual is pinned to a platform thread?

There is one thing that can prevent a virtual thread stack to be moved to the heap memory though. We know that in the C language, we can get an address on the stack, by writing this ampersand character(&) in front of a variable.

And this is a promble here: because if you move this stack to somewhere else, and try to put it back on another platform thread, there is very little chance that this address will still be valid.

So if you have some C code in your virtual thread stack, or at least an address on this stack, this stack will be pinned to your platform thread, and it may block it. If your code is not blocking, or blocking for a short amount of time, then it's ok, the performance hit will be minimal. But if this code blocks for hundreds of milliseconds, then you may want to do something about this situation.

There are several places in the JDK where C code calls have already been replaced by Java code. This is the case for the Method class for instance, that has been refactored to use method handles as part of the JEP 416.

Comparing classical synchronization and reentrant lock

Let's see something about how things are working with synchronized blocks.

代码语言:javascript复制
public class VirtualThreadCase {
    static int counter = 0;
    static final Object lock = new Object();
	
    public static void main(String[] args) throws InterruptedException {
        Set<String> pThreadsName = ConcurrentHashMap.newKeySet();
        ChronoUnit delay = ChronoUnit.MICROS;
        var threads = IntStream.range(0, 100)
                .mapToObj(index -> Thread.ofVirtual()
                        .unstarted(() -> {
                            try {
                                if (index == 0) {
                                    System.out.println(Thread.currentThread());
                                }
                                pThreadsName.add(readWorkerName());
                                synchronized (lock) {
                                    Thread.sleep(Duration.of(1, delay));
                                    counter  ;
                                }
                                if (index == 0) {
                                    System.out.println(Thread.currentThread());
                                }
                                pThreadsName.add(readWorkerName());
                                synchronized (lock) {
                                    Thread.sleep(Duration.of(1, delay));
                                    counter  ;
                                }
                                if (index == 0) {
                                    System.out.println(Thread.currentThread());
                                }
                                pThreadsName.add(readWorkerName());
                                synchronized (lock) {
                                    Thread.sleep(Duration.of(1, delay));
                                    counter  ;
                                }
                                if (index == 0) {
                                    System.out.println(Thread.currentThread());
                                }
                                pThreadsName.add(readWorkerName());
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }))
                .toList();
        threads.forEach(Thread::start);
        for (Thread thread : threads) {
            thread.join();
        }
        synchronized (lock) {
            System.out.println("# counter = "   counter);
        }
        System.out.println("# Platform threads = "   pThreadsName.size());
    }
}

At the end, we display the value of the counter, which should be 300, and then display the number of platform threads that have been used.

Indeed the value of the counter is 300, the number of used platform threads is the same: 12. But what we can see is that, our virtual thread could not jump from one platform thread to the other. Why could this happened?

Because this synchronized stuff is in fact using an address on the stack, Loom knows that, and it pins this virtual thread to one platform thread.

Now, let's run the exact same code with ReentrantLock instead of synchronized blocks. We can see that now, the basic synchronization element of this code is based on the use of a ReentrantLock instead of the classical synchronized block.

代码语言:javascript复制
static void ReentrantBlock() throws InterruptedException {  
    Set<String> pThreadsName = ConcurrentHashMap.newKeySet();  
    ChronoUnit delay = ChronoUnit.MICROS;  
    var threads = IntStream.range(0, 100)  
            .mapToObj(index -> Thread.ofVirtual()  
                    .unstarted(() -> {  
                        try {  
                            if (index == 0) {  
                                System.out.println(Thread.currentThread());  
                            }  
                            pThreadsName.add(readWorkerName());  
                            reentrant.lock();  
                            try {  
                                Thread.sleep(Duration.of(1, delay));  
                                counter  ;  
                            } finally {  
                                reentrant.unlock();  
                            }  
                            if (index == 0) {  
                                System.out.println(Thread.currentThread());  
                            }  
                            pThreadsName.add(readWorkerName());  
                            reentrant.lock();  
                            try {  
                                Thread.sleep(Duration.of(1, delay));  
                                counter  ;  
                            } finally {  
                                reentrant.unlock();  
                            }  
                            if (index == 0) {  
                                System.out.println(Thread.currentThread());  
                            }  
                            pThreadsName.add(readWorkerName());  
                            reentrant.lock();  
                            try {  
                                Thread.sleep(Duration.of(1, delay));  
                                counter  ;  
                            } finally {  
                                reentrant.unlock();  
                            }  
                            if (index == 0) {  
                                System.out.println(Thread.currentThread());  
                            }  
                            pThreadsName.add(readWorkerName());  
                        } catch (InterruptedException e) {  
                            throw new RuntimeException(e);  
                        }  
                    }))  
            .toList();  
    threads.forEach(Thread::start);  
    for (Thread thread : threads) {  
        thread.join();  
    }  
    synchronized (lock) {  
        System.out.println("# counter = "   counter);  
    }  
    System.out.println("# Platform threads = "   pThreadsName.size());  
}

Now we can see that this virtual thread may jump from one platform thread to the other: 1, 7, 12.

The synchronized block itself is not really calling any C code, but it needs addresses on the stack to work, so the promblem is similar. If our synchronized code is not blocking, then it's ok. But if it is, and you experience performance issues, then you may try to use ReentrantLock instead, and see if it improves your situation.

What happen if too many platform threads are blocked? Well, the ForkJoinPool can detect that, and will create more platform threads, at least temporarily. And of course, it comes tieh a perfirmance hit.

Then what is coroutine?

The comparison between Java virtual threads and Go coroutines is a common one, given that both mechanisms aim to provide lightweight concurrency. However, there are some key differences and similarities between the two:

Similarities

  1. Lightweight: Both virtual threads and coroutines are designed to be lightweight compared to traditional threads. This means we can spawn thousands or even millions of them without exhausting system resources.
  2. Concurrency: Both mechanisms are designed to make concurrent programming easier and more efficient.
  3. Cooperative Scheduling: Both virtual threads and coroutines use cooperative scheduling. This means that they yield control voluntarily, rather than being preemptively scheduled by the OS.

Differences

  1. Implementation Details: The most significant difference is in how they're implemented. As we mentioned earlier, when a Java virtual thread yields, its stack is moved from the platform thread to the heap memory. This allows the platform thread to run another virtual thread. When the virtual thread is resumed, its stack is moved back to a platform thread. In contrast, coroutines in Go have their own stack that grows and shrinks as needed, but there isn't a concept of moving this stack to and from the heap in the same way.
  2. Runtime Environment: Coroutines run within the Go runtime, which has its own scheduler that multiplexes coroutines onto OS threads. Java virtual threads, on the other hand, are a feature of the JVM and are scheduled onto platform threads.
  3. Blocking: In Java, blocking a virtual thread has the cost of moving its stack to and from the heap. In Go, blocking a goroutine doesn't have this exact cost, but if a coroutine blocks (e.g., on I/O), the Go runtime can schedule another goroutine on the same OS thread.
  4. Maturity and Ecosystem: Virtual threads in Java were still an experimental feature (part of Project Loom). Go coroutines, on the other hand, have been a fundamental part of the language since its inception and have a mature ecosystem around them.

Outro

So we shouldn't simply compare Java virtual threads to Golang coroutines. While Java Virtual Threads and Go Coroutines both aim to address the challenges of concurrency and provide a more scalable and efficient way to handle many tasks simultaneously, their underlying implementations and behaviors are distinct.

Given these differences, while it's tempting to draw direct comparisons, it's more accurate to say that both mechanisms offer solutions to similar problems but do so in their unique ways. When choosing between them or comparing them, it's essential to consider the specific requirements of the task at hand and the nuances of each approach.

Reference

[1] Java. 2022. Launching 10 millions virtual threads with Loom - JEP Café #12. Retrieved Aug 20, 2023, from https://www.youtube.com/watch?v=UVoGE0GZZPI

[2] Ron Pressler, Alan Bateman. 2021. JEP 425: Virtual Threads (Preview). Retrieved Aug 20, 2023, from https://openjdk.org/jeps/425

[3] 猿java. 2023. Java终于发布了"协程"--虚拟线程,原来上手这么简单. Retrieved Aug 21, 2023, from https://zhuanlan.zhihu.com/p/579732019

0 人点赞