文章目录- 一 多线程编程
- 线程安全
- 产生原因
- 解决线程安全问题
- synchronized 同步代码块
- synchronized 同步代码块
- 线程间通信
- 两个线程交替运行
- 生产者消费者
- 二 bio编程
- 三 nio编程
- 文件IO
- 往本地文件中写数据
- 从本地文件中读数据
- 复制文件
- 网络IO
- nio网路通信demo
- 多人网络聊天
- AIO
- IO 对比总结
- 线程安全
- 产生原因
- 解决线程安全问题
- synchronized 同步代码块
- synchronized 同步代码块
- 线程间通信
- 两个线程交替运行
- 生产者消费者
- 文件IO
- 往本地文件中写数据
- 从本地文件中读数据
- 复制文件
- 网络IO
- nio网路通信demo
- 多人网络聊天
- AIO
- IO 对比总结
一 多线程编程
线程是比进程更小的能独立运行的基本单位,它是进程的一部分,一个进程可以拥有多个线程,但至少要有一个线程,即主执行线程(Java 的 main 方法)。我们既可以编写单线程应用,也可以编写多线程应用。 一个进程中的多个线程可以并发(同时)执行,在一些执行时间长、需要等待的任务上(例如:文件读写和网络传输等),多线程就比较有用了。 怎么理解多线程呢?来两个例子:
- 进程就是一个工厂,一个线程就是工厂中的一条生产线,一个工厂至少有一条生产线,只有一条生产线就是单线程应用,拥有多条生产线就是多线程应用。多条生产线可以同时运行。
- 我们使用迅雷可以同时下载多个视频,迅雷就是进程,多个下载任务就是线程,这几个线程可以同时运行去下载视频。
多线程可以共享内存、充分利用 CPU,通过提高资源(内存和 CPU)使用率从而提高程序的执行效率。CPU 使用抢占式调度模式在多个线程间进行着随机的高速的切换。对于 CPU的一个核而言,某个时刻,只能执行一个线程,而 CPU 在多个线程间的切换速度相对我们 的感觉要快很多,看上去就像是多个线程或任务在同时运行。 Java 天生就支持多线程并提供了两种编程方式,一个是继承 Thread 类,一个是实现Runnable 接口
线程安全
产生原因
多个线程操作的是同一个共享资源,但是线程之间是彼此独立、互相隔绝的,因此就会出现数据(共享资源)不能同步更新的情况,这就是线程安全问题
代码语言:javascript复制package com.xiepanpan.thread.safe;
/**
* @author: xiepanpan
* @Date: 2020/8/3
* @Description: 销售窗口 有线程安全问题
*/
public class SaleWindow implements Runnable{
private int id = 10;
/**
* 卖10张火车票
*/
public void run() {
for (int i = 0; i < 10; i ) {
if (id>0) {
System.out.println(Thread.currentThread().getName() "卖了编号为" id "的火车票");
id--;
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
代码语言:javascript复制package com.xiepanpan.thread.safe;
/**
* @author: xiepanpan
* @Date: 2020/8/3
* @Description: 两个线程随机售票 出现线程安全问题
*/
public class TestSaleWindow {
public static void main(String[] args) {
SaleWindow saleWindow = new SaleWindow();
Runnable target;
Thread t1 = new Thread(saleWindow);
Thread t2 = new Thread(saleWindow);
t1.setName("窗口A");
t2.setName("窗口B");
t1.start();
t2.start();
}
}
解决线程安全问题
Java 中提供了一个同步机制(锁)来解决线程安全问题,即让操作共享数据的代码在某一时间段,只被一个线程执行(锁住),在执行过程中,其他线程不可以参与进来,这样共享数据就能同步了。简单来说,就是给某些代码加把锁。 锁是什么?又从哪儿来呢?锁的专业名称叫监视器 monitor,其实 Java 为每个对象都自动内置了一个锁(监视器 monitor),当某个线程执行到某代码块时就会自动得到这个对象的锁,那么其他线程就无法执行该代码块了,一直要等到之前那个线程停止(释放锁)。需要 特别注意的是:多个线程必须使用同一把锁(对象)。 Java 的同步机制提供了两种实现方式:
- 同步代码块:即给代码块上锁,变成同步代码块
- 同步方法:即给方法上锁,变成同步方法
synchronized 同步代码块
代码语言:javascript复制package com.xiepanpan.thread.safe;
/**
* @author: xiepanpan
* @Date: 2020/8/3
* @Description: 销售窗口 同步代码块
*/
public class SaleWindow1 implements Runnable{
private int id = 10;
/**
* 卖10张火车票
*/
public void run() {
for (int i = 0; i < 10; i ) {
synchronized (this) {
if (id>0) {
System.out.println(Thread.currentThread().getName() "卖了编号为" id "的火车票");
id--;
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
代码语言:javascript复制package com.xiepanpan.thread.safe;
/**
* @author: xiepanpan
* @Date: 2020/8/3
* @Description: 两个线程随机售票 使用同步代码块解决线程安全问题
*/
public class TestSaleWindow1 {
public static void main(String[] args) {
SaleWindow1 saleWindow1 = new SaleWindow1();
Runnable target;
Thread t1 = new Thread(saleWindow1);
Thread t2 = new Thread(saleWindow1);
t1.setName("窗口A");
t2.setName("窗口B");
t1.start();
t2.start();
}
}
synchronized 同步代码块
代码语言:javascript复制package com.xiepanpan.thread.safe;
/**
* @author: xiepanpan
* @Date: 2020/8/3
* @Description: 销售窗口 使用同步方法 默认使用this作为锁
*/
public class SaleWindow2 implements Runnable{
private int id = 10;
/**
* 卖10张火车票
*/
public void run() {
for (int i = 0; i < 10; i ) {
saleOne();
}
}
private synchronized void saleOne() {
if (id>0) {
System.out.println(Thread.currentThread().getName() "卖了编号为" id "的火车票");
id--;
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
代码语言:javascript复制package com.xiepanpan.thread.safe;
/**
* @author: xiepanpan
* @Date: 2020/8/3
* @Description: 两个线程随机售票 使用同步方法解决线程安全问题
*/
public class TestSaleWindow2 {
public static void main(String[] args) {
SaleWindow2 saleWindow2 = new SaleWindow2();
Runnable target;
Thread t1 = new Thread(saleWindow2);
Thread t2 = new Thread(saleWindow2);
t1.setName("窗口A");
t2.setName("窗口B");
t1.start();
t2.start();
}
}
通过查看源码,我们发现 StringBuffer 和 Vector 类中的大部分方法都是同步方法,所以证明这两个类在使用时是保证线程安全的;而StringBuilder和ArrayList类中的方法都是普通方法, 没有使用 synchronized 关键字进行修饰,所以证明这两个类在使用时不保证线程安全。线程安全和性能之间不可兼得,保证线程安全就会损失性能,保证性能就不能满足线程安全。
线程间通信
多个线程并发执行时, 在默认情况下 CPU 是随机性的在线程之间进行切换的,但是有时候我们希望它们能有规律的执行, 那么,多线程之间就需要一些协调通信来改变或控制 CPU的随机性。Java 提供了等待唤醒机制来解决这个问题,具体来说就是多个线程依靠一个同步 锁,然后借助于 wait()和 notify()方法就可以实现线程间的协调通信。 同步锁相当于中间人的作用,多个线程必须用同一个同步锁(认识同一个中间人),只有同一个锁上的被等待的线程,才可以被持有该锁的另一个线程唤醒,使用不同锁的线程之间不能相互唤醒,也就无法协调通信。
Java 在 Object 类中提供了一些方法可以用来实现线程间的协调通信,我们一起来了解 一下:
- public final void wait(); 让当前线程释放锁
- public final native void wait(long timeout); 让当前线程释放锁,并等待 xx 毫秒
- public final native void notify(); 唤醒持有同一锁的某个线程
- public final native void notifyAll(); 唤醒持有同一锁的所有线程
需要注意的是:在调用 wait 和 notify 方法时,当前线程必须已经持有锁,然后才可以调用,否则将会抛出 IllegalMonitorStateException 异常。接下来咱们通过两个案例来演示一下具体如何编程实现线程间通信。
两个线程交替运行
两个线程交替输出信息
为了保证两个线程使用的一定是同一个锁,我们创建一个对象作为静态属性放到一个类中,这个对象就用来充当锁
代码语言:javascript复制package com.xiepanpan.thread.communication.num;
/**
* @author: xiepanpan
* @Date: 2020/8/3
* @Description: 锁对象
*/
public class MyLock {
public static Object object = new Object();
}
该线程输出十次 1,使用 MyLock.o 作为锁,每输出一个 1 就唤醒另一个线程,然后自己休眠并释放锁。
代码语言:javascript复制package com.xiepanpan.thread.communication.num;
/**
* @author: xiepanpan
* @Date: 2020/8/3
* @Description: 输出数字1的线程
*/
public class ThreadForNumOne extends Thread{
public void run() {
for (int i = 0; i < 10; i ) {
synchronized (MyLock.object) {
System.out.println(1);
MyLock.object.notify();
try {
MyLock.object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
代码语言:javascript复制package com.xiepanpan.thread.communication.num;
/**
* @author: xiepanpan
* @Date: 2020/8/3
* @Description: 输出数字2的线程
*/
public class ThreadForNumTwo extends Thread{
public void run() {
for (int i = 0; i < 10; i ) {
synchronized (MyLock.object) {
System.out.println(2);
MyLock.object.notify();
try {
MyLock.object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
该线程输出十次 2,也使用 MyLock.o 作为锁,每输出一个 2 就唤醒另一个线程,然后自己休眠并释放锁。
代码语言:javascript复制package com.xiepanpan.thread.communication.num;
/**
* @author: xiepanpan
* @Date: 2020/8/3
* @Description: 测试两个线程交替执行
*/
public class TestThreadForNum {
public static void main(String[] args) {
new ThreadForNumOne().start();
new ThreadForNumTwo().start();
}
}
生产者消费者
生产者是一堆线程,消费者是另一堆线程,内存缓冲区可以使用 List 集合存储数据。该模式的关键之处是如何处理多线程之间的协调通信,内存缓冲区为空的时候,消费者必须等待,而内存缓冲区满的时候,生产者必须等待,其他时候可以是个动态平衡。
这里实现一个生产者一个消费者
定义一个静态集合作为内存缓冲区用来存储数据,同时这个集合也可以作为锁去被多个线程使用。
代码语言:javascript复制package com.xiepanpan.thread.communication.producerconsumer;
import java.util.ArrayList;
/**
* @author: xiepanpan
* @Date: 2020/8/3
* @Description: 篮子
*/
public class Basket {
public static ArrayList<String> basket = new ArrayList<String>();
}
生产者不断的往集合(筐)里放水果,当筐满了就停,同时释放锁。
代码语言:javascript复制package com.xiepanpan.thread.communication.producerconsumer;
/**
* @author: xiepanpan
* @Date: 2020/8/3
* @Description: 农民类 相当于生产者
*/
public class Farmer extends Thread{
public void run() {
while (true) {
synchronized (Basket.basket) {
//篮子满了就不要放了 让农夫休息一哈
if(Basket.basket.size()==10) {
try {
Basket.basket.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//往篮子里放水果
Basket.basket.add("apple");
System.out.println("农夫放了一个水果,目前篮子里有" Basket.basket.size() "个水果");
//唤醒小孩继续吃
Basket.basket.notify();
}
//模拟控制速度
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
消费者不断的从集合(筐)里拿水果吃,当筐空了就停,同时释放锁。
代码语言:javascript复制package com.xiepanpan.thread.communication.producerconsumer;
import java.util.Random;
/**
* @author: xiepanpan
* @Date: 2020/8/3
* @Description: 孩子类 相当于消费者
*/
public class Child extends Thread{
@Override
public void run() {
while (true) {
synchronized (Basket.basket) {
//篮子里没有水果 让小孩休息一哈
if (Basket.basket.size()==0 ) {
try {
Basket.basket.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//小孩吃水果
Basket.basket.remove("apple");
System.out.println("小孩吃了一个水果 目前篮子里有" Basket.basket.size() "个水果");
//唤醒农夫继续放水果
Basket.basket.notify();
}
try {
Random random = new Random();
Thread.sleep(random.nextInt(3000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
代码语言:javascript复制package com.xiepanpan.thread.communication.producerconsumer;
/**
* @author: xiepanpan
* @Date: 2020/8/3
* @Description: 测试模拟生产者消费者模式 生产者消费者一对一
*/
public class Test {
public static void main(String[] args) {
new Farmer().start();
new Child().start();
}
}
二 bio编程
BIO 有的称之为 basic(基本) IO,有的称之为 block(阻塞) IO,主要应用于文件 IO 和网络 IO,这里不再说文件 IO, 大家对此都非常熟悉,本次课程主要讲解网络 IO。 在 JDK1.4 之前,我们建立网络连接的时候只能采用 BIO,需要先在服务端启动一个ServerSocket,然后在客户端启动 Socket 来对服务端进行通信,默认情况下服务端需要对每个请求建立一个线程等待请求,而客户端发送请求后,先咨询服务端是否有线程响应,如果没有则会一直等待或者遭到拒绝,如果有的话,客户端线程会等待请求结束后才继续执行,这就是阻塞式 IO。
服务器端程序,绑定端口号 9999,accept 方法用来监听客户端连接,如果没有客户端连接,就一直等待,程序会阻塞到这里。
代码语言:javascript复制package com.xiepanpan.bio;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @author: xiepanpan
* @Date: 2020/8/3
* @Description: bio 服务端程序 先启动
*/
public class TCPServer {
public static void main(String[] args) throws IOException {
// 创建ServerSocket 对象
ServerSocket serverSocket = new ServerSocket(9999);
while (true) {
//监听客户端
System.out.println("启动服务端。。");
//①这里阻塞
Socket socket = serverSocket.accept();
System.out.println("已连接客户端");
//从连接中取出输入流的接收消息
//②阻塞
InputStream inputStream = socket.getInputStream();
byte[] bytes = new byte[10];
inputStream.read(bytes);
String clientIp = serverSocket.getInetAddress().getHostAddress();
System.out.println(clientIp "说:" new String(bytes).trim());
//从连接中取出输出流并回复
OutputStream outputStream = socket.getOutputStream();
outputStream.write("收到".getBytes());
//关闭
socket.close();
}
}
}
客户端程序,通过 9999 端口连接服务器端,getInputStream 方法用来等待服务器端返回数据,如果没有返回,就一直等待,程序会阻塞到这里。
代码语言:javascript复制package com.xiepanpan.bio;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Scanner;
/**
* @author: xiepanpan
* @Date: 2020/8/3
* @Description: bio 客户端程序
*/
public class TCPClient {
public static void main(String[] args) throws IOException {
while(true) {
//创建socket对象
Socket socket = new Socket("127.0.0.1",9999);
//从连接中取出输出流并发送消息
OutputStream outputStream = socket.getOutputStream();
System.out.println("请输入:");
Scanner scanner = new Scanner(System.in);
String msg = scanner.nextLine();
outputStream.write(msg.getBytes());
//从连接中取出输入流并接收回话
//①阻塞
InputStream inputStream = socket.getInputStream();
byte[] bytes= new byte[20];
inputStream.read(bytes);
System.out.println("长安说:" new String(bytes).trim());
}
}
}
三 nio编程
java.nio 全称 java non-blocking IO,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了一系列改进的输入/输出的新特性,被统称为 NIO(即 New IO)。新增了许多用于处理输入输出的类,这些类都被放在 java.nio 包及子包下,并且对原 java.io 包中的很多类进行改写,新增了满足 NIO 的功能。
NIO 和 BIO 有着相同的目的和作用,但是它们的实现方式完全不同,BIO 以流的方式处理数据,而 NIO 以块的方式处理数据,块 I/O 的效率比流 I/O 高很多。另外,NIO 是非阻塞式的,这一点跟 BIO 也很不相同,使用它可以提供非阻塞式的高伸缩性网络。
NIO 主要有三大核心部分:Channel(通道),Buffer(缓冲区), Selector(选择器)。
传统的 BIO基于字节流和字符流进行操作,而 NIO 基于 Channel(通道)和 Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择区)用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道。
文件IO
缓冲区(Buffer):实际上是一个容器,是一个特殊的数组,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。Channel 提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer,如下图所示:
在 NIO 中,Buffer 是一个顶层父类,它是一个抽象类,常用的 Buffer 子类有:
- ByteBuffer,存储字节数据到缓冲区
- ShortBuffer,存储字符串数据到缓冲区
- CharBuffer,存储字符数据到缓冲区
- IntBuffer,存储整数数据到缓冲区
- LongBuffer,存储长整型数据到缓冲区
- DoubleBuffer,存储小数到缓冲区
- FloatBuffer,存储小数到缓冲区
对于 Java 中的基本数据类型,都有一个 Buffer 类型与之相对应,最常用的自然是ByteBuffer 类(二进制数据),该类的主要方法如下所示:
- public abstract ByteBuffer put(byte[] b); 存储字节数据到缓冲区
- public abstract byte[] get(); 从缓冲区获得字节数据
- public final byte[] array(); 把缓冲区数据转换成字节数组
- public static ByteBuffer allocate(int capacity); 设置缓冲区的初始容量
- public static ByteBuffer wrap(byte[] array); 把一个现成的数组放到缓冲区中使用
- public final Buffer flip(); 翻转缓冲区,重置位置到初始位置
通道(Channel):类似于 BIO 中的 stream,例如 FileInputStream 对象,用来建立到目标(文件,网络套接字,硬件设备等)的一个连接,但是需要注意:BIO 中的 stream 是单向的,例如 FileInputStream 对象只能进行读取数据的操作,而 NIO 中的通道(Channel)是双向的,既可以用来进行读操作,也可以用来进行写操作。
常用的 Channel 类有:FileChannel、DatagramChannel、ServerSocketChannel 和 SocketChannel。FileChannel 用于文件的数据读写,DatagramChannel 用于 UDP 的数据读写,ServerSocketChannel 和 SocketChannel 用于 TCP 的数据读写。
往本地文件中写数据
代码语言:javascript复制/**
* 往本地文件中写数据
*/
@Test
public void test1() throws IOException {
//1. 创建文件输出流
File file;
FileOutputStream fileOutputStream = new FileOutputStream("test.txt");
//2. 从流中得到一个通道
FileChannel channel = fileOutputStream.getChannel();
//3. 提供一个缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//4. 往缓冲区中存入数据
String string = "hello,nio";
byteBuffer.put(string.getBytes());
//5. 翻转缓冲区
byteBuffer.flip();
//6. 把缓冲区写到通道中
channel.write(byteBuffer);
//7. 关闭流
fileOutputStream.close();
}
NIO 中的通道是从输出流对象里通过 getChannel 方法获取到的,该通道是双向的,既可以读,又可以写。在往通道里写数据之前,必须通过 put 方法把数据存到 ByteBuffer 中,然后通过通道的 write 方法写数据。在 write 之前,需要调用 flip 方法翻转缓冲区,把内部重置到初始位置,这样在接下来写数据时才能把所有数据写到通道里。
从本地文件中读数据
代码语言:javascript复制/**
* 从本地文件中读取数据
* @throws FileNotFoundException
*/
@Test
public void test2() throws IOException {
//1. 创建输入流
File file = new File("test.txt");
FileInputStream fileInputStream = new FileInputStream(file);
//2. 得到一个通道
FileChannel channel = fileInputStream.getChannel();
//3. 准备一个缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());
//4. 从通道里读取数据并保存到缓冲区中
channel.read(byteBuffer);
System.out.println(new String(byteBuffer.array()));
//5. 关闭
fileInputStream.close();
}
上述代码从输入流中获得一个通道,然后提供 ByteBuffer 缓冲区,该缓冲区的初始容量和文件的大小一样,最后通过通道的 read 方法把数据读取出来并存储到了 ByteBuffer 中。
复制文件
代码语言:javascript复制/**
* 使用NIO实现文件复制
* @throws IOException
*/
@Test
public void test3() throws IOException {
//1. 创建两个流
FileInputStream fileInputStream = new FileInputStream("test.txt");
FileOutputStream fileOutputStream = new FileOutputStream("D:\test.txt");
//2. 得到两个通道
FileChannel sourceFileChannel = fileInputStream.getChannel();
FileChannel destFileChannel = fileOutputStream.getChannel();
//3. 复制
destFileChannel.transferFrom(sourceFileChannel,0,sourceFileChannel.size());
//4. 关闭
fileInputStream.close();
fileOutputStream.close();
}
通过 BIO 复制一个视频文件:
代码语言:javascript复制 @Test
public void test3() throws Exception{
FileInputStream fis=new FileInputStream("C:\Users\zdx\Desktop\oracle.mov");
FileOutputStream fos=new FileOutputStream("d:\oracle.mov");
byte[] b=new byte[1024];
while (true) {
int res=fis.read(b);
if(res==-1){
break;
}
fos.write(b,0,res);
}
fis.close();
fos.close();
}
上述代码分别通过输入流和输出流实现了文件的复制,这是通过传统的 BIO 实现的
通过 NIO 复制相同的视频文件,代码如下所示:
代码语言:javascript复制 @Test
public void test4() throws Exception{
FileInputStream fis=new FileInputStream("C:\Users\zdx\Desktop\oracle.mov");
FileOutputStream fos=new FileOutputStream("d:\oracle.mov");
FileChannel sourceCh = fis.getChannel();
FileChannel destCh = fos.getChannel();
destCh.transferFrom(sourceCh, 0, sourceCh.size());
sourceCh.close();
destCh.close();
}
上述代码分别从两个流中得到两个通道,sourceCh 负责读数据,destCh 负责写数据,然后直接调用 transferFrom 方法一步到位实现了文件复制。
网络IO
文件 IO 时用到的 FileChannel 并不支持非阻塞操作,学习 NIO 主要就是进行网络 IO,Java NIO 中的网络通道是非阻塞 IO 的实现,基于事件驱动,非常适用于服务器需要维持大量连接,但是数据交换量不大的情况,例如一些即时通信的服务等等… 在 Java 中编写 Socket 服务器,通常有以下几种模式:
- 一个客户端连接用一个线程,优点:程序编写简单;缺点:如果连接非常多,分配的线程也会非常多,服务器可能会因为资源耗尽而崩溃。
- 把每一个客户端连接交给一个拥有固定数量线程的连接池,优点:程序编写相对简单,可以处理大量的连接。确定:线程的开销非常大,连接如果非常多,排队现象会比较严重。
- 使用 Java 的 NIO,用非阻塞的 IO 方式处理。这种模式可以用一个线程,处理大量的客户端连接。
- Selector(选择器),能够检测多个注册的通道上是否有事件发生,如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接。这样使得只有在连接真正有读写事件发生时,才会调用函数来进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程,并且避免了多线程之间的上下文切换导致的开销。
该类的常用方法如下所示:
- public static Selector open(),得到一个选择器对象
- public int select(long timeout),监控所有注册的通道,当其中有 IO 操作可以进行时,将对应的 SelectionKey 加入到内部集合中并返回,参数用来设置超时时间
public Set<SelectionKey> selectedKeys()
,从内部集合中得到所有的 SelectionKey
- SelectionKey,代表了 Selector 和网络通道的注册关系,一共四种:
- int OP_ACCEPT:有新的网络连接可以 accept,值为 16
- int OP_CONNECT:代表连接已经建立,值为 8
- int OP_READ 和 int OP_WRITE:代表了读、写操作,值为 1 和 4
该类的常用方法如下所示:
- public abstract Selector selector(),得到与之关联的 Selector 对象
- public abstract SelectableChannel channel(),得到与之关联的通道
- public final Object attachment(),得到与之关联的共享数据
- public abstract SelectionKey interestOps(int ops),设置或改变监听事件
- public final boolean isAcceptable(),是否可以 accept
- public final boolean isReadable(),是否可以读
- public final boolean isWritable(),是否可以写
- ServerSocketChannel,用来在服务器端监听新的客户端 Socket 连接,常用方法如下所示:
- public static ServerSocketChannel open(),得到一个 ServerSocketChannel 通道
- public final ServerSocketChannel bind(SocketAddress local),设置服务器端端口号
- public final SelectableChannel configureBlocking(boolean block),设置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式
- public SocketChannel accept(),接受一个连接,返回代表这个连接的通道对象
- public final SelectionKey register(Selector sel, int ops),注册一个选择器并设置监听事件
- SocketChannel,网络 IO 通道,具体负责进行读写操作。NIO 总是把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区。常用方法如下所示:
- public static SocketChannel open(),得到一个 SocketChannel 通道
- public final SelectableChannel configureBlocking(boolean block),设置阻塞或非阻塞模式, 取值 false 表示采用非阻塞模式
- public boolean connect(SocketAddress remote),连接服务器
- public boolean finishConnect(),如果上面的方法连接失败,接下来就要通过该方法完成 连接操作
- public int write(ByteBuffer src),往通道里写数据
- public int read(ByteBuffer dst),从通道里读数据
- public final SelectionKey register(Selector sel, int ops, Object att),注册一个选择器并设置 监听事件,最后一个参数可以设置共享数据
- public final void close(),关闭通道
nio网路通信demo
代码语言:javascript复制package com.xiepanpan.nio.socket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
/**
* @author: xiepanpan
* @Date: 2020/8/4
* @Description: nio实现网络通信 服务端
*/
public class NIOServer {
public static void main(String[] args) throws IOException {
//1、 得到一个ServerSocketChannel 老大
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//2、 得到一个Selector对象 间谍
Selector selector = Selector.open();
//3. 绑定端口
serverSocketChannel.bind(new InetSocketAddress(9999));
//4、设置非阻塞模式
serverSocketChannel.configureBlocking(false);
//5. 把ServerSocketChannel对象注册给Selector对象
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//6. 干活
while (true) {
//监控客户端
if (selector.select(2000)==0) {
System.out.println("server: 没有客户端搭理我 我干别的事");
continue;
}
//得到SelectionKey 判断通道里的事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()){
//客户端连接请求事件
System.out.println("OP_ACCEPT");
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector,SelectionKey.OP_READ,ByteBuffer.allocate(1024));
}
//读取客户端数据事件
if (selectionKey.isReadable()) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment();
channel.read(byteBuffer);
System.out.println("客户端发来数据:" new String(byteBuffer.array()));
}
//手动从集合中移除当前key 防止重复处理
iterator.remove();
}
}
}
}
代码语言:javascript复制package com.xiepanpan.nio.socket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* @author: xiepanpan
* @Date: 2020/8/4
* @Description: nio实现网络通信 客户端
*/
public class NIOClient {
public static void main(String[] args) throws IOException {
//1. 得到一个网络通道
SocketChannel socketChannel = SocketChannel.open();
//2. 设置非阻塞方式
socketChannel.configureBlocking(false);
//3、 提供服务端的ip地址和端口号
String host;
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 9999);
//4. 连接服务器端
if (!socketChannel.connect(inetSocketAddress)) {
while (!socketChannel.finishConnect()) {
System.out.println("Client:连接服务器的同时 我还可以干别的事情");
}
}
//5. 得到一个缓冲区并存入数据
String msg = "hello server";
ByteBuffer wrap = ByteBuffer.wrap(msg.getBytes());
//6、发送数据
socketChannel.write(wrap);
System.in.read();
}
}
多人网络聊天
聊天程序的服务器端,可以接受客户端发来的数据,并能把数据广播给所有客户端
代码语言:javascript复制package com.xiepanpan.nio.chat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
/**
* @author: xiepanpan
* @Date: 2020/8/5
* @Description: nio实现多人聊天 服务端
*/
public class ChatServer {
//监听通道 (老大)
private ServerSocketChannel serverSocketChannel;
//选择器对象 (间谍)
private Selector selector;
private static final int PORT =9999;
public ChatServer() throws IOException {
//得到监听通道
serverSocketChannel = ServerSocketChannel.open();
selector = Selector.open();
serverSocketChannel.bind(new InetSocketAddress(PORT));
serverSocketChannel.configureBlocking(false);
//将选择器绑定到监听通道并监听accept事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
printInfo("chat server is ready...");
}
public static void main(String[] args) throws IOException {
new ChatServer().start();
}
/**
* 往控制台打印消息
* @param string
*/
private void printInfo(String string) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("[" simpleDateFormat.format(new Date()) "]" string);
}
private void start() throws IOException {
while (true) {
if (selector.select(2000) == 0) {
System.out.println("server: 没有客户端找我 我就干别的事情");
continue;
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector,SelectionKey.OP_READ);
System.out.println(socketChannel.getRemoteAddress().toString().substring(1) "上线了。。。");
}
//读数据
if (selectionKey.isReadable()) {
readMsg(selectionKey);
}
//
iterator.remove();
}
}
}
private void readMsg(SelectionKey selectionKey) throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int count = socketChannel.read(byteBuffer);
if (count>0) {
String msg = new String(byteBuffer.array());
printInfo(msg);
//广播消息
broadCast(socketChannel,msg);
}
}
/**
* 向所有客户端发送广播
* @param socketChannel
* @param msg
*/
private void broadCast(SocketChannel socketChannel, String msg) throws IOException {
System.out.println("服务器发送了广播");
for (SelectionKey selectionKey:selector.keys()) {
Channel targetChannel = selectionKey.channel();
//是socketChannel 但不是自身 消息不广播给自己
if (targetChannel instanceof SocketChannel && targetChannel!=socketChannel) {
SocketChannel destChannel = (SocketChannel) targetChannel;
ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
destChannel.write(byteBuffer);
}
}
}
}
聊天程序的客户端,可以向服务器端发送数据,并能接收服务器广播的数据。
代码语言:javascript复制package com.xiepanpan.nio.chat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* @author: xiepanpan
* @Date: 2020/8/5
* @Description: nio实现多人聊天 客户端
*/
public class ChatClient {
//服务器地址
private final String HOST = "127.0.0.1";
private int PORT =9999;
//网络通道
private SocketChannel socketChannel;
//用户名
private String username;
public ChatClient() throws IOException {
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
InetSocketAddress inetSocketAddress = new InetSocketAddress(HOST, PORT);
if (!socketChannel.connect(inetSocketAddress)) {
while (!socketChannel.finishConnect()) {
System.out.println("Client:连接服务器端的同时 我还可以干别的事情");
}
}
username = socketChannel.getLocalAddress().toString().substring(1);
System.out.println("=======client" username " is ready========");
}
/**
* 向服务端发送数据
* @param msg
*/
public void sendMsg(String msg) throws IOException {
//发送 bye表示聊天结束
if (msg.equalsIgnoreCase("bye")) {
socketChannel.close();
return;
}
msg = username "说:" msg;
ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
socketChannel.write(byteBuffer);
}
/**
* 从服务器端接收数据
*/
public void receiveMsg() throws IOException {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int size = socketChannel.read(byteBuffer);
if (size>0) {
String msg = new String(byteBuffer.array());
System.out.println(msg.trim());
}
}
}
代码语言:javascript复制package com.xiepanpan.nio.chat;
import java.io.IOException;
import java.util.Scanner;
/**
* @author: xiepanpan
* @Date: 2020/8/5
* @Description: 多人聊天测试类
*/
public class TestChat {
public static void main(String[] args) throws IOException {
final ChatClient chatClient = new ChatClient();
new Thread(){
@Override
public void run() {
while (true) {
try {
chatClient.receiveMsg();
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}.start();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
chatClient.sendMsg(msg);
}
}
}
配置这里可以启动多个客户端
AIO
JDK 7 引入了 Asynchronous I/O,即 AIO。在进行 I/O 编程中,常用到两种模式:Reactor和 Proactor。Java 的 NIO 就是 Reactor,当有事件触发时,服务器端得到通知,进行相应的处理。 AIO 即 NIO2.0,叫做异步不阻塞的 IO。AIO 引入异步通道的概念,采用了 Proactor 模式,简化了程序编写,一个有效的请求才启动一个线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用。
IO 对比总结
IO 的方式通常分为几种:同步阻塞的 BIO、同步非阻塞的 NIO、异步非阻塞的 AIO。
- BIO 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4 以前的唯一选择,但程序直观简单易理解。
- NIO 方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4 开始支持。
- AIO 方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用 OS 参与并发操作,编程比较复杂,JDK7 开始支持。
举个例子再理解一下:
- 同步阻塞:你到饭馆点餐,然后在那等着,啥都干不了,饭馆没做好,你就必须等着!
- 同步非阻塞:你在饭馆点完餐,就去玩儿了。不过玩一会儿,就回饭馆问一声:好了没啊!
- 异步非阻塞:饭馆打电话说,我们知道您的位置,一会给你送过来,安心玩儿就可以了,类似于现在的外卖。
对比总结 | BIO | NIO | AIO |
---|---|---|---|
IO方式 | 同步阻塞 | 同步非阻塞(多路复用) | 异步非阻塞 |
API使用难度 | 简单 | 复杂 | 复杂 |
可靠性 | 差 | 好 | 好 |
吞吐量 | 低 | 高 | 高 |