前言
本篇博文是《从0到1学习 Netty》中 NIO 系列的第五篇博文,主要内容是使用多线程对程序进行优化,充分利用 CPU 的能力,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;
引入
这前几篇文章中,都是采用单线程进行设计,虽然可以运行,但是没有充分利用 CPU 的性能,并且如果有一个事件的处理时间较长,则会影响其他事件的处理。
例如,开发一个项目,如果团队只有一个全栈工程师,那么他需要先完成前端,再完成后端,只能按部就班的完成任务,如果前端开发遭遇困难,花费了很多时间,则会大大拉长项目开发周期,而如果一个团队里有前端工程师和后端工程师,则前后端的开发能同步进行,这样会大大提高开发效率。
同理,对之前的代码进行优化,分两组选择器:
- 选择一个线程配置一个选择器,作为 ‘Boss’,专门处理
accept
事件; - 创建多个线程(最好与 CPU 核心数一直),作为 ‘Worker’,每个线程配置一个选择器,轮流处理
read
**,**write
等事件
实现
1、创建一个 Boss 线程,负责处理 accept
事件类型:
Thread.currentThread().setName("Boss");
Selector boss = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(7999));
while (true) {
boss.select();
Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
}
}
}
2、创建 Worker
类,用于初始化 Worker 线程和 Selector,负责处理 read
事件类型:
class Worker implements Runnable{
private Thread thread;
private volatile Selector worker;
private String name;
public Worker(String name) {
this.name = name;
}
public void register() throws IOException {
this.thread = new Thread(this, this.name);
this.worker = Selector.open();
this.thread.start();
}
@Override
public void run() {
while (true) {
try {
this.worker.select();
Iterator<SelectionKey> iter = this.worker.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
channel.read(buffer);
buffer.flip();
debugAll(buffer);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
但是这里会有个问题,每次进行 register()
的时候会新创建一个线程,但我们只想一个 Worker 对应一个线程,所以我们需要对上述代码进行优化,使用标志符来进行判断是否完成过初始化:
private volatile boolean start = false;
public void register() throws IOException {
if (!this.start) {
this.thread = new Thread(this, this.name);
this.selector = Selector.open();
this.thread.start();
this.start = true;
}
}
注意,this.worker = Selector.open();
与 this.thread.start();
不要写反了,不然之后运行会出现空指针异常:
Exception in thread "worker-0" java.lang.NullPointerException
at com.sidiot.netty.c3.MultiThreadServer$Worker.run(MultiThreadServer.java:75)
at java.base/java.lang.Thread.run(Thread.java:832)
3、将 Worker 进行关联,先创建一个 worker 线程:
代码语言:javascript复制Worker worker0 = new Worker("worker-0");
worker0.register();
while (true) {
...
while (iter.hasNext()) {
...
if (key.isAcceptable()) {
...
log.debug("connected... {}", sc.getRemoteAddress());
log.debug("before register {}", sc.getRemoteAddress());
sc.register(worker0.selector, SelectionKey.OP_READ, null);
log.debug("after register {}", sc.getRemoteAddress());
}
}
}
4、编写客户端:
代码语言:javascript复制public class MultiThreadClient {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 7999));
sc.write(Charset.defaultCharset().encode("Hello, World! --sidiot."));
System.in.read();
}
}
5、运行服务端和客户端,运行结果如下:
代码语言:javascript复制20:30:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - connected... /127.0.0.1:50612
20:30:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - before register /127.0.0.1:50612
20:30:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - after register /127.0.0.1:50612
发现 worker 并没有进行工作,或者说是客户端发送的数据并没有进入到 worker 的可读事件中,这是因为在 worker 的 run()
方法运行时,SocketChannel
还没有注册到 worker 的 selector 中,导致 worker 线程在 this.selector.select();
的位置发生了阻塞;
6、由于 sc.register
发生在 boss 线程中,而 select
发生在 worker 线程中,无法确定两个线程的执行顺序,因此需要把两步操作都放入一个线程中;
将 SocketChannel
传到到 Worker 的 register()
方法中:
public void register(SocketChannel sc) throws IOException {
if (!this.start) {
this.thread = new Thread(this, this.name);
this.selector = Selector.open();
this.thread.start();
this.start = true;
}
sc.register(this.selector, SelectionKey.OP_READ, null);
}
但这样还是不行的,因为 register()
方法还是在 boss 线程中执行,这就需要使用队列来完成线程间的通信了:
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
public void register(SocketChannel sc) throws IOException {
...
this.queue.add(() -> {
try {
sc.register(this.selector, SelectionKey.OP_READ, null);
} catch (ClosedChannelException e) {
throw new RuntimeException(e);
}
});
this.selector.wakeup();
}
@Override
public void run() {
while (true) {
try {
this.selector.select();
Runnable task = this.queue.poll();
if (task != null) {
task.run();
}
Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();
...
}
}
注意,这里需要 this.selector.wakeup();
来唤醒 selector 继续往下走;
还有另一种方法,参考代码点击这里;
7、将单线程 worker 转成多线程:
代码语言:javascript复制Worker[] workers = new Worker[4];
for (int i = 0; i < workers.length; i ) {
workers[i] = new Worker("worker-" i);
}
同时使用计数器来实现各个 worker 线程的轮询使用:
代码语言:javascript复制AtomicInteger index = new AtomicInteger();
while (true) {
...
while (iter.hasNext()) {
...
if (key.isAcceptable()) {
...
workers[index.getAndIncrement() % workers.length].register(sc);
}
}
}
运行结果:
代码语言:javascript复制22:36:13 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - connected... /127.0.0.1:54668
22:36:13 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - before register /127.0.0.1:54668
22:36:13 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - after register /127.0.0.1:54668
22:36:13 [DEBUG] [worker-0] c.s.n.c.MultiThreadServer - read... /127.0.0.1:54668
-------- -------------------- all ------------------------ ----------------
position: [0], limit: [7]
-------------------------------------------------
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
-------- ------------------------------------------------- ----------------
|00000000| 73 69 64 69 6f 74 2e 00 00 00 00 00 00 00 00 00 |sidiot..........|
-------- ------------------------------------------------- ----------------
22:36:20 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - connected... /127.0.0.1:54676
22:36:20 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - before register /127.0.0.1:54676
22:36:20 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - after register /127.0.0.1:54676
22:36:20 [DEBUG] [worker-1] c.s.n.c.MultiThreadServer - read... /127.0.0.1:54676
-------- -------------------- all ------------------------ ----------------
position: [0], limit: [7]
-------------------------------------------------
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
-------- ------------------------------------------------- ----------------
|00000000| 73 69 64 69 6f 74 2e 00 00 00 00 00 00 00 00 00 |sidiot..........|
-------- ------------------------------------------------- ----------------
22:36:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - connected... /127.0.0.1:54687
22:36:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - before register /127.0.0.1:54687
22:36:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - after register /127.0.0.1:54687
22:36:30 [DEBUG] [worker-0] c.s.n.c.MultiThreadServer - read... /127.0.0.1:54687
-------- -------------------- all ------------------------ ----------------
position: [0], limit: [7]
-------------------------------------------------
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
-------- ------------------------------------------------- ----------------
|00000000| 73 69 64 69 6f 74 2e 00 00 00 00 00 00 00 00 00 |sidiot..........|
-------- ------------------------------------------------- ----------------
后记
综上所述,多线程优化是在 Java NIO 中提高系统性能和响应能力的关键手段。通过引入并发处理机制、合理的线程管理策略以及有效的同步与通信机制,可以充分发挥 NIO 框架的优势,提升系统的效率与可扩展性。
以上就是 多线程优化 的所有内容了,希望本篇博文对大家有所帮助!
参考:
- Netty API reference;
- 黑马程序员Netty全套教程;