对IM的一点小思考

2020-06-01 10:43:46 浏览数 (1)

即时通讯

最近在做IM(即时通讯:Instant Messaging)项目(基于网易云信)的升级改造, 虽然核心的通讯采用第三方SDK,但是对于即时通讯这一块还是产生了不少疑惑, 这篇文章主要记录自己对于实现聊天双方的即时通讯的思考。

作为一名Java开发,网络协议这一块自己接触最多的就是应用层的HTTP,它作为半双工通讯是无法实现 即时的效果,即使客户端采用轮询方式访问服务端产生的消息,也并不是真正意义上的即时,那么如何实现是我一开始最纠结的问题,纠结的点在于即时通讯,不光是要考虑Web端,还有APP应用。如果实现Web端的即时通讯,我们可以想到使用WebSocket协议(全双工)实现,但是APP应用像QQ, 微信他们是如何达到即时的效果,以QQ软件为例,它的应用层协议是私有协议,传输层协议采用UDP, 应用层使用私协议的好处就是高效,节约流量(一般使用二进制协议),安全性高,难以破解,很考验设计者 的能力,传输层采用UDP协议,UDP协议与TCP协议相比更快,少了三次握手这些环节,但是它不稳定, 无法保证消息能被接收(腾讯好像是在应用层上下了功夫来保障消息不会丢失),到这里,通过QQ实协议的选取,我想通讯的过程应该是(个人猜测):用户A登录,基于UDP协议,与腾讯服务器建立连接,服务器记住当前登录 用户的IP地址(这个地址通常是经过过NAT(网络地址转换)得到的 ,并非局域网地址,在互联网中,如果我们内网访问外网, 通常在通过网络出口的时候,被替换IP报文头部的地址信息,这种方式可以节约IPV4的地址,也就是 同一个NAT下的内网主机,在访问外网地址时,使用的是同一个地址信息,只是端口不同),当腾讯服务器记录 用户A的IP地址和端口号时,表明链接建立,此时用户A想与好友B进行通讯,发送了一条消息(这里不考虑离线), 那么消息会通过服务器转发给B,转发的方式也就是通过UDP发送报文到B的地址。这里我简单用自己想法使用Java基于UDP写了 一段代码(文章末尾的代码一),这样也就是实现了即时通讯(服务器通过UDP协议对客服端转发消息)

除了服务器转发之外,我们应该还需要知道另外一种方案P2P,也就是点对点通讯,通常情况下,一般的即时通讯, 文本消息是通过服务器转发,多媒体数据(视频电话这些)采用P2P的方案,这里我们仍然以UDP协议为例,想一下如何实现点对点的通讯, 与服务器转发不同,点对点的通讯是可能需要内网穿透的(不同局域网下的主机A访问到主机B),我们内网访问公网IP(腾讯的服务器)是容易的, 但是我们要实现访问内网却不是那么简单, 我想大家都听过内网穿透,打洞这些词(一个含义),他们就是实现P2P功能的方案,在说使用UDP实现打洞前,我们看一下上面说的NAT这个概念, 我们家庭/公司中一般都会有路由器,路由器有三种工作模式:NAT模式,路由模式,全模式,三者区别我就不提了,通常情况下,我们路由处于NAT模式, NAT模式按照方式有有这几种类型:全锥形NAT,限制锥形NAT,端口限制锥形NAT,对称型NAT。

如果需要打洞的两台主机在同一个NAT下,那么就没有打洞的必要,使用局域网IP即可完成P2P,即使用打洞可能也没效果, 一般路由器会拒绝掉这种回路的UDP包,

而如果主机不在同一个NAT下,那么就有打洞的必要了(对称型NAT有些特殊,无法完成打洞,因为每次建立连接端口都会发生变化, 难以实现P2P,补充方案是:端口预测)

下面再说打洞的步骤

代码语言:javascript复制
主机A,主机B,服务器S

主机A发送请求给服务器S,建立连接 A <-> S
主机B发送请求给服务器S,建立连接 B <-> S

主机A发送请求给服务器S,与B进行P2P连接

服务器S发送打洞请求给B,B主动发送一条消息给A,此时由于A所处的NAT不信任B,拒绝B发送包,但是此时由于B给A发消息,那么B的NAT信任A。
A发送P2P连接请求,此时B <-> A之间打通

参考文章:https://blog.csdn.net/jdh99/article/details/6667648

说到这里其实整理一下就会明白,打洞就是完成相互信任一个过程,当我们在不同的NAT下,通过服务器得到对方的IP和端口,然后得到对方NAT信任, 再通过UDP进行连接通讯。在这些NAT类型中,对称型NAT特殊是因为,每次他的端口都不一样,我们很难预测进行通讯时他的端口是什么,也就无法完成打洞, 通常情况下,我们家庭路由不是对称型NAT,但是电脑连接手机4G开启热点,就是对称型NAT。

想要鉴别自己所处的NAT类型是不是对称型,其实很简单, 如下图,手机热点访问对两个公网地址发包,得到的自己的IP地址是一样的,但是端口不同。对两个公网IP进行UDP广播,如果两个主机拿到的端口不相同那就是对称型NAT。但是如果端口变化可以预测,那么应该也可以打洞成功(但是我用手机热点, 使用UDP打洞没有成功过)。

打洞的完整代码(文章末尾的代码二)

代码一

服务端

代码语言:javascript复制
DatagramChannel channel = (DatagramChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
SocketAddress receive = channel.receive(byteBuffer);
//记录地址
map.put("用户X", receive);

客户端

代码语言:javascript复制
String msg = "X:登录建立连接"
DatagramPacket packet = new DatagramPacket(msg.getBytes(), msg.getBytes().length,
                        new InetSocketAddress("IP", 8091));

服务端转发消息

代码语言:javascript复制
public static void sendMessage(SocketAddress socketAddress, String msg) throws Exception {
        if (socketAddress != null) {
            ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
            server.send(byteBuffer, socketAddress);
        } else {
            System.out.println("群发数据");
        }
    }

客户端发送消息

代码语言:javascript复制
DatagramPacket packet2 = new DatagramPacket("A:发送给B".getBytes(),
"A:发送给B".getBytes().length,
      new InetSocketAddress(address[0],
      Integer.parseInt(address[1])));
       socket.send(packet2);
代码二

服务端:

代码语言:javascript复制
package com.studyjava.email.udp;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class UDPService {
    //保存连接的地址
    private static Map<String, SocketAddress> map = new ConcurrentHashMap<>();

    //数据报
    private static DatagramChannel server = null;

    //选择器
    private static Selector selector = null;

    static {
        try {
            server = DatagramChannel.open().bind(new InetSocketAddress(8091));
            server.configureBlocking(false);
            selector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    public static void sendMessage(SocketAddress socketAddress, String msg) throws Exception {
        if (socketAddress != null) {
            ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
            server.send(byteBuffer, socketAddress);
        } else {
            System.out.println("群发数据");
        }
    }

    public static String readMsg(SelectionKey selectionKey) throws Exception {
        DatagramChannel channel = (DatagramChannel) selectionKey.channel();
        System.out.println(channel.getRemoteAddress()   ":"   channel.getLocalAddress());
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        SocketAddress receive = channel.receive(byteBuffer);
        byteBuffer.flip();
        String msg = decode(byteBuffer);//new String(byteBuffer.array(), "utf-8");
        System.out.println("收到消息:"   msg);
        String notices = "";
        System.out.println(msg.charAt(0));
        if ((msg.charAt(0) == 'A' && msg.charAt(msg.length() - 1) == 'B') || msg.charAt(0) == 'B' && msg.charAt(msg.length() - 1) == 'A') {
            notices = "B->A"   map.get("A");
            sendMessage(map.get("B"), notices);
            notices = "A->B"   map.get("B");
            sendMessage(map.get("A"), notices);
            System.out.println("A客服端连接B,通知B进行打洞");
            System.out.println("B客服端连接A,通知A进行打洞");

        } else if (msg.charAt(0) == 'A') {
            System.out.println("A客户端注册连接");
            notices = "SELF"   receive;
            sendMessage(receive, notices);
            map.put("A", receive);
        } else if (msg.charAt(0) == 'B') {
            System.out.println("B客户端注册连接");
            notices = "SELF"   receive;
            sendMessage(receive, notices);
            map.put("B", receive);
        } else {
            sendMessage(null, msg);
        }

        return msg;
    }

    public static String decode(ByteBuffer bb) {
        Charset charset = Charset.forName("utf-8");
        return charset.decode(bb).toString();

    }

    public static void main(String[] args) throws Exception {
        System.out.println("启动UDP");
        server.register(selector, SelectionKey.OP_READ);
        while (true) {
            if (selector.select() > 0) {
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = keys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    if (selectionKey.isReadable()) {
                        readMsg(selectionKey);
                        iterator.remove();
                    }
                }
            }
        }

    }
}

客户端A和B(使用同一份代码即可,通过Scanner输入判断A和B)

代码语言:javascript复制
package com.studyjava.email.udp;

import java.net.*;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Enumeration;
import java.util.Scanner;

public class UDPClientOne {
    public static DatagramSocket socket = null;
    private static String self = "";

    public static void main(String[] args) throws Exception {
        try {
            socket = new DatagramSocket();
            Scanner scanner = new Scanner(System.in);
            String msg;
            new Thread(() -> {
                try {
                    while (true) {
                        byte[] arr = new byte[1024];
                        DatagramPacket packet = new DatagramPacket(arr, arr.length);
                        socket.receive(packet);
                        System.out.println(packet.getData().length);

                        int len = 0;
                        for (byte b : packet.getData()) {
                            if (b == 0)
                                break;
                            len  ;
                        }
                        String m1 = new String(packet.getData(), 0, len, "utf-8");
                        System.out.println("A收到消息:"   m1);
                        if (m1.substring(0, 4).equals("A->B")) {
                            String[] address = m1.substring(5).split(":");
                            System.out.println("地址"   address[0]   "端口"   address[1]);
                            if (address[0].equals(self)) {
                                System.out.println("内网访问B");
                                address[0] = getHostIp();
                            }

                            DatagramPacket packet1 = new DatagramPacket("打洞消息A".getBytes(), "打洞消息A".getBytes().length,
                                    new InetSocketAddress(address[0], Integer.parseInt(address[1])));
                            socket.send(packet1);
                            //
                            Thread.sleep(1000);
                            System.out.println("地址"   address[0]   "端口"   address[1]);
//                            System.out.println("开始打洞B->A");
                            DatagramPacket packet2 = new DatagramPacket("P2PA".getBytes(), "P2PA".getBytes().length,
                                    new InetSocketAddress(address[0], Integer.parseInt(address[1])));
                            socket.send(packet2);
                        } else if (m1.substring(0, 4).equals("SELF")) {
                            //保存自己的公网IP
                            System.out.println("保存IP");
                            self = m1.substring(5).split(":")[0];
                        }

                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
            while (!(msg = scanner.nextLine()).equals("exit")) {
                System.out.println(msg   "==");
                DatagramPacket packet = new DatagramPacket(msg.getBytes(), msg.getBytes().length,
                        new InetSocketAddress("IP", 8091));
                System.out.println(socket.getPort());
                socket.send(packet);
            }
        } finally {
            socket.close();
        }
    }

    private static String decode(ByteBuffer bb) {
        Charset charset = Charset.forName("utf-8");
        return charset.decode(bb).toString();

    }

    private static String getHostIp() {
        try {
            Enumeration<NetworkInterface> allNetInterfaces = NetworkInterface.getNetworkInterfaces();
            while (allNetInterfaces.hasMoreElements()) {
                NetworkInterface netInterface = (NetworkInterface) allNetInterfaces.nextElement();
                Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
                while (addresses.hasMoreElements()) {
                    InetAddress ip = (InetAddress) addresses.nextElement();
                    if (ip != null
                            && ip instanceof Inet4Address
                            && !ip.isLoopbackAddress() //loopback地址即本机地址,IPv4的loopback范围是127.0.0.0 ~ 127.255.255.255
                            && ip.getHostAddress().indexOf(":") == -1) {
                        System.out.println("本机的IP = "   ip.getHostAddress());
                        return ip.getHostAddress();
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

0 人点赞