JEP 尝鲜系列 3 - 使用虚线程进行同步网络 IO 的不阻塞原理

2021-12-30 14:48:56 浏览数 (1)

相关 JEP:

  • JEP 353 Reimplement the Legacy Socket API
  • JEP 373 Reimplement the Legacy DatagramSocket API

使用虚线程进行网络 IO

Project Loom 主要目标是在 Java 平台上提供一种易于使用、高吞吐量的轻量级并发性和新的编程模型的 JVM 特性和API。这带来了许多有趣和令人兴奋的前景,其中之一是简化网络交互的代码的同时兼顾性能。现在的服务器能够处理打开的套接字连接的数量,远远超过它们能够支持的线程数量,这既带来了机遇,也带来了挑战。

但是不幸的是,编写与网络交互的可伸缩代码是很困难的。我们一般使用同步 API 的方式进行编码,但是在超过一定阈值之后,同步代码就迎来了瓶颈,很难进行伸缩。因为这样的API在执行 I/O 操作时会阻塞,而 I/O 操作又会将线程绑定起来,直到操作就绪,例如尝试从套接字读取数据但是当前并没有数据要读取的时候。目前的线程,在 Java 平台中是一个昂贵的资源,以至于无法等待 I/O 操作的完成再去释放。为了解决这个限制,我们通常使用异步 I/O 或 Ractor 框架,因为它们可以构造出在 I/O 操作中不用绑定线程的代码,而是在 I/O 操作完成或准备就绪时使用回调或事件通知线程进行处理。

使用异步和非阻塞 API 比使用同步 API 更具有挑战性,部分原因是用这些 API 写出来的代码是比较反人类的。同步API在很大程度上更容易使用;代码更易于编写、更容易阅读和更易于调试,调试的时候堆栈里面的信息大部分是有用的。但是如前所述,使用同步 API 的代码不能像异步代码那样伸缩扩展,因此我们必须做一个艰难的选择:选择更简单的同步代码,并接受它不会扩展;或者选择更可伸缩的异步代码,并处理所有的复杂性。两个都不是个好选择!Project Loom 主要就是要让同步代码也能灵活伸缩扩展。

在本文中,我们将查看 Java 平台的网络 API 在虚拟线程上被调用时是如何工作的。了解底层细节,我们才能更好地、更放心地使用虚拟线程(纤程)。

虚拟线程(纤程

在进一步研究之前,我们需要了解一下ProjectLoom中的新线程–虚拟线程(也可以称为纤程)。

虚拟线程是用户态线程,被 JVM 管理,而不是操作系统。虚拟线程占用的系统资源很少,一个 JVM 可以容纳百万量级的虚拟线程。特别适合于经常执行阻塞时间比较长,经常等待 IO 的任务。

平台线程(即目前 Java 平台的线程),是和操作系统内核线程一一对应的。平台线程通常拥有一个非常大的栈,以及其他的一些系统维护的资源。虚拟线程则使用一小组用作载体线程的平台线程。在虚拟线程中执行的代码通常不会知道底层承载的线程。锁和 I/O 操作是将承载线程从一个虚拟线程重新调度到另一个虚拟线程的调度点。虚拟线程可能会 parked(例如LockSupport.park()),从而使其无法调度。一个已 parked 的虚拟线程可能被取消(例如LockSupport.unpark(Thread)),这样重新启用了它的调度。

网络 API

Java 平台中主要有两种网络 API:

  1. 异步 - AsynchronousServerSocketChannelAsynchronousSocketChannel
  2. 同步 - java.net.Socketjava.net.ServerSocketjava.net.DatagramSocketjava.nio.channels.SocketChanneljava.nio.channels.ServerSocketChanneljava.nio.channels.DatagramChannel

第一类异步 API,创建启动在之后某个时间完成的 I/O 操作,可能在启动 I/O 操作的线程之外的线程上完成。根据定义,这些 API 不会导致阻塞的系统调用,因此在虚拟线程中运行时不需要特殊处理

第二类同步 API,从它们在虚拟线程中运行时的行为角度来看,它们更有趣。在这些 API 中,NIO channel 相关的可以配置成为非阻塞模式。这种 channel 通常使用 I/O 事件通知机制实现,例如注册到 Selector 上监听事件。类似于异步网络 API,在虚拟线程中执行不需要额外处理,因为 I/O 操作不自己调用阻塞的系统调用,这个调用留给了 Selector。最后,我们来看看将 channel 配置成为阻塞模式以及 java.net 相关 API 的情况(我们这里称这种 API 为同步阻塞 API)。同步 API 的语义要求 I/O 操作一旦启动,在调用线程中完成或失败,然后将控制权返回给调用方。但是,如果 I/O 操作“尚未准备好”怎么办呢?例如,目前没有数据可以读取。

同步阻塞 API

在虚拟线程中运行的 Java 同步网络 API 会将底层原生 Socket 切换到非阻塞模式。当 Java 代码启用一个 I/O 请求并且这个请求没有立即完成(原生 socket 返回 EAGAIN - 代表"未就绪"/“会阻塞”)的时候,这个底层 socket 会被注册到一个 JVM 内部事件通知机制(Poller),并且虚拟线程会被 parked。当底层 I/O 操作就绪的时候(有相关事件会到达 Poller),虚拟线程会 unparked 并且底层的 Socket 操作会被重试处理。

我们来用一个例子仔细看下这其中的原理,首先,我们需要下载 project loom 的 JDK(地址:http://jdk.java.net/loom/),并解压使用。

接下来编写代码:

代码语言:javascript复制
//Java 16 中的 Record 对象,可以理解为有包含两个 final 属性(url 和 response)的类
static record URLData (URL url, byte[] response) { }

static List<URLData> retrieveURLs(URL... urls) throws Exception {
  //创建虚拟线程线程池
  try (var executor = Executors.newVirtualThreadExecutor()) {
    //生成读取对每个 url 执行 getURL 方法的任务
    var tasks = Arrays.stream(urls)
            .map(url -> (Callable<URLData>)() -> getURL(url))
            .toList();
    //提交任务,等待并返回所有结果
    return executor.submit(tasks)
            .filter(Future::isCompletedNormally)
            .map(Future::join)
            .toList();
  }
}

//读取url的内容
static URLData getURL(URL url) throws IOException {
  try (InputStream in = url.openStream()) {
    return new URLData(url, in.readAllBytes());
  }
}

public static void main(String[] args) throws Exception {
    //访问 google,由于你懂得,会比较慢
    List<URLData> urlData = retrieveURLs(new URL("https://www.google.com/"));
}

我们使用 retrieveURLs 访问谷歌,肯定会很慢,来保证能采集到堆栈。同时,不能用 jstack 采集堆栈(目前 jstack 采集不到虚拟线程堆栈,只能采集到承载线程的堆栈),需要用 jcmd 命令中的 JavaThread.dump 采集。同时,为了能采集到我们想要的堆栈,我们需要一些小操作。

首先,我们在 getURL(URL url) 方法的第一行打断点,debug 到这里暂停。然后执行命令:

代码语言:javascript复制
> jps
25496 LoomThreadMain
12512 Jps

> jcmd 25496 JavaThread.dump threads.txt -overwrite

然后继续执行程序,再执行命令,采集虚拟线程执行 I/O 操作时候的堆栈:

代码语言:javascript复制
> jcmd 25496 JavaThread.dump threads2.txt -overwrite

我们查看threads.txt这个文件,其中我们关心的线程信息是:

代码语言:javascript复制
"main" #1
      java.base@17-loom/jdk.internal.misc.Unsafe.park(Native Method)
      java.base@17-loom/java.util.concurrent.locks.LockSupport.park(LockSupport.java:371)
      java.base@17-loom/java.util.concurrent.LinkedTransferQueue$Node.block(LinkedTransferQueue.java:470)
      java.base@17-loom/java.util.concurrent.ForkJoinPool.unmanagedBlock(ForkJoinPool.java:3470)
      java.base@17-loom/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3441)
      java.base@17-loom/java.util.concurrent.LinkedTransferQueue.awaitMatch(LinkedTransferQueue.java:669)
      java.base@17-loom/java.util.concurrent.LinkedTransferQueue.xfer(LinkedTransferQueue.java:616)
      java.base@17-loom/java.util.concurrent.LinkedTransferQueue.take(LinkedTransferQueue.java:1286)
      java.base@17-loom/java.util.concurrent.ExecutorServiceHelper$BlockingQueueSpliterator.tryAdvance(ExecutorServiceHelper.java:197)
      java.base@17-loom/java.util.Spliterator.forEachRemaining(Spliterator.java:326)
      java.base@17-loom/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
      java.base@17-loom/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
      java.base@17-loom/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:550)
      java.base@17-loom/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
      java.base@17-loom/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:616)
      java.base@17-loom/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:622)
      java.base@17-loom/java.util.stream.ReferencePipeline.toList(ReferencePipeline.java:627)
      app//com.github.hashjang.LoomThreadMain.retrieveURLs(LoomThreadMain.java:43)
      app//com.github.hashjang.LoomThreadMain.main(LoomThreadMain.java:29)

"ForkJoinPool-1-worker-1" #27
      java.base@17-loom/java.lang.Continuation.run(Continuation.java:300)
      java.base@17-loom/java.lang.VirtualThread.runContinuation(VirtualThread.java:240)
      java.base@17-loom/java.lang.VirtualThread$$Lambda$25/0x0000000801053fc0.run(Unknown Source)
      java.base@17-loom/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1395)
      java.base@17-loom/java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:373)
      java.base@17-loom/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java)
      java.base@17-loom/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1177)
      java.base@17-loom/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1648)
      java.base@17-loom/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1615)
      java.base@17-loom/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
      
"<unnamed>" #26 virtual
      java.base/java.util.concurrent.ConcurrentHashMap.transfer(ConcurrentHashMap.java:2431)
      java.base/java.util.concurrent.ConcurrentHashMap.addCount(ConcurrentHashMap.java:2354)
      java.base/java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1075)
      java.base/java.util.concurrent.ConcurrentHashMap.putIfAbsent(ConcurrentHashMap.java:1541)
      java.base/sun.util.locale.LocaleObjectCache.get(LocaleObjectCache.java:68)
      java.base/java.util.Locale.getInstance(Locale.java:841)
      java.base/java.util.Locale.forLanguageTag(Locale.java:1736)
      java.base/sun.util.locale.provider.LocaleProviderAdapter.toLocaleArray(LocaleProviderAdapter.java:323)
      java.base/sun.util.locale.provider.CalendarDataProviderImpl.getAvailableLocales(CalendarDataProviderImpl.java:63)
      java.base/java.util.spi.LocaleServiceProvider.isSupportedLocale(LocaleServiceProvider.java:217)
      java.base/sun.util.locale.provider.LocaleServiceProviderPool.findProviders(LocaleServiceProviderPool.java:306)
      java.base/sun.util.locale.provider.LocaleServiceProviderPool.getLocalizedObjectImpl(LocaleServiceProviderPool.java:274)
      java.base/sun.util.locale.provider.LocaleServiceProviderPool.getLocalizedObject(LocaleServiceProviderPool.java:256)
      java.base/sun.util.locale.provider.CalendarDataUtility.retrieveFirstDayOfWeek(CalendarDataUtility.java:76)
      java.base/java.util.Calendar.setWeekCountData(Calendar.java:3419)
      java.base/java.util.Calendar.<init>(Calendar.java:1612)
      java.base/java.util.GregorianCalendar.<init>(GregorianCalendar.java:738)
      java.base/java.util.Calendar$Builder.build(Calendar.java:1494)
      java.base/sun.util.locale.provider.CalendarProviderImpl.getInstance(CalendarProviderImpl.java:87)
      java.base/java.util.Calendar.createCalendar(Calendar.java:1697)
      java.base/java.util.Calendar.getInstance(Calendar.java:1661)
      java.base/java.text.SimpleDateFormat.initializeCalendar(SimpleDateFormat.java:680)
      java.base/java.text.SimpleDateFormat.<init>(SimpleDateFormat.java:624)
      java.base/java.text.SimpleDateFormat.<init>(SimpleDateFormat.java:603)
      java.base/sun.security.util.DisabledAlgorithmConstraints$DenyAfterConstraint.<clinit>(DisabledAlgorithmConstraints.java:695)
      java.base/sun.security.util.DisabledAlgorithmConstraints$Constraints.<init>(DisabledAlgorithmConstraints.java:424)
      java.base/sun.security.util.DisabledAlgorithmConstraints.<init>(DisabledAlgorithmConstraints.java:149)
      java.base/sun.security.ssl.SSLAlgorithmConstraints.<clinit>(SSLAlgorithmConstraints.java:49)
      java.base/sun.security.ssl.ProtocolVersion.<init>(ProtocolVersion.java:158)
      java.base/sun.security.ssl.ProtocolVersion.<clinit>(ProtocolVersion.java:41)
      java.base/sun.security.ssl.SSLContextImpl$AbstractTLSContext.<clinit>(SSLContextImpl.java:539)
      java.base/java.lang.Class.forName0(Native Method)
      java.base/java.lang.Class.forName(Class.java:375)
      java.base/java.security.Provider$Service.getImplClass(Provider.java:1937)
      java.base/java.security.Provider$Service.getDefaultConstructor(Provider.java:1968)
      java.base/java.security.Provider$Service.newInstanceOf(Provider.java:1882)
      java.base/java.security.Provider$Service.newInstanceUtil(Provider.java:1890)
      java.base/java.security.Provider$Service.newInstance(Provider.java:1865)
      java.base/sun.security.jca.GetInstance.getInstance(GetInstance.java:236)
      java.base/sun.security.jca.GetInstance.getInstance(GetInstance.java:164)
      java.base/javax.net.ssl.SSLContext.getInstance(SSLContext.java:184)
      java.base/javax.net.ssl.SSLContext.getDefault(SSLContext.java:110)
      java.base/javax.net.ssl.SSLSocketFactory.getDefault(SSLSocketFactory.java:83)
      java.base/javax.net.ssl.HttpsURLConnection.getDefaultSSLSocketFactory(HttpsURLConnection.java:334)
      java.base/javax.net.ssl.HttpsURLConnection.<init>(HttpsURLConnection.java:291)
      java.base/sun.net.www.protocol.https.HttpsURLConnectionImpl.<init>(HttpsURLConnectionImpl.java:81)
      java.base/sun.net.www.protocol.https.Handler.openConnection(Handler.java:62)
      java.base/sun.net.www.protocol.https.Handler.openConnection(Handler.java:57)
      java.base/java.net.URL.openConnection(URL.java:1093)
      java.base/java.net.URL.openStream(URL.java:1159)
      com.github.hashjang.LoomThreadMain.getURL(LoomThreadMain.java:48)
      com.github.hashjang.LoomThreadMain.lambda$retrieveURLs$0(LoomThreadMain.java:38)
      java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:295)
      java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
      java.base/java.util.concurrent.ThreadExecutor$TaskRunner.run(ThreadExecutor.java:385)
      java.base/java.lang.VirtualThread.run(VirtualThread.java:295)
      java.base/java.lang.VirtualThread$VThreadContinuation.lambda$new$0(VirtualThread.java:172)
      java.base/java.lang.Continuation.enter0(Continuation.java:372)
      java.base/java.lang.Continuation.enter(Continuation.java:365)

其中 "<unnamed>" #26 virtual 是我们程序中创建的虚拟线程,并且通过堆栈中可以看出,虚拟线程还没有处于 I/O 操作。通过线程堆栈也可以看出,这个虚拟线程的承载线程是 "ForkJoinPool-1-worker-1" #27. 可以看出虚拟线程默认的承载线程是 Java 8 之后默认会启动的 common ForkJoinPool 中的线程。并且是通过 Continuation 这个类执行虚拟线程的工作的。

查看threads2.txt

代码语言:javascript复制
"main" #1
      java.base@17-loom/jdk.internal.misc.Unsafe.park(Native Method)
      java.base@17-loom/java.util.concurrent.locks.LockSupport.park(LockSupport.java:371)
      java.base@17-loom/java.util.concurrent.LinkedTransferQueue$Node.block(LinkedTransferQueue.java:470)
      java.base@17-loom/java.util.concurrent.ForkJoinPool.unmanagedBlock(ForkJoinPool.java:3470)
      java.base@17-loom/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3441)
      java.base@17-loom/java.util.concurrent.LinkedTransferQueue.awaitMatch(LinkedTransferQueue.java:669)
      java.base@17-loom/java.util.concurrent.LinkedTransferQueue.xfer(LinkedTransferQueue.java:616)
      java.base@17-loom/java.util.concurrent.LinkedTransferQueue.take(LinkedTransferQueue.java:1286)
      java.base@17-loom/java.util.concurrent.ExecutorServiceHelper$BlockingQueueSpliterator.tryAdvance(ExecutorServiceHelper.java:197)
      java.base@17-loom/java.util.Spliterator.forEachRemaining(Spliterator.java:326)
      java.base@17-loom/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
      java.base@17-loom/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
      java.base@17-loom/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:550)
      java.base@17-loom/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
      java.base@17-loom/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:616)
      java.base@17-loom/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:622)
      java.base@17-loom/java.util.stream.ReferencePipeline.toList(ReferencePipeline.java:627)
      app//com.github.hashjang.LoomThreadMain.retrieveURLs(LoomThreadMain.java:43)
      app//com.github.hashjang.LoomThreadMain.main(LoomThreadMain.java:29)
      
"ForkJoinPool-1-worker-1" #25
      java.base@17-loom/jdk.internal.misc.Unsafe.park(Native Method)
      java.base@17-loom/java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:449)
      java.base@17-loom/java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1719)
      java.base@17-loom/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1616)
      java.base@17-loom/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)

"Read-Poller" #41
      java.base@17-loom/sun.nio.ch.WEPoll.wait(Native Method)
      java.base@17-loom/sun.nio.ch.WEPollPoller.poll(WEPollPoller.java:64)
      java.base@17-loom/sun.nio.ch.Poller.poll(Poller.java:196)
      java.base@17-loom/sun.nio.ch.Poller.lambda$startPollerThread$0(Poller.java:66)
      java.base@17-loom/sun.nio.ch.Poller$$Lambda$89/0x00000008010e5168.run(Unknown Source)
      java.base@17-loom/java.lang.Thread.run(Thread.java:1521)
      java.base@17-loom/jdk.internal.misc.InnocuousThread.run(InnocuousThread.java:161)
      
"<unnamed>" #24 virtual
      java.base/java.lang.Continuation.yield(Continuation.java:402)
      java.base/java.lang.VirtualThread.yieldContinuation(VirtualThread.java:367)
      java.base/java.lang.VirtualThread.park(VirtualThread.java:534)
      java.base/java.lang.System$2.parkVirtualThread(System.java:2373)
      java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:60)
      java.base/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:184)
      java.base/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:212)
      java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:607)
      java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:331)
      java.base/java.net.Socket.connect(Socket.java:642)
      java.base/sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:299)
      java.base/sun.security.ssl.BaseSSLSocketImpl.connect(BaseSSLSocketImpl.java:174)
      java.base/sun.net.NetworkClient.doConnect(NetworkClient.java:182)
      java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:497)
      java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:600)
      java.base/sun.net.www.protocol.https.HttpsClient.<init>(HttpsClient.java:266)
      java.base/sun.net.www.protocol.https.HttpsClient.New(HttpsClient.java:380)
      java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.getNewHttpClient(AbstractDelegateHttpsURLConnection.java:189)
      java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1232)
      java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1120)
      java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:175)
      java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1653)
      java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1577)
      java.base/sun.net.www.protocol.https.HttpsURLConnectionImpl.getInputStream(HttpsURLConnectionImpl.java:224)
      java.base/java.net.URL.openStream(URL.java:1159)
      com.github.hashjang.LoomThreadMain.getURL(LoomThreadMain.java:48)
      com.github.hashjang.LoomThreadMain.lambda$retrieveURLs$0(LoomThreadMain.java:38)
      java.base/java.util.concurrent.FutureTask.run(FutureTask.java:295)
      java.base/java.util.concurrent.ThreadExecutor$TaskRunner.run(ThreadExecutor.java:385)
      java.base/java.lang.VirtualThread.run(VirtualThread.java:295)
      java.base/java.lang.VirtualThread$VThreadContinuation.lambda$new$0(VirtualThread.java:172)
      java.base/java.lang.Continuation.enter0(Continuation.java:372)
      java.base/java.lang.Continuation.enter(Continuation.java:365)

通过这里的线程堆栈可以看出,虚拟线程在执行 I/O 操作的时候,会调用 java.lang.Continuation.yield 让出承载线程的资源(相当于 park 住了)。查看原来的承载线程 "ForkJoinPool-1-worker-1" #25,确实处于空闲状态了。那么 I/O 操作去哪里了呢?这就引出了这个线程 "Read-Poller" #41

这个线程是一个 JVM 共用的 read poller。它的核心逻辑是执行一个基本事件循环,监听所有的同步网络 read(网络读就绪),connect(发起网络连接就绪) 和 accept(接受网络连接就绪) 操作。当这些 I/O 操作就绪的时候,poller 会被通知,并且 unpark 对应的虚拟线程,使得虚拟线程继续执行。同时,除了 read poller,还有一个用于写事件的 write poller。

我是使用 Windows 进行测试的,在 Windows 中 poller 底层实现基于 wepoll,所以我们看到堆栈里面包含 WEPoll。对于 MacOS 则是 kqueue,对于 Linux 则是 epoll

poller 维护一个以虚拟线程的文件描述符为 key 的 map。当一个虚拟线程并将它的文件描述符注册到 poller 上的时候,会以虚拟线程的文件描述符为 key,虚拟线程本身为 value 放入这个 map。当 poller 的事件循环中的相关事件就绪的时候,通过事件中的虚拟线程文件描述符在 map 中找到对应的虚拟线程 unpark 之。

伸缩扩展性

如果简单来看,上面的设计与使用 NIO Channel 和 Selector 并没有太大的不同,NIO Channel 和 Selector 可以在许多服务器端框架和库中找到,例如 Netty。但是相对来说,NIO Channel 和 Selector 提供了一个更复杂的模型,用户代码必须实现事件循环并跨 I/O 边界维护应用程序逻辑,而虚拟线程则提供了一个更简单、更直观的编程模型,Java 平台负责跨 I/O 边界调度任务和维护对应的上下文。

如前文我们看到的那样,虚拟线程默认的承载线程就是 ForkJoinPool。这是一个非常适合虚拟线程的线程池,工作窃取算法能极致的调度运行虚拟线程。

结论

同步 Java 网络 API 已经被重新实现,相关的 JEP 包括 JEP 353 和 JEP 373. 在虚拟线程中运行时,不能立即完成的 I/O 操作将导致虚拟线程被 parked 。当 I/O 就绪时,虚拟线程将被 unparked。这个实现相对于当前的异步非阻塞 I/O 实现代码来看,更加简单易用,隐藏了很多业务不关心的实现细节。

0 人点赞