服务端接收请求处理流程
服务端有一个 NettyServerCnxn 类,用来处理客户端发送过来的请求
代码语言:javascript复制private void receiveMessage(ByteBuf message) {
checkIsInEventLoop("receiveMessage");
try {
while (message.isReadable() && !throttled.get()) {
// //ByteBuffer 不为空
if (bb != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("message readable {} bb len {} {}", message.readableBytes(), bb.remaining(), bb);
ByteBuffer dat = bb.duplicate();
dat.flip();
LOG.trace("0x{} bb {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
}
//bb 剩余空间大于 message 中可读字节大小
if (bb.remaining() > message.readableBytes()) {
int newLimit = bb.position() message.readableBytes();
bb.limit(newLimit);
}
// 将 message 写入 bb 中
message.readBytes(bb);
bb.limit(bb.capacity());
if (LOG.isTraceEnabled()) {
LOG.trace("after readBytes message readable {} bb len {} {}", message.readableBytes(), bb.remaining(), bb);
ByteBuffer dat = bb.duplicate();
dat.flip();
LOG.trace("after readbytes 0x{} bb {}",
Long.toHexString(sessionId),
ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
}
// 已经读完 message
if (bb.remaining() == 0) {
bb.flip();
//统计接收信息
packetReceived(4 bb.remaining());
ZooKeeperServer zks = this.zkServer;
if (zks == null || !zks.isRunning()) {
throw new IOException("ZK down");
}
if (initialized) {
// TODO: if zks.processPacket() is changed to take a ByteBuffer[],
// we could implement zero-copy queueing.
//处理客户端传送过来的数据包
zks.processPacket(this, bb);
} else {
LOG.debug("got conn req request from {}", getRemoteSocketAddress());
zks.processConnectRequest(this, bb);
initialized = true;
}
bb = null;
}
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("message readable {} bblenrem {}", message.readableBytes(), bbLen.remaining());
ByteBuffer dat = bbLen.duplicate();
dat.flip();
LOG.trace("0x{} bbLen {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
}
if (message.readableBytes() < bbLen.remaining()) {
bbLen.limit(bbLen.position() message.readableBytes());
}
message.readBytes(bbLen);
bbLen.limit(bbLen.capacity());
if (bbLen.remaining() == 0) {
bbLen.flip();
if (LOG.isTraceEnabled()) {
LOG.trace("0x{} bbLen {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(bbLen)));
}
int len = bbLen.getInt();
if (LOG.isTraceEnabled()) {
LOG.trace("0x{} bbLen len is {}", Long.toHexString(sessionId), len);
}
bbLen.clear();
if (!initialized) {
if (checkFourLetterWord(channel, message, len)) {
return;
}
}
if (len < 0 || len > BinaryInputArchive.maxBuffer) {
throw new IOException("Len error " len);
}
// checkRequestSize will throw IOException if request is rejected
zkServer.checkRequestSizeWhenReceivingMessage(len);
bb = ByteBuffer.allocate(len);
}
}
}
} catch (IOException e) {
LOG.warn("Closing connection to {}", getRemoteSocketAddress(), e);
close(DisconnectReason.IO_EXCEPTION);
} catch (ClientCnxnLimitException e) {
// Common case exception, print at debug level
ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
LOG.debug("Closing connection to {}", getRemoteSocketAddress(), e);
close(DisconnectReason.CLIENT_RATE_LIMIT);
}
}
zks.processPacket(this, bb) 方法:
代码语言:javascript复制public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
// We have the request, now process and setup for next
InputStream bais = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
RequestHeader h = new RequestHeader();
//反序列化客户端 header 头信息
h.deserialize(bia, "header");
// Need to increase the outstanding request count first, otherwise
// there might be a race condition that it enabled recv after
// processing request and then disabled when check throttling.
//
// Be aware that we're actually checking the global outstanding
// request before this request.
//
// It's fine if the IOException thrown before we decrease the count
// in cnxn, since it will close the cnxn anyway.
cnxn.incrOutstandingAndCheckThrottle(h);
// Through the magic of byte buffers, txn will not be
// pointing
// to the start of the txn
incomingBuffer = incomingBuffer.slice();
//判断当前操作类型
if (h.getType() == OpCode.auth) {
LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
AuthPacket authPacket = new AuthPacket();
ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
String scheme = authPacket.getScheme();
ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
Code authReturn = KeeperException.Code.AUTHFAILED;
if (ap != null) {
try {
// handleAuthentication may close the connection, to allow the client to choose
// a different server to connect to.
authReturn = ap.handleAuthentication(
new ServerAuthenticationProvider.ServerObjs(this, cnxn),
authPacket.getAuth());
} catch (RuntimeException e) {
LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e);
authReturn = KeeperException.Code.AUTHFAILED;
}
}
if (authReturn == KeeperException.Code.OK) {
LOG.debug("Authentication succeeded for scheme: {}", scheme);
LOG.info("auth success {}", cnxn.getRemoteSocketAddress());
ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
cnxn.sendResponse(rh, null, null);
} else {
if (ap == null) {
LOG.warn(
"No authentication provider for scheme: {} has {}",
scheme,
ProviderRegistry.listProviders());
} else {
LOG.warn("Authentication failed for scheme: {}", scheme);
}
// send a response...
ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue());
cnxn.sendResponse(rh, null, null);
// ... and close connection
cnxn.sendBuffer(ServerCnxnFactory.closeConn);
cnxn.disableRecv();
}
return;
} else if (h.getType() == OpCode.sasl) {
//如果不是授权操作,再判断是否为 sasl 操作
processSasl(incomingBuffer, cnxn, h);
} else {
if (shouldRequireClientSaslAuth() && !hasCnxSASLAuthenticated(cnxn)) {
ReplyHeader replyHeader = new ReplyHeader(h.getXid(), 0, Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue());
cnxn.sendResponse(replyHeader, null, "response");
cnxn.sendCloseSession();
cnxn.disableRecv();
} else {
//最终进入这个代码块进行处理
//封装请求对象
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
int length = incomingBuffer.limit();
if (isLargeRequest(length)) {
// checkRequestSize will throw IOException if request is rejected
checkRequestSizeWhenMessageReceived(length);
si.setLargeRequestSize(length);
}
si.setOwner(ServerCnxn.me);
//提交请求
submitRequest(si);
}
}
}
submitRequest 方法负责在服务端提交当前请求,下面贴出的是 submitRequestNow 方法的代码:
代码语言:javascript复制public void submitRequestNow(Request si) {
if (firstProcessor == null) {
synchronized (this) {
try {
// Since all requests are passed to the request
// processor it should wait for setting up the request
// processor chain. The state will be updated to RUNNING
// after the setup.
while (state == State.INITIAL) {
wait(1000);
}
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption", e);
}
if (firstProcessor == null || state != State.RUNNING) {
throw new RuntimeException("Not started");
}
}
}
try {
touch(si.cnxn);
boolean validpacket = Request.isValid(si.type);
if (validpacket) {
setLocalSessionFlag(si);
//处理请求 责任链模式
firstProcessor.processRequest(si);
if (si.cnxn != null) {
incInProcess();
}
} else {
LOG.warn("Received packet at server of unknown type {}", si.type);
// Update request accounting/throttling limits
requestFinished(si);
new UnimplementedRequestProcessor().processRequest(si);
}
} catch (MissingSessionException e) {
LOG.debug("Dropping request.", e);
// Update request accounting/throttling limits
requestFinished(si);
} catch (RequestProcessorException e) {
LOG.error("Unable to process request", e);
// Update request accounting/throttling limits
requestFinished(si);
}
}
firstProcessor.processRequest(si);
firstProcessor 的初始化是在 ZooKeeperServer 的 setupRequestProcessors 方法中完成的,代码如下:
代码语言:javascript复制protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
((SyncRequestProcessor) syncProcessor).start();
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
((PrepRequestProcessor) firstProcessor).start();
}
这里用的责任链模式
从上面我们可以看到 firstProcessor 的实例是一个 PrepRequestProcessor,而这个构造方法中又传递了一个 Processor,构成了一个调用链:RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); 而 syncProcessor 的构造方法传递的又是一个 Processor,对应的是 FinalRequestProcessor。
所以整个调用链是 PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
PrepRequestProcessor 的processRequest方法
代码语言:javascript复制LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
public void processRequest(Request request) {
request.prepQueueStartTime = Time.currentElapsedTime();
submittedRequests.add(request);
ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUED.add(1);
}
PrepRequestProcessor 这个类又继承了线程类,是基于异步化的操作,接下来看 run() 方法:
代码语言:javascript复制public void run() {
try {
while (true) {
ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());
//从阻塞队列中拿到请求进行处理
Request request = submittedRequests.take();
ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_TIME
.add(Time.currentElapsedTime() - request.prepQueueStartTime);
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
}
if (Request.requestOfDeath == request) {
break;
}
request.prepStartTime = Time.currentElapsedTime();
//调用pRequest 进行预处理
pRequest(request);
}
} catch (Exception e) {
handleException(this.getName(), e);
}
LOG.info("PrepRequestProcessor exited loop!");
}
pRequest方法
代码语言:javascript复制protected void pRequest(Request request) throws RequestProcessorException {
// LOG.info("Prep>>> cxid = " request.cxid " type = "
// request.type " id = 0x" Long.toHexString(request.sessionId));
request.setHdr(null);
request.setTxn(null);
if (!request.isThrottled()) {
pRequestHelper(request);
}
request.zxid = zks.getZxid();
ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(Time.currentElapsedTime() - request.prepStartTime);
nextProcessor.processRequest(request);
}
nextProcessor 对应的应该是 SyncRequestProcessor
SyncRequestProcessor.processRequest 方法
代码语言:javascript复制private final BlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
public void processRequest(final Request request) {
Objects.requireNonNull(request, "Request cannot be null");
request.syncQueueStartTime = Time.currentElapsedTime();
queuedRequests.add(request);
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1);
}
也看run方法
代码语言:javascript复制@Override
public void run() {
try {
// we do this in an attempt to ensure that not all of the servers
// in the ensemble take a snapshot at the same time
resetSnapshotStats();
lastFlushTime = Time.currentElapsedTime();
while (true) {
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
if (si == null) {
/* We timed out looking for more writes to batch, go ahead and flush immediately */
flush();
//从阻塞队列中获取请求
si = queuedRequests.take();
}
if (si == REQUEST_OF_DEATH) {
break;
}
long startProcessTime = Time.currentElapsedTime();
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);
// track the number of records written to the log
//下面这块代码,粗略看来是触发快照操作,启动一个处理快照的线程
if (!si.isThrottled() && zks.getZKDatabase().append(si)) {
if (shouldSnapshot()) {
resetSnapshotStats();
// roll the log
zks.getZKDatabase().rollLog();
// take a snapshot
if (!snapThreadMutex.tryAcquire()) {
LOG.warn("Too busy to snap, skipping");
} else {
new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
snapThreadMutex.release();
}
}
}.start();
}
}
} else if (toFlush.isEmpty()) {
// optimization for read heavy workloads
// iff this is a read or a throttled request(which doesn't need to be written to the disk),
// and there are no pending flushes (writes), then just pass this to the next processor
if (nextProcessor != null) {
继续调用下一个处理器来处理请求
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable) nextProcessor).flush();
}
}
continue;
}
toFlush.add(si);
if (shouldFlush()) {
flush();
}
ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
}
} catch (Throwable t) {
handleException(this.getName(), t);
}
LOG.info("SyncRequestProcessor exited!");
}
FinalRequestProcessor.processRequest 方法会根据 Request 对象中的操作更新内存中的 Session 信息或者 znode 数据。
关键代码:
代码语言:javascript复制ExistsRequest existsRequest = new ExistsRequest();
//反序列化 (将 ByteBuffer 反序列化成为 ExitsRequest.这个就是我们在客户端发起请求的时候传递过来的 Request 对象
ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest);
//得到请求路径
path = existsRequest.getPath();
if (path.indexOf('