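The client receives the response produced by the server
ClientCnxnSocketNetty.messageReceived
Once the server has finished processing, it sends the response back through NettyServerCnxn.sendResponse. The client receives it in ClientCnxnSocketNetty.messageReceived (channelRead0 in the code shown here):
@Override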
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
updateNow();
while (buf.isReadable()) {
if (incomingBuffer.remaining() > buf.readableBytes()) {
int newLimit = incomingBuffer.position() + buf.readableBytes();
incomingBuffer.limit(newLimit);
}
buf.readBytes(incomingBuffer);
incomingBuffer.limit(incomingBuffer.capacity());
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount.getAndIncrement();
readLength();
} else if (!initialized) {
readConnectResult();
lenBuffer.clear();
incomingBuffer = lenBuffer;
initialized = true;
updateLastHeard();
} else {
// Once a complete message has arrived, call SendThread.readResponse
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
wakeupCnxn();
// Note: SimpleChannelInboundHandler releases the ByteBuf for us
// so we don't need to do it.
}
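The switching between lenBuffer and incomingBuffer above is a form of length-prefixed framing: a 4-byte length is read first, then a body of exactly that size. The following is a minimal sketch of that idea using only java.nio, without Netty. The class FrameDecoder and its feed method are hypothetical names made up for illustration and are not part of ZooKeeper; the sketch also assumes every frame has a positive length.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of length-prefixed framing; not ZooKeeper code.
class FrameDecoder {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    // Current target: either the 4-byte length buffer or a body buffer.
    private ByteBuffer incoming = lenBuffer;

    // Feed an arbitrary chunk of bytes; returns every frame body completed by it.
    // Assumption: each frame length is positive.
    List<byte[]> feed(byte[] chunk) {
        List<byte[]> frames = new ArrayList<>();
        ByteBuffer src = ByteBuffer.wrap(chunk);
        while (src.hasRemaining()) {
            // Copy at most as many bytes as the current target can still hold.
            int n = Math.min(src.remaining(), incoming.remaining());
            for (int i = 0; i < n; i++) {
                incoming.put(src.get());
            }
            if (incoming.hasRemaining()) {
                continue; // target not full yet; the chunk is exhausted
            }
            incoming.flip();
            if (incoming == lenBuffer) {
                // The length prefix is complete: allocate a body buffer of that size.
                incoming = ByteBuffer.allocate(incoming.getInt());
            } else {
                // The body is complete: emit it and go back to reading a length.
                frames.add(incoming.array());
                lenBuffer.clear();
                incoming = lenBuffer;
            }
        }
        return frames;
    }
}
```

Because the decoder keeps its state between calls, a frame may arrive split across any number of chunks, which is the situation the while loop in channelRead0 has to handle as well.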
SendThread.readResponse
The main flow of this method is as follows. It first reads the header, then dispatches on the xid:
If xid == -2, this is a ping response; return.
If xid == -4, this is the response to an AuthPacket; return.
If xid == -1, this is a notification; the method goes on to read and build an event, hands it to EventThread.queueEvent, and returns.
Otherwise, it takes a Packet from pendingQueue, validates it, and updates the packet's information.
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
// Deserialize the header
replyHdr.deserialize(bbia, "header");
switch (replyHdr.getXid()) {
case PING_XID:
LOG.debug("Got ping response for session id: 0x{} after {}ms.",
Long.toHexString(sessionId),
((System.nanoTime() - lastPingSentNs) / 1000000));
return;
case AUTHPACKET_XID:
LOG.debug("Got auth session id: 0x{}", Long.toHexString(sessionId));
if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
changeZkState(States.AUTH_FAILED);
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null));
eventThread.queueEventOfDeath();
}
return;
case NOTIFICATION_XID:
// The message is a notification (an event pushed by the server)
LOG.debug("Got notification session id: 0x{}",
Long.toHexString(sessionId));
WatcherEvent event = new WatcherEvent();
// Deserialize the event payload
event.deserialize(bbia, "response");
// convert from a server path to a client path
if (chrootPath != null) {
String serverPath = event.getPath();
if (serverPath.compareTo(chrootPath) == 0) {
event.setPath("/");
} else if (serverPath.length() > chrootPath.length()) {
event.setPath(serverPath.substring(chrootPath.length()));
} else {
LOG.warn("Got server path {} which is too short for chroot path {}.",
event.getPath(), chrootPath);
}
}
WatchedEvent we = new WatchedEvent(event);
LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
eventThread.queueEvent(we);
return;
default:
break;
}
// If SASL authentication is currently in progress, construct and
// send a response packet immediately, rather than queuing a
// response as with other packets.
if (tunnelAuthInProgress()) {
GetSASLRequest request = new GetSASLRequest();
request.deserialize(bbia, "token");
zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this);
return;
}
Packet packet;
synchronized (pendingQueue) {
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
}
// This packet has now received its response, so remove it from pendingQueue
packet = pendingQueue.remove();
}
/*
* Since requests are processed in order, we better get a response
* to the first request!
*/
try {
// Validate the packet; if it matches, update it with the information returned by the server
if (packet.requestHeader.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid()
+ " with err " + replyHdr.getErr()
+ " expected Xid " + packet.requestHeader.getXid()
+ " for a packet with details: " + packet);
}
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
if (packet.response != null && replyHdr.getErr() == 0) {
// Deserialize the server's response into packet.response,
// which is why the last line of the exists method can read the result of the request from packet.response
packet.response.deserialize(bbia, "response");
}
LOG.debug("Reading reply session id: 0x{}, packet:: {}", Long.toHexString(sessionId), packet);
} finally {
// Finally, call finishPacket to complete the processing
finishPacket(packet);
}
}
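The dispatch logic of readResponse can be reduced to a small sketch: special xids are handled directly, and every other reply must match the request at the head of a FIFO queue. The class XidDispatch and its methods are hypothetical; the three xid values are the ones given in the text above, and only the xid is tracked here instead of a full Packet.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of the xid dispatch in readResponse; not ZooKeeper code.
class XidDispatch {
    // Values as stated in the text: -2 ping, -4 auth, -1 notification.
    static final int PING_XID = -2;
    static final int AUTHPACKET_XID = -4;
    static final int NOTIFICATION_XID = -1;

    // Xids of requests that were sent and still await a reply, in send order.
    private final Queue<Integer> pendingXids = new ArrayDeque<>();

    void sent(int xid) {
        pendingXids.add(xid);
    }

    // Classifies a reply; ordinary replies must arrive in the order requests were sent.
    String onReply(int xid) {
        switch (xid) {
            case PING_XID:
                return "ping";
            case AUTHPACKET_XID:
                return "auth";
            case NOTIFICATION_XID:
                return "notification";
            default:
                break;
        }
        Integer expected = pendingXids.poll();
        if (expected == null) {
            throw new IllegalStateException("Nothing in the queue, but got " + xid);
        }
        if (expected != xid) {
            throw new IllegalStateException(
                "Xid out of order. Got " + xid + ", expected " + expected);
        }
        return "reply";
    }
}
```

Note that pings and notifications never touch the pending queue, so they can be interleaved freely with ordinary replies.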
The finishPacket method:
// Main job: take the Watcher carried by the Packet and register it with ZKWatchManager
protected void finishPacket(Packet p) {
int err = p.replyHeader.getErr();
if (p.watchRegistration != null) {
// Register the watch
p.watchRegistration.register(err);
}
// Add all the removed watch events to the event queue, so that the
// clients will be notified with 'Data/Child WatchRemoved' event type.
if (p.watchDeregistration != null) {
Map<EventType, Set<Watcher>> materializedWatchers = null;
try {
materializedWatchers = p.watchDeregistration.unregister(err);
for (Entry<EventType, Set<Watcher>> entry : materializedWatchers.entrySet()) {
Set<Watcher> watchers = entry.getValue();
if (watchers.size() > 0) {
queueEvent(p.watchDeregistration.getClientPath(), err, watchers, entry.getKey());
// ignore connectionloss when removing from local
// session
p.replyHeader.setErr(Code.OK.intValue());
}
}
} catch (KeeperException.NoWatcherException nwe) {
p.replyHeader.setErr(nwe.code().intValue());
} catch (KeeperException ke) {
p.replyHeader.setErr(ke.code().intValue());
}
}
// cb is the AsyncCallback. If it is null, the call was made through a synchronous API, so no asynchronous callback is needed and notifyAll is enough.
if (p.cb == null) {
synchronized (p) {
p.finished = true;
p.notifyAll();
}
} else {
p.finished = true;
eventThread.queuePacket(p);
}
}
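The last branch of finishPacket distinguishes a synchronous caller, which is blocked waiting on the packet, from an asynchronous one, which supplied a callback. The following sketch shows that hand-off under simplifying assumptions: PendingCall is a hypothetical class, the result is a plain String, and the callback is invoked inline, whereas the code above queues the packet to the EventThread instead.

```java
import java.util.function.Consumer;

// Hypothetical sketch of the sync/async completion in finishPacket; not ZooKeeper code.
class PendingCall {
    private boolean finished;
    private String result;
    // null means a synchronous caller is blocked in await().
    private final Consumer<String> callback;

    PendingCall(Consumer<String> callback) {
        this.callback = callback;
    }

    // Called by the I/O thread once the reply has arrived.
    void finish(String value) {
        synchronized (this) {
            result = value;
            finished = true;
            notifyAll(); // wakes a synchronous caller, if there is one
        }
        if (callback != null) {
            // Simplification: invoked inline here rather than on a separate event thread.
            callback.accept(value);
        }
    }

    // Called by a synchronous caller; blocks until finish() has run.
    synchronized String await() {
        try {
            while (!finished) {
                wait();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return result;
    }
}
```

The while loop around wait() guards against spurious wake-ups and also covers the case where finish() runs before the caller starts waiting.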
The register method
public void register(int rc) {
if (shouldAddWatch(rc)) {
// The subclass implementation selects the target map in ZKWatchManager (for exists: dataWatches or existWatches)
Map<String, Set<Watcher>> watches = getWatches(rc);
synchronized (watches) {
Set<Watcher> watchers = watches.get(clientPath);
if (watchers == null) {
watchers = new HashSet<Watcher>();
watches.put(clientPath, watchers);
}
// Store the Watcher object in the selected ZKWatchManager map
watchers.add(watcher);
}
}
}
The getWatches method
class ExistsWatchRegistration extends WatchRegistration {
public ExistsWatchRegistration(Watcher watcher, String clientPath) {
super(watcher, clientPath);
}
@Override
protected Map<String, Set<Watcher>> getWatches(int rc) {
return rc == 0 ? getWatchManager().getDataWatches() : getWatchManager().getExistWatches();
}
@Override
protected boolean shouldAddWatch(int rc) {
return rc == 0 || rc == KeeperException.Code.NONODE.intValue();
}
}
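The two overrides above decide whether a watch is stored and in which map, based on the result code of the exists call. A minimal sketch of that decision follows. ExistsRegistrationSketch is a hypothetical class, watchers are represented by plain strings, and the constant -101 is assumed to be the integer value of KeeperException.Code.NONODE.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of ExistsWatchRegistration's behaviour; not ZooKeeper code.
class ExistsRegistrationSketch {
    static final int OK = 0;
    static final int NONODE = -101; // assumed value of KeeperException.Code.NONODE

    final Map<String, Set<String>> dataWatches = new HashMap<>();
    final Map<String, Set<String>> existWatches = new HashMap<>();

    // Mirrors shouldAddWatch + getWatches + register for an exists() call.
    void register(int rc, String clientPath, String watcher) {
        if (rc != OK && rc != NONODE) {
            return; // any other error: the watch is not stored
        }
        // Node exists: watch its data. Node is missing: watch for its creation.
        Map<String, Set<String>> target = (rc == OK) ? dataWatches : existWatches;
        target.computeIfAbsent(clientPath, p -> new HashSet<>()).add(watcher);
    }
}
```

This is why an exists watch on a missing node still fires when the node is created later: the failed lookup is one of the two result codes for which the watch is kept.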
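The client stores watchers in several maps inside ZKWatchManager. dataWatches, existWatches, and childWatches correspond to the three kinds of watch registration (getData, exists, getChildren); the version shown here also has two maps for persistent watches.
class ZKWatchManager implements ClientWatchManager {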
private static final Logger LOG = LoggerFactory.getLogger(ZKWatchManager.class);
private final Map<String, Set<Watcher>> dataWatches = new HashMap<>();
private final Map<String, Set<Watcher>> existWatches = new HashMap<>();
private final Map<String, Set<Watcher>> childWatches = new HashMap<>();
private final Map<String, Set<Watcher>> persistentWatches = new HashMap<>();
private final Map<String, Set<Watcher>> persistentRecursiveWatches = new HashMap<>();
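In summary, when a Watcher is registered with the ZooKeeper server through the ZooKeeper constructor or through the getData, exists, and getChildren interfaces, the request is first sent to the server. Once the server has processed it successfully, it replies to the client, and the client then stores the mapping between the path and the Watcher for later use.
The queuePacket method
// Add the current packet to the queue of events waiting to be dispatched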
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void queuePacket(Packet packet) {
if (wasKilled) {
synchronized (waitingEvents) {
if (isRunning) {
waitingEvents.add(packet);
} else {
processEvent(packet);
}
}
} else {
waitingEvents.add(packet);
}
}
[Figure: flow of the client receiving the server's response]
Triggering the event
zookeeper.setData("/mic", "1".getBytes(), -1); // changing the node's value triggers the watch
Server-side event response
public Stat setData(String path, byte[] data, int version, long zxid, long time) throws KeeperException.NoNodeException {
Stat s = new Stat();
DataNode n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
byte[] lastdata = null;
synchronized (n) {
lastdata = n.data;
nodes.preChange(path, n);
n.data = data;
n.stat.setMtime(time);
n.stat.setMzxid(zxid);
n.stat.setVersion(version);
n.copyStat(s);
nodes.postChange(path, n);
}
// now update if the path is in a quota subtree.
String lastPrefix = getMaxPrefixWithQuota(path);
long dataBytes = data == null ? 0 : data.length;
if (lastPrefix != null) {
this.updateCountBytes(lastPrefix, dataBytes - (lastdata == null ? 0 : lastdata.length), 0);
}
nodeDataSize.addAndGet(getNodeSize(path, data) - getNodeSize(path, lastdata));
updateWriteStat(path, dataBytes);
// Trigger the NodeDataChanged event for this node
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
return s;
}
The triggerWatch method of the WatchManager class
@Override
public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
// Create a WatchedEvent from the event type, connection state, and node path
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
Set<Watcher> watchers = new HashSet<>();
PathParentIterator pathParentIterator = getPathParentIterator(path);
synchronized (this) {
for (String localPath : pathParentIterator.asIterable()) {
Set<Watcher> thisWatchers = watchTable.get(localPath);
if (thisWatchers == null || thisWatchers.isEmpty()) {
continue;
}
Iterator<Watcher> iterator = thisWatchers.iterator();
while (iterator.hasNext()) {
Watcher watcher = iterator.next();
WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, localPath);
if (watcherMode.isRecursive()) {
if (type != EventType.NodeChildrenChanged) {
watchers.add(watcher);
}
} else if (!pathParentIterator.atParentPath()) {
watchers.add(watcher);
if (!watcherMode.isPersistent()) {
iterator.remove();
// Look up the set of paths this watcher is registered on
Set<String> paths = watch2Paths.get(watcher);
if (paths != null) {
// Remove the path, since a non-persistent watch fires only once
paths.remove(localPath);
}
}
}
}
if (thisWatchers.isEmpty()) {
watchTable.remove(localPath);
}
}
}
if (watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path);
}
return null;
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
// Deliver the event; for a Netty client connection the Watcher is its NettyServerCnxn
w.process(e);
}
switch (type) {
case NodeCreated:
ServerMetrics.getMetrics().NODE_CREATED_WATCHER.add(watchers.size());
break;
case NodeDeleted:
ServerMetrics.getMetrics().NODE_DELETED_WATCHER.add(watchers.size());
break;
case NodeDataChanged:
ServerMetrics.getMetrics().NODE_CHANGED_WATCHER.add(watchers.size());
break;
case NodeChildrenChanged:
ServerMetrics.getMetrics().NODE_CHILDREN_WATCHER.add(watchers.size());
break;
default:
// Other types not logged.
break;
}
return new WatcherOrBitSet(watchers);
}
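Leaving aside recursive and persistent modes, the core of triggerWatch is a one-shot lookup: the watchers for a path are collected, removed from both indexes, and then notified. A minimal sketch of that bookkeeping follows. OneShotWatchTable is a hypothetical class, watchers are represented by strings, and the actual notification is left to the caller.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of one-shot watch bookkeeping; not ZooKeeper code.
class OneShotWatchTable {
    // path -> watchers registered on it
    private final Map<String, Set<String>> watchTable = new HashMap<>();
    // watcher -> paths it is registered on
    private final Map<String, Set<String>> watch2Paths = new HashMap<>();

    synchronized void addWatch(String path, String watcher) {
        watchTable.computeIfAbsent(path, p -> new HashSet<>()).add(watcher);
        watch2Paths.computeIfAbsent(watcher, w -> new HashSet<>()).add(path);
    }

    // Returns the watchers to notify and forgets them, so each fires at most once.
    synchronized Set<String> triggerWatch(String path) {
        Set<String> watchers = watchTable.remove(path);
        if (watchers == null) {
            return Collections.emptySet();
        }
        for (String w : watchers) {
            Set<String> paths = watch2Paths.get(w);
            if (paths != null) {
                paths.remove(path);
            }
        }
        return watchers;
    }

    synchronized Set<String> pathsOf(String watcher) {
        return watch2Paths.getOrDefault(watcher, Collections.emptySet());
    }
}
```

Keeping both indexes in step matters because the reverse index is what lets a server drop all watches of a connection when it closes.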
Now look at the process method of the NettyServerCnxn class
@Override
public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(
LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"Deliver event " + event + " to 0x" + Long.toHexString(this.sessionId) + " through " + this);
}
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();
try {
// Send the notification to the client
sendResponse(h, e, "notification");
} catch (IOException e1) {
LOG.debug("Problem sending to {}", getRemoteSocketAddress(), e1);
close();
}
}
Next, the client receives this response, which triggers the SendThread.readResponse method.
Client-side handling of the event response
SendThread.readResponse
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
// Deserialize the header
replyHdr.deserialize(bbia, "header");
switch (replyHdr.getXid()) {
case PING_XID:
LOG.debug("Got ping response for session id: 0x{} after {}ms.",
Long.toHexString(sessionId),
((System.nanoTime() - lastPingSentNs) / 1000000));
return;
case AUTHPACKET_XID:
LOG.debug("Got auth session id: 0x{}", Long.toHexString(sessionId));
if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
changeZkState(States.AUTH_FAILED);
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null));
eventThread.queueEventOfDeath();
}
return;
case NOTIFICATION_XID:
// The message is a notification (an event pushed by the server)
LOG.debug("Got notification session id: 0x{}",
Long.toHexString(sessionId));
WatcherEvent event = new WatcherEvent();
// Deserialize the event payload
event.deserialize(bbia, "response");
// convert from a server path to a client path
if (chrootPath != null) {
String serverPath = event.getPath();
if (serverPath.compareTo(chrootPath) == 0) {
event.setPath("/");
} else if (serverPath.length() > chrootPath.length()) {
event.setPath(serverPath.substring(chrootPath.length()));
} else {
LOG.warn("Got server path {} which is too short for chroot path {}.",
event.getPath(), chrootPath);
}
}
WatchedEvent we = new WatchedEvent(event);
LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
// queueEvent uses this notification to take all matching Watchers out of ZKWatchManager; a Watcher found this way is removed and will not fire again
eventThread.queueEvent(we);
return;
default:
break;
}
// If SASL authentication is currently in progress, construct and
// send a response packet immediately, rather than queuing a
// response as with other packets.
if (tunnelAuthInProgress()) {
GetSASLRequest request = new GetSASLRequest();
request.deserialize(bbia, "token");
zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this);
return;
}
Packet packet;
synchronized (pendingQueue) {
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
}
// This packet has now received its response, so remove it from pendingQueue
packet = pendingQueue.remove();
}
/*
* Since requests are processed in order, we better get a response
* to the first request!
*/
try {
// Validate the packet; if it matches, update it with the information returned by the server
if (packet.requestHeader.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid()
+ " with err " + replyHdr.getErr()
+ " expected Xid " + packet.requestHeader.getXid()
+ " for a packet with details: " + packet);
}
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
if (packet.response != null && replyHdr.getErr() == 0) {
// Deserialize the server's response into packet.response,
// which is why the last line of the exists method can read the result of the request from packet.response
packet.response.deserialize(bbia, "response");
}
LOG.debug("Reading reply session id: 0x{}, packet:: {}", Long.toHexString(sessionId), packet);
} finally {
// Finally, call finishPacket to complete the processing
finishPacket(packet);
}
}
The queueEvent method
After SendThread receives a notification from the server, it calls EventThread.queueEvent to hand the event to the EventThread thread. Based on the notification, queueEvent takes all matching Watchers out of ZKWatchManager; a Watcher found this way is removed, so it will not fire again.
private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
if (event.getType() == EventType.None && sessionState == event.getState()) {
return;
}
sessionState = event.getState();
final Set<Watcher> watchers;
if (materializedWatchers == null) {
// materialize the watchers based on the event
watchers = watchManager.materialize(event.getState(), event.getType(), event.getPath());
} else {
watchers = new HashSet<>(materializedWatchers);
}
// Wrap the watchers and the event in a WatcherSetEventPair and add it to the waitingEvents queue
WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
// queue the pair (watch set & event) for later processing
waitingEvents.add(pair);
}
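The materialize method:
materialize takes the matching watchers out of dataWatches, existWatches, or childWatches by calling remove, which shows that a client-side watch is also removed once it has fired. It uses keeperState, eventType, and path to return the set of Watchers that should be notified.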
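The removal described above can be sketched as a switch over the event type, with each branch draining the relevant maps. This is a simplified, hypothetical version: ClientWatchSketch is not a ZooKeeper class, watchers are strings, and session events, keeperState, and persistent watches are left out entirely.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified sketch of client-side materialize; not ZooKeeper code.
class ClientWatchSketch {
    enum EventType { NodeCreated, NodeDataChanged, NodeChildrenChanged, NodeDeleted }

    final Map<String, Set<String>> dataWatches = new HashMap<>();
    final Map<String, Set<String>> existWatches = new HashMap<>();
    final Map<String, Set<String>> childWatches = new HashMap<>();

    // Returns the watchers to notify; Map.remove makes every watch one-shot.
    Set<String> materialize(EventType type, String path) {
        Set<String> result = new HashSet<>();
        switch (type) {
            case NodeCreated:
            case NodeDataChanged:
                addTo(dataWatches.remove(path), result);
                addTo(existWatches.remove(path), result);
                break;
            case NodeChildrenChanged:
                addTo(childWatches.remove(path), result);
                break;
            case NodeDeleted:
                addTo(dataWatches.remove(path), result);
                addTo(existWatches.remove(path), result);
                addTo(childWatches.remove(path), result);
                break;
        }
        return result;
    }

    private static void addTo(Set<String> from, Set<String> to) {
        if (from != null) {
            to.addAll(from);
        }
    }
}
```

A data change leaves child watches on the same path in place, so a client that wants to keep observing data must register a new watch after each notification.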
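waitingEvents.add(pair): waitingEvents is a blocking queue in the EventThread thread, which is, once again, a thread that was instantiated during our first step. As the name suggests, waitingEvents is a queue of Watchers waiting to be processed. The run() method of EventThread keeps taking items from the queue and passes them to the processEvent method:
The run() method of EventThread takes events from waitingEvents
@Override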
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void run() {
try {
isRunning = true;
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
// Process the event
processEvent(event);
}
if (wasKilled) {
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}
LOG.info("EventThread shut down for session: 0x{}", Long.toHexString(getSessionId()));
}
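The run() loop above is a consumer with a sentinel: a special eventOfDeath object marks shutdown, but events still in the queue are drained before the loop exits. A minimal sketch of that pattern follows. EventLoopSketch is a hypothetical class, and "processing" is reduced to appending the event to a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the EventThread loop; not ZooKeeper code.
class EventLoopSketch {
    // Sentinel compared by identity, like eventOfDeath above.
    private static final Object EVENT_OF_DEATH = new Object();

    private final BlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    final List<Object> processed = new ArrayList<>();

    void queueEvent(Object event) {
        waitingEvents.add(event);
    }

    void queueEventOfDeath() {
        waitingEvents.add(EVENT_OF_DEATH);
    }

    // Consumes events until the sentinel has been seen and the queue is empty.
    void run() {
        boolean wasKilled = false;
        try {
            while (true) {
                Object event = waitingEvents.take(); // blocks while the queue is empty
                if (event == EVENT_OF_DEATH) {
                    wasKilled = true;
                } else {
                    processed.add(event); // stand-in for processEvent(event)
                }
                if (wasKilled && waitingEvents.isEmpty()) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Events queued after the sentinel are still processed as long as they are in the queue when the loop checks it, which is the behaviour the wasKilled flag in the original code provides.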
The processEvent method
private void processEvent(Object event) {
try {
// Check the type of the event
if (event instanceof WatcherSetEventPair) {
// each watcher will process the event
// Get the WatcherSetEventPair
WatcherSetEventPair pair = (WatcherSetEventPair) event;
// Iterate over all watchers that matched the event and invoke each one
for (Watcher watcher : pair.watchers) {
try {
// Invoke the client's process callback
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher.", t);
}
}