深入分析 Watcher 机制的实现原理(三)客户端接收服务端处理完成的响应及事件触发

2022-10-25 16:12:27 浏览数 (2)

客户端接收服务端处理完成的响应

ClientCnxnSocketNetty.messageReceived

服 务 端 处 理 完 成 以 后 , 会 通 过NettyServerCnxn.sendResponse 发送返回的响应信息, 客户端会在 ClientCnxnSocketNetty.messageReceived 接收服务端的返回

代码语言:javascript复制
@Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
            updateNow();
            while (buf.isReadable()) {
                if (incomingBuffer.remaining() > buf.readableBytes()) {
                    int newLimit = incomingBuffer.position()   buf.readableBytes();
                    incomingBuffer.limit(newLimit);
                }
                buf.readBytes(incomingBuffer);
                incomingBuffer.limit(incomingBuffer.capacity());

                if (!incomingBuffer.hasRemaining()) {
                    incomingBuffer.flip();
                    if (incomingBuffer == lenBuffer) {
                        recvCount.getAndIncrement();
                        readLength();
                    } else if (!initialized) {
                        readConnectResult();
                        lenBuffer.clear();
                        incomingBuffer = lenBuffer;
                        initialized = true;
                        updateLastHeard();
                    } else {
                        //收到消息以后触发 SendThread.readResponse 方法
                        sendThread.readResponse(incomingBuffer);
                        lenBuffer.clear();
                        incomingBuffer = lenBuffer;
                        updateLastHeard();
                    }
                }
            }
            wakeupCnxn();
            // Note: SimpleChannelInboundHandler releases the ByteBuf for us
            // so we don't need to do it.
        }

SendThread. . readResponse 这个方法里面主要的流程如下 首先读取 header,如果其 xid == -2,表明是一个 ping 的response,return 如果 xid 是 -4 ,表明是一个 AuthPacket 的 response return 如果 xid 是 -1,表明是一个 notification,此时要继续读取并构造一个 enent,通过 EventThread.queueEvent 发送,return 其它情况下:从 pendingQueue 拿出一个 Packet,校验后更新 packet信息

代码语言:javascript复制
void readResponse(ByteBuffer incomingBuffer) throws IOException {
            ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();

            //反序列化 header
            replyHdr.deserialize(bbia, "header");
            switch (replyHdr.getXid()) {
            case PING_XID:
                LOG.debug("Got ping response for session id: 0x{} after {}ms.",
                    Long.toHexString(sessionId),
                    ((System.nanoTime() - lastPingSentNs) / 1000000));
                return;
              case AUTHPACKET_XID:
                LOG.debug("Got auth session id: 0x{}", Long.toHexString(sessionId));
                if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
                    changeZkState(States.AUTH_FAILED);
                    eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
                        Watcher.Event.KeeperState.AuthFailed, null));
                    eventThread.queueEventOfDeath();
                }
              return;
            case NOTIFICATION_XID:
                //表示当前的消息类型为一个 notification(意味着是服务端的一个响应事件)
                LOG.debug("Got notification session id: 0x{}",
                    Long.toHexString(sessionId));
                WatcherEvent event = new WatcherEvent();

                //反序列化响应信息
                event.deserialize(bbia, "response");

                // convert from a server path to a client path
                if (chrootPath != null) {
                    String serverPath = event.getPath();
                    if (serverPath.compareTo(chrootPath) == 0) {
                        event.setPath("/");
                    } else if (serverPath.length() > chrootPath.length()) {
                        event.setPath(serverPath.substring(chrootPath.length()));
                     } else {
                         LOG.warn("Got server path {} which is too short for chroot path {}.",
                             event.getPath(), chrootPath);
                     }
                }

                WatchedEvent we = new WatchedEvent(event);
                LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
                eventThread.queueEvent(we);
                return;
            default:
                break;
            }

            // If SASL authentication is currently in progress, construct and
            // send a response packet immediately, rather than queuing a
            // response as with other packets.
            if (tunnelAuthInProgress()) {
                GetSASLRequest request = new GetSASLRequest();
                request.deserialize(bbia, "token");
                zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this);
                return;
            }

            Packet packet;
            synchronized (pendingQueue) {
                if (pendingQueue.size() == 0) {
                    throw new IOException("Nothing in the queue, but got "   replyHdr.getXid());
                }

                因为当前这个数据包已经收到了响应,所以讲它从pendingQueued 中移除
                packet = pendingQueue.remove();
            }
            /*
             * Since requests are processed in order, we better get a response
             * to the first request!
             */
            try {

                //校验数据包信息,校验成功后讲数据包信息进行更新(替换为服务端的信息)
                if (packet.requestHeader.getXid() != replyHdr.getXid()) {
                    packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
                    throw new IOException("Xid out of order. Got Xid "   replyHdr.getXid()
                                            " with err "   replyHdr.getErr()
                                            " expected Xid "   packet.requestHeader.getXid()
                                            " for a packet with details: "   packet);
                }

                packet.replyHeader.setXid(replyHdr.getXid());
                packet.replyHeader.setErr(replyHdr.getErr());
                packet.replyHeader.setZxid(replyHdr.getZxid());
                if (replyHdr.getZxid() > 0) {
                    lastZxid = replyHdr.getZxid();
                }
                if (packet.response != null && replyHdr.getErr() == 0) {
                    //获得服 务端的响应,反序列化以后设置到 packet.response 属性 中。
                    // 所以我们可以在 exists 方法的最后一行通过packet.response 拿到改请求的返回结果
                    packet.response.deserialize(bbia, "response");
                }

                LOG.debug("Reading reply session id: 0x{}, packet:: {}", Long.toHexString(sessionId), packet);
            } finally {

                //最后调用finishPacket 方法完成处理
                finishPacket(packet);
            }
        }

finishPacket方法:

代码语言:javascript复制
// 主要功能是把从 Packet 中取出对应的 Watcher 并注册到 ZKWatchManager 中去
    protected void finishPacket(Packet p) {
        int err = p.replyHeader.getErr();
        if (p.watchRegistration != null) {
            
            //注册事件
            p.watchRegistration.register(err);
        }
        // Add all the removed watch events to the event queue, so that the
        // clients will be notified with 'Data/Child WatchRemoved' event type.
        将所有移除的监视事件添加到事件队列, 这样客户端能收到 “data/child 事件被移除”的事件类型
        if (p.watchDeregistration != null) {
            Map<EventType, Set<Watcher>> materializedWatchers = null;
            try {
                materializedWatchers = p.watchDeregistration.unregister(err);
                for (Entry<EventType, Set<Watcher>> entry : materializedWatchers.entrySet()) {
                    Set<Watcher> watchers = entry.getValue();
                    if (watchers.size() > 0) {
                        queueEvent(p.watchDeregistration.getClientPath(), err, watchers, entry.getKey());
                        // ignore connectionloss when removing from local
                        // session
                        p.replyHeader.setErr(Code.OK.intValue());
                    }
                }
            } catch (KeeperException.NoWatcherException nwe) {
                p.replyHeader.setErr(nwe.code().intValue());
            } catch (KeeperException ke) {
                p.replyHeader.setErr(ke.code().intValue());
            }
        }

        cb 就是 AsnycCallback,如果为 null,表明是同步调用的接口,不需要异步回掉,因此,直接 notifyAll即可。
        if (p.cb == null) {
            synchronized (p) {
                p.finished = true;
                p.notifyAll();
            }
        } else {
            p.finished = true;
            eventThread.queuePacket(p);
        }
    }

register方法

代码语言:javascript复制
public void register(int rc) {
            if (shouldAddWatch(rc)) {
                通过子类的实现取得ZKWatchManager 中的 existsWatches
                Map<String, Set<Watcher>> watches = getWatches(rc);
                synchronized (watches) {
                    Set<Watcher> watchers = watches.get(clientPath);
                    if (watchers == null) {
                        watchers = new HashSet<Watcher>();
                        watches.put(clientPath, watchers);
                    }
                    // 将Watcher对象放到ZKWatchManager中 的existsWatches 里面
                    watchers.add(watcher);
                }
            }
        }

getWatches方法

代码语言:javascript复制
class ExistsWatchRegistration extends WatchRegistration {

        public ExistsWatchRegistration(Watcher watcher, String clientPath) {
            super(watcher, clientPath);
        }

        @Override
        protected Map<String, Set<Watcher>> getWatches(int rc) {
            return rc == 0 ? getWatchManager().getDataWatches() : getWatchManager().getExistWatches();
        }

        @Override
        protected boolean shouldAddWatch(int rc) {
            return rc == 0 || rc == KeeperException.Code.NONODE.intValue();
        }

    }

客户端存储 watcher 的几个 map 集合,分别对应三种注册监听事件

代码语言:javascript复制
class ZKWatchManager implements ClientWatchManager {

    private static final Logger LOG = LoggerFactory.getLogger(ZKWatchManager.class);

    private final Map<String, Set<Watcher>> dataWatches = new HashMap<>();
    private final Map<String, Set<Watcher>> existWatches = new HashMap<>();
    private final Map<String, Set<Watcher>> childWatches = new HashMap<>();
    private final Map<String, Set<Watcher>> persistentWatches = new HashMap<>();
    private final Map<String, Set<Watcher>> persistentRecursiveWatches = new HashMap<>();

总 的来说, 当使 用 ZooKeeper 构造方法或 者使用getData 、 exists 和 getChildren 三 个 接 口 来 向ZooKeeper 服务器注册 Watcher 的时候,首先将此消息传递给服务端,传递成功后,服务端会通知客户端,然后客户端将该路径和 Watcher 对应关系存储起来备用。

queuePacket方法

代码语言:javascript复制
//将当前的数据包添加到等待事件通知的队列中
        @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
        public void queuePacket(Packet packet) {
            if (wasKilled) {
                synchronized (waitingEvents) {
                    if (isRunning) {
                        waitingEvents.add(packet);
                    } else {
                        processEvent(packet);
                    }
                }
            } else {
                waitingEvents.add(packet);
            }
        }

客户端接收服务端流程图:

事件触发

zookeeper.setData(“/mic”, “1”.getByte(),-1) ; //修改节点的值触发监听

服务端事件响应
代码语言:javascript复制
public Stat setData(String path, byte[] data, int version, long zxid, long time) throws KeeperException.NoNodeException {
        Stat s = new Stat();
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        byte[] lastdata = null;
        synchronized (n) {
            lastdata = n.data;
            nodes.preChange(path, n);
            n.data = data;
            n.stat.setMtime(time);
            n.stat.setMzxid(zxid);
            n.stat.setVersion(version);
            n.copyStat(s);
            nodes.postChange(path, n);
        }
        // now update if the path is in a quota subtree.
        String lastPrefix = getMaxPrefixWithQuota(path);
        long dataBytes = data == null ? 0 : data.length;
        if (lastPrefix != null) {
            this.updateCountBytes(lastPrefix, dataBytes - (lastdata == null ? 0 : lastdata.length), 0);
        }
        nodeDataSize.addAndGet(getNodeSize(path, data) - getNodeSize(path, lastdata));

        updateWriteStat(path, dataBytes);
        // 触 发 对 应 节 点 的NodeDataChanged 事件
        dataWatches.triggerWatch(path, EventType.NodeDataChanged);
        return s;
    }

WatchManager类的triggerWatch方法

代码语言:javascript复制
@Override
    public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
        //根据事件类型、连接状态、节点路径创建 WatchedEvent
        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
        Set<Watcher> watchers = new HashSet<>();
        PathParentIterator pathParentIterator = getPathParentIterator(path);
        synchronized (this) {
            for (String localPath : pathParentIterator.asIterable()) {
                Set<Watcher> thisWatchers = watchTable.get(localPath);
                if (thisWatchers == null || thisWatchers.isEmpty()) {
                    continue;
                }
                Iterator<Watcher> iterator = thisWatchers.iterator();
                while (iterator.hasNext()) {
                    Watcher watcher = iterator.next();
                    WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, localPath);
                    if (watcherMode.isRecursive()) {
                        if (type != EventType.NodeChildrenChanged) {
                            watchers.add(watcher);
                        }
                    } else if (!pathParentIterator.atParentPath()) {
                        watchers.add(watcher);
                        if (!watcherMode.isPersistent()) {
                            iterator.remove();
                            //根据 watcher 从 watcher 表中取出路径集合
                            Set<String> paths = watch2Paths.get(watcher);
                            if (paths != null) {
                                //移除路径
                                paths.remove(localPath);
                            }
                        }
                    }
                }
                if (thisWatchers.isEmpty()) {
                    watchTable.remove(localPath);
                }
            }
        }
        if (watchers.isEmpty()) {
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for "   path);
            }
            return null;
        }

        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }

            //
            w.process(e);
        }

        switch (type) {
            case NodeCreated:
                ServerMetrics.getMetrics().NODE_CREATED_WATCHER.add(watchers.size());
                break;

            case NodeDeleted:
                ServerMetrics.getMetrics().NODE_DELETED_WATCHER.add(watchers.size());
                break;

            case NodeDataChanged:
                ServerMetrics.getMetrics().NODE_CHANGED_WATCHER.add(watchers.size());
                break;

            case NodeChildrenChanged:
                ServerMetrics.getMetrics().NODE_CHILDREN_WATCHER.add(watchers.size());
                break;
            default:
                // Other types not logged.
                break;
        }

        return new WatcherOrBitSet(watchers);
    }

NettyServerCnxn 这个类的 process 方法看看

代码语言:javascript复制
@Override
public void process(WatchedEvent event) {
    ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0);
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(
            LOG,
            ZooTrace.EVENT_DELIVERY_TRACE_MASK,
            "Deliver event "   event   " to 0x"   Long.toHexString(this.sessionId)   " through "   this);
    }

    // Convert WatchedEvent to a type that can be sent over the wire
    WatcherEvent e = event.getWrapper();

    try {

        //发送响应
        sendResponse(h, e, "notification");
    } catch (IOException e1) {
        LOG.debug("Problem sending to {}", getRemoteSocketAddress(), e1);
        close();
    }
}

那 接 下 里 , 客 户 端 会 收 到 这 个 response , 触 发SendThread.readResponse 方法

客户端处理事件响应

SendThread.readResponse

代码语言:javascript复制
void readResponse(ByteBuffer incomingBuffer) throws IOException {
            ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();

            //反序列化 header
            replyHdr.deserialize(bbia, "header");
            switch (replyHdr.getXid()) {
            case PING_XID:
                LOG.debug("Got ping response for session id: 0x{} after {}ms.",
                    Long.toHexString(sessionId),
                    ((System.nanoTime() - lastPingSentNs) / 1000000));
                return;
              case AUTHPACKET_XID:
                LOG.debug("Got auth session id: 0x{}", Long.toHexString(sessionId));
                if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
                    changeZkState(States.AUTH_FAILED);
                    eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
                        Watcher.Event.KeeperState.AuthFailed, null));
                    eventThread.queueEventOfDeath();
                }
              return;
            case NOTIFICATION_XID:
                //表示当前的消息类型为一个 notification(意味着是服务端的一个响应事件)
                LOG.debug("Got notification session id: 0x{}",
                    Long.toHexString(sessionId));
                WatcherEvent event = new WatcherEvent();

                //反序列化响应信息
                event.deserialize(bbia, "response");

                // convert from a server path to a client path
                if (chrootPath != null) {
                    String serverPath = event.getPath();
                    if (serverPath.compareTo(chrootPath) == 0) {
                        event.setPath("/");
                    } else if (serverPath.length() > chrootPath.length()) {
                        event.setPath(serverPath.substring(chrootPath.length()));
                     } else {
                         LOG.warn("Got server path {} which is too short for chroot path {}.",
                             event.getPath(), chrootPath);
                     }
                }

                WatchedEvent we = new WatchedEvent(event);
                LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));

                //根据该通知事件,从ZKWatchManager 中取出所有相关的 Watcher,如果获取到相应的 Watcher,就会让 Watcher 移除失效
                eventThread.queueEvent(we);
                return;
            default:
                break;
            }

            // If SASL authentication is currently in progress, construct and
            // send a response packet immediately, rather than queuing a
            // response as with other packets.
            if (tunnelAuthInProgress()) {
                GetSASLRequest request = new GetSASLRequest();
                request.deserialize(bbia, "token");
                zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this);
                return;
            }

            Packet packet;
            synchronized (pendingQueue) {
                if (pendingQueue.size() == 0) {
                    throw new IOException("Nothing in the queue, but got "   replyHdr.getXid());
                }

                因为当前这个数据包已经收到了响应,所以讲它从pendingQueued 中移除
                packet = pendingQueue.remove();
            }
            /*
             * Since requests are processed in order, we better get a response
             * to the first request!
             */
            try {

                //校验数据包信息,校验成功后讲数据包信息进行更新(替换为服务端的信息)
                if (packet.requestHeader.getXid() != replyHdr.getXid()) {
                    packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
                    throw new IOException("Xid out of order. Got Xid "   replyHdr.getXid()
                                            " with err "   replyHdr.getErr()
                                            " expected Xid "   packet.requestHeader.getXid()
                                            " for a packet with details: "   packet);
                }

                packet.replyHeader.setXid(replyHdr.getXid());
                packet.replyHeader.setErr(replyHdr.getErr());
                packet.replyHeader.setZxid(replyHdr.getZxid());
                if (replyHdr.getZxid() > 0) {
                    lastZxid = replyHdr.getZxid();
                }
                if (packet.response != null && replyHdr.getErr() == 0) {
                    //获得服 务端的响应,反序列化以后设置到 packet.response 属性 中。
                    // 所以我们可以在 exists 方法的最后一行通过packet.response 拿到改请求的返回结果
                    packet.response.deserialize(bbia, "response");
                }

                LOG.debug("Reading reply session id: 0x{}, packet:: {}", Long.toHexString(sessionId), packet);
            } finally {

                //最后调用finishPacket 方法完成处理
                finishPacket(packet);
            }
        }

queueEvent方法

SendThread 接收到服务端的通知事件后,会通过调用EventThread 类 的 queueEvent 方 法 将 事 件 传 给EventThread 线程,queueEvent 方法根据该通知事件,从ZKWatchManager 中取出所有相关的 Watcher,如果获取到相应的 Watcher,就会让 Watcher 移除失效。

代码语言:javascript复制
private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
            if (event.getType() == EventType.None && sessionState == event.getState()) {
                return;
            }
            sessionState = event.getState();
            final Set<Watcher> watchers;
            if (materializedWatchers == null) {
                // materialize the watchers based on the event
                //materialize
                watchers = watchManager.materialize(event.getState(), event.getType(), event.getPath());
            } else {
                watchers = new HashSet<>(materializedWatchers);
            }

            //封装 WatcherSetEventPair 对象,添加到waitngEvents 队列中
            WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
            // queue the pair (watch set & event) for later processing
            waitingEvents.add(pair);
        }

materialize方法:

通过 dataWatches 或者 existWatches 或者 childWatches的 remove 取出对应的 watch,表明客户端 watch 也是注册一次就移除 同时需要根据 keeperState、eventType 和 path 返回应该被通知的 Watcher 集合

waitingEvents.add(pair);waitingEvents 是 EventThread 这个线程中的阻塞队列,很明显,又是在我们第一步操作的时候实例化的一个线程。从名字可以指导,waitingEvents 是一个待处理 Watcher的队列,EventThread 的 run() 方法会不断从队列中取数据,交由 processEvent 方法处理:

EventThread 的 run() 方法从waitingEvents

代码语言:javascript复制
@Override
        @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
        public void run() {
            try {
                isRunning = true;
                while (true) {
                    Object event = waitingEvents.take();
                    if (event == eventOfDeath) {
                        wasKilled = true;
                    } else {

                        //处理事件
                        processEvent(event);
                    }
                    if (wasKilled) {
                        synchronized (waitingEvents) {
                            if (waitingEvents.isEmpty()) {
                                isRunning = false;
                                break;
                            }
                        }
                    }
                }
            } catch (InterruptedException e) {
                LOG.error("Event thread exiting due to interruption", e);
            }

            LOG.info("EventThread shut down for session: 0x{}", Long.toHexString(getSessionId()));
        }

processEvent方法

代码语言:javascript复制
private void processEvent(Object event) {
            try {
                //判断事件类型
                if (event instanceof WatcherSetEventPair) {
                    // each watcher will process the event
                    //得到WatcherSetEventPair
                    WatcherSetEventPair pair = (WatcherSetEventPair) event;
                    //拿到符合触发机制的所有 watcher 列表,循环进行调用
                    for (Watcher watcher : pair.watchers) {

                        try {

                            // 调 用 客 户 端 的 回 调process
                            watcher.process(pair.event);
                        } catch (Throwable t) {
                            LOG.error("Error while calling watcher.", t);
                        }
                    }

头疼。。。

0 人点赞