即时通讯
最近在做IM(即时通讯:Instant Messaging)项目(基于网易云信)的升级改造, 虽然核心的通讯采用第三方SDK,但是对于即时通讯这一块还是产生了不少疑惑, 这篇文章主要记录自己对于实现聊天双方的即时通讯的思考。
作为一名Java开发,网络协议这一块自己接触最多的就是应用层的HTTP,它作为半双工通讯是无法实现 即时的效果,即使客户端采用轮询方式访问服务端产生的消息,也并不是真正意义上的即时,那么如何实现是我一开始最纠结的问题,纠结的点在于即时通讯,不光是要考虑Web端,还有APP应用。如果实现Web端的即时通讯,我们可以想到使用WebSocket协议(全双工)实现,但是APP应用像QQ, 微信他们是如何达到即时的效果,以QQ软件为例,它的应用层协议是私有协议,传输层协议采用UDP, 应用层使用私协议的好处就是高效,节约流量(一般使用二进制协议),安全性高,难以破解,很考验设计者 的能力,传输层采用UDP协议,UDP协议与TCP协议相比更快,少了三次握手这些环节,但是它不稳定, 无法保证消息能被接收(腾讯好像是在应用层上下了功夫来保障消息不会丢失),到这里,通过QQ实协议的选取,我想通讯的过程应该是(个人猜测):用户A登录,基于UDP协议,与腾讯服务器建立连接,服务器记住当前登录 用户的IP地址(这个地址通常是经过过NAT(网络地址转换)得到的 ,并非局域网地址,在互联网中,如果我们内网访问外网, 通常在通过网络出口的时候,被替换IP报文头部的地址信息,这种方式可以节约IPV4的地址,也就是 同一个NAT下的内网主机,在访问外网地址时,使用的是同一个地址信息,只是端口不同),当腾讯服务器记录 用户A的IP地址和端口号时,表明链接建立,此时用户A想与好友B进行通讯,发送了一条消息(这里不考虑离线), 那么消息会通过服务器转发给B,转发的方式也就是通过UDP发送报文到B的地址。这里我简单用自己想法使用Java基于UDP写了 一段代码(文章末尾的代码一),这样也就是实现了即时通讯(服务器通过UDP协议对客服端转发消息)
除了服务器转发之外,我们应该还需要知道另外一种方案P2P,也就是点对点通讯,通常情况下,一般的即时通讯, 文本消息是通过服务器转发,多媒体数据(视频电话这些)采用P2P的方案,这里我们仍然以UDP协议为例,想一下如何实现点对点的通讯, 与服务器转发不同,点对点的通讯是可能需要内网穿透的(不同局域网下的主机A访问到主机B),我们内网访问公网IP(腾讯的服务器)是容易的, 但是我们要实现访问内网却不是那么简单, 我想大家都听过内网穿透,打洞这些词(一个含义),他们就是实现P2P功能的方案,在说使用UDP实现打洞前,我们看一下上面说的NAT这个概念, 我们家庭/公司中一般都会有路由器,路由器有三种工作模式:NAT模式,路由模式,全模式,三者区别我就不提了,通常情况下,我们路由处于NAT模式, NAT模式按照方式有有这几种类型:全锥形NAT,限制锥形NAT,端口限制锥形NAT,对称型NAT。
如果需要打洞的两台主机在同一个NAT下,那么就没有打洞的必要,使用局域网IP即可完成P2P,即使用打洞可能也没效果, 一般路由器会拒绝掉这种回路的UDP包,
而如果主机不在同一个NAT下,那么就有打洞的必要了(对称型NAT有些特殊,无法完成打洞,因为每次建立连接端口都会发生变化, 难以实现P2P,补充方案是:端口预测)
下面再说打洞的步骤
代码语言:javascript复制主机A,主机B,服务器S
主机A发送请求给服务器S,建立连接 A <-> S
主机B发送请求给服务器S,建立连接 B <-> S
主机A发送请求给服务器S,与B进行P2P连接
服务器S发送打洞请求给B,B主动发送一条消息给A,此时由于A所处的NAT不信任B,拒绝B发送包,但是此时由于B给A发消息,那么B的NAT信任A。
A发送P2P连接请求,此时B <-> A之间打通
参考文章:https://blog.csdn.net/jdh99/article/details/6667648
说到这里其实整理一下就会明白,打洞就是完成相互信任一个过程,当我们在不同的NAT下,通过服务器得到对方的IP和端口,然后得到对方NAT信任, 再通过UDP进行连接通讯。在这些NAT类型中,对称型NAT特殊是因为,每次他的端口都不一样,我们很难预测进行通讯时他的端口是什么,也就无法完成打洞, 通常情况下,我们家庭路由不是对称型NAT,但是电脑连接手机4G开启热点,就是对称型NAT。
想要鉴别自己所处的NAT类型是不是对称型,其实很简单, 如下图,手机热点访问对两个公网地址发包,得到的自己的IP地址是一样的,但是端口不同。对两个公网IP进行UDP广播,如果两个主机拿到的端口不相同那就是对称型NAT。但是如果端口变化可以预测,那么应该也可以打洞成功(但是我用手机热点, 使用UDP打洞没有成功过)。
打洞的完整代码(文章末尾的代码二)
代码一
服务端
代码语言:javascript复制DatagramChannel channel = (DatagramChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
SocketAddress receive = channel.receive(byteBuffer);
//记录地址
map.put("用户X", receive);
客户端
代码语言:javascript复制String msg = "X:登录建立连接"
DatagramPacket packet = new DatagramPacket(msg.getBytes(), msg.getBytes().length,
new InetSocketAddress("IP", 8091));
服务端转发消息
代码语言:javascript复制public static void sendMessage(SocketAddress socketAddress, String msg) throws Exception {
if (socketAddress != null) {
ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
server.send(byteBuffer, socketAddress);
} else {
System.out.println("群发数据");
}
}
客户端发送消息
代码语言:javascript复制DatagramPacket packet2 = new DatagramPacket("A:发送给B".getBytes(),
"A:发送给B".getBytes().length,
new InetSocketAddress(address[0],
Integer.parseInt(address[1])));
socket.send(packet2);
代码二
服务端:
代码语言:javascript复制package com.studyjava.email.udp;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
public class UDPService {
//保存连接的地址
private static Map<String, SocketAddress> map = new ConcurrentHashMap<>();
//数据报
private static DatagramChannel server = null;
//选择器
private static Selector selector = null;
static {
try {
server = DatagramChannel.open().bind(new InetSocketAddress(8091));
server.configureBlocking(false);
selector = Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void sendMessage(SocketAddress socketAddress, String msg) throws Exception {
if (socketAddress != null) {
ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
server.send(byteBuffer, socketAddress);
} else {
System.out.println("群发数据");
}
}
public static String readMsg(SelectionKey selectionKey) throws Exception {
DatagramChannel channel = (DatagramChannel) selectionKey.channel();
System.out.println(channel.getRemoteAddress() ":" channel.getLocalAddress());
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
SocketAddress receive = channel.receive(byteBuffer);
byteBuffer.flip();
String msg = decode(byteBuffer);//new String(byteBuffer.array(), "utf-8");
System.out.println("收到消息:" msg);
String notices = "";
System.out.println(msg.charAt(0));
if ((msg.charAt(0) == 'A' && msg.charAt(msg.length() - 1) == 'B') || msg.charAt(0) == 'B' && msg.charAt(msg.length() - 1) == 'A') {
notices = "B->A" map.get("A");
sendMessage(map.get("B"), notices);
notices = "A->B" map.get("B");
sendMessage(map.get("A"), notices);
System.out.println("A客服端连接B,通知B进行打洞");
System.out.println("B客服端连接A,通知A进行打洞");
} else if (msg.charAt(0) == 'A') {
System.out.println("A客户端注册连接");
notices = "SELF" receive;
sendMessage(receive, notices);
map.put("A", receive);
} else if (msg.charAt(0) == 'B') {
System.out.println("B客户端注册连接");
notices = "SELF" receive;
sendMessage(receive, notices);
map.put("B", receive);
} else {
sendMessage(null, msg);
}
return msg;
}
public static String decode(ByteBuffer bb) {
Charset charset = Charset.forName("utf-8");
return charset.decode(bb).toString();
}
public static void main(String[] args) throws Exception {
System.out.println("启动UDP");
server.register(selector, SelectionKey.OP_READ);
while (true) {
if (selector.select() > 0) {
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isReadable()) {
readMsg(selectionKey);
iterator.remove();
}
}
}
}
}
}
客户端A和B(使用同一份代码即可,通过Scanner输入判断A和B)
代码语言:javascript复制package com.studyjava.email.udp;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Enumeration;
import java.util.Scanner;
public class UDPClientOne {
public static DatagramSocket socket = null;
private static String self = "";
public static void main(String[] args) throws Exception {
try {
socket = new DatagramSocket();
Scanner scanner = new Scanner(System.in);
String msg;
new Thread(() -> {
try {
while (true) {
byte[] arr = new byte[1024];
DatagramPacket packet = new DatagramPacket(arr, arr.length);
socket.receive(packet);
System.out.println(packet.getData().length);
int len = 0;
for (byte b : packet.getData()) {
if (b == 0)
break;
len ;
}
String m1 = new String(packet.getData(), 0, len, "utf-8");
System.out.println("A收到消息:" m1);
if (m1.substring(0, 4).equals("A->B")) {
String[] address = m1.substring(5).split(":");
System.out.println("地址" address[0] "端口" address[1]);
if (address[0].equals(self)) {
System.out.println("内网访问B");
address[0] = getHostIp();
}
DatagramPacket packet1 = new DatagramPacket("打洞消息A".getBytes(), "打洞消息A".getBytes().length,
new InetSocketAddress(address[0], Integer.parseInt(address[1])));
socket.send(packet1);
//
Thread.sleep(1000);
System.out.println("地址" address[0] "端口" address[1]);
// System.out.println("开始打洞B->A");
DatagramPacket packet2 = new DatagramPacket("P2PA".getBytes(), "P2PA".getBytes().length,
new InetSocketAddress(address[0], Integer.parseInt(address[1])));
socket.send(packet2);
} else if (m1.substring(0, 4).equals("SELF")) {
//保存自己的公网IP
System.out.println("保存IP");
self = m1.substring(5).split(":")[0];
}
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
while (!(msg = scanner.nextLine()).equals("exit")) {
System.out.println(msg "==");
DatagramPacket packet = new DatagramPacket(msg.getBytes(), msg.getBytes().length,
new InetSocketAddress("IP", 8091));
System.out.println(socket.getPort());
socket.send(packet);
}
} finally {
socket.close();
}
}
private static String decode(ByteBuffer bb) {
Charset charset = Charset.forName("utf-8");
return charset.decode(bb).toString();
}
private static String getHostIp() {
try {
Enumeration<NetworkInterface> allNetInterfaces = NetworkInterface.getNetworkInterfaces();
while (allNetInterfaces.hasMoreElements()) {
NetworkInterface netInterface = (NetworkInterface) allNetInterfaces.nextElement();
Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
while (addresses.hasMoreElements()) {
InetAddress ip = (InetAddress) addresses.nextElement();
if (ip != null
&& ip instanceof Inet4Address
&& !ip.isLoopbackAddress() //loopback地址即本机地址,IPv4的loopback范围是127.0.0.0 ~ 127.255.255.255
&& ip.getHostAddress().indexOf(":") == -1) {
System.out.println("本机的IP = " ip.getHostAddress());
return ip.getHostAddress();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}