需求
- 编写一个NIO群聊系统,实现服务器端和客户端之间的数据简单通讯,非阻塞
- 实现多人群聊
- 服务器端: 可以监测用户上线, 离线, 并实现消息转发功能
- 客户端: 通过Channel可以无阻塞发送消息给其他用户,同时可以接受其他用户发送的消息(由服务器转发得到)
- 目的: 进一步了解NIO非阻塞网络编程机制
- 示意图分析和代码
示意图
编码
Server
代码语言:javascript复制package com.dance.netty.nio.demo.groupchat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
public class GroupChatServer {
/**
* 选择器
*/
private Selector selector;
/**
* 服务器 Channel
*/
private ServerSocketChannel serverSocketChannel;
/**
* 服务器端口号
*/
private static final int PORT = 6667;
public GroupChatServer() {
try {
// 初始化参数
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 监听
*/
public void listener() {
for (; ; ) {
try {
int eventCount = selector.select(10000);
// 有事件要处理
if (eventCount > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
System.out.println("用户: " socketChannel.getRemoteAddress().toString().substring(1) " 上线了");
}
if (selectionKey.isReadable()) {
readData(selectionKey);
}
iterator.remove();
}
}
System.out.println("等待事件发生......");
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 处理读事件
*
* @param selectionKey channel id
*/
public void readData(SelectionKey selectionKey) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
try {
int readSize = 0;
int readSizeCount = 0;
byte[] dist = new byte[0];
// 为了处理大于1024的数据 采用循环读取
while ((readSize = socketChannel.read(buffer)) > 0) {
readSizeCount = readSize;
byte[] src = buffer.array();
buffer.flip();
// 为了防止汉子断裂,采用字节数组合并
dist = mergerByteArray(dist, dist.length, src, buffer.limit());
}
String message = new String(dist);
System.out.println("用户: " socketChannel.getRemoteAddress().toString().substring(1) " , 发送: " message);
sendMessageToOtherClients(message, selectionKey);
} catch (IOException e) {
// e.printStackTrace();
try {
System.out.println("用户: " socketChannel.getRemoteAddress().toString().substring(1) " 离线了");
// 从selector中取消注册
selectionKey.cancel();
// 关闭通道
socketChannel.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
/**
* 发送给其他客户端
*
* @param message 消息
* @param selectionKey form client key
*/
public void sendMessageToOtherClients(String message, SelectionKey selectionKey) {
// 获取所有的在线客户端
Set<SelectionKey> keys = selector.keys();
// 排除自己
keys.stream().filter(key -> !key.equals(selectionKey)).filter(key -> key.channel() instanceof SocketChannel).forEach(key -> {
SocketChannel socketChannel = (SocketChannel) key.channel();
// 因为不知道 字节数组的长度 所以采用新的Buffer
ByteBuffer wrap = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
try {
int write = socketChannel.write(wrap);
System.out.println("输出数据给:" socketChannel.getRemoteAddress().toString().substring(1));
} catch (IOException e) {
e.printStackTrace();
}
});
}
/**
* byte array merger
*
* @param left 左边的字节数组
* @param leftLength 要合并的结束坐标
* @param right 右边的字节数组
* @param rightLength 要合并的结束坐标
* @return 合并后的字节数组
*/
public byte[] mergerByteArray(byte[] left, int leftLength, byte[] right, int rightLength) {
byte[] bytes = new byte[leftLength rightLength];
System.arraycopy(left, 0, bytes, 0, leftLength);
System.arraycopy(right, 0, bytes, leftLength, rightLength);
return bytes;
}
public static void main(String[] args) {
GroupChatServer groupChatServer = new GroupChatServer();
groupChatServer.listener();
}
}
client
代码语言:javascript复制package com.dance.netty.nio.demo.groupchat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Scanner;
public class GroupChatClient {
private final String IP = "127.0.0.1";
private final int PORT = 6667;
private Selector selector;
private SocketChannel socketChannel;
private String username;
public GroupChatClient() {
try {
selector = Selector.open();
socketChannel = SocketChannel.open(new InetSocketAddress(IP, PORT));
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
username = socketChannel.getLocalAddress().toString().substring(1);
System.out.println(username " init success......");
} catch (IOException e) {
e.printStackTrace();
}
}
public void sendMessage(String message) {
message = username " : " message;
try {
int write = socketChannel.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));
} catch (IOException e) {
e.printStackTrace();
}
}
public void readMessage() {
int select = 0;
try {
select = selector.select();
if (select > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isReadable()) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int read = socketChannel.read(buffer);
if (read > 0) {
buffer.flip();
String msg = new String(buffer.array(), 0, buffer.limit());
System.out.println(msg);
}
}
iterator.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
GroupChatClient groupChatClient = new GroupChatClient();
new Thread(() -> {
while (true) {
groupChatClient.readMessage();
}
}).start();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()){
String s = scanner.nextLine();
groupChatClient.sendMessage(s);
}
}
}
测试
- 启动Server
- 启动三个客户端
第一个
第二个
Server提示
第三个
客户端上线提示OK
- 发送消息
第一个客户端发送消息
第二个
第三个
- 第二个客户端回复
第一个
第三个
多人群聊功能实现 ok
- 第三个客户端下线
server端提示
用户下线提示ok