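From our earlier study of Scala we know that a Scala program needs an entry point. For Kafka, the startup script kafka-server-start.sh shows that the class being run is kafka.Kafka, i.e. Kafka.scala. As with RocketMQ, Kafka's broker lives on the server side, while producers and consumers live on the client side. The question, then, is how Kafka prepares its network layer, and how the broker is tied to producers and consumers.
Kafka starts up first. Once we have found the entry point, we should know that Kafka has two essential jobs at startup. One is to start SocketServer, which handles the network operations and has nothing to do with business logic. The other is KafkaApis, which carries the business logic: it pattern-matches on each incoming request and runs the concrete handler for whichever request type matches. Let's look at the source code, focusing on KafkaServer.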
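The division of labor described above can be sketched roughly as follows. This is a minimal sketch and not Kafka's code: `Request`, `TwoLayerSketch`, `networkReceive` and `handleNext` are hypothetical names, and a plain queue stands in for whatever connects the network layer to the business layer.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical, simplified stand-ins; none of these names come from Kafka itself.
final case class Request(apiKey: String, payload: String)

object TwoLayerSketch {
  // Queue that decouples the network layer from the business layer.
  private val requestQueue = new LinkedBlockingQueue[Request]()

  // Network side: only enqueues what was read from a socket, no business logic.
  def networkReceive(request: Request): Unit = requestQueue.put(request)

  // Business side: takes the next request and dispatches on its type via pattern matching.
  def handleNext(): String = requestQueue.take() match {
    case Request("PRODUCE", payload) => s"stored: $payload"
    case Request("FETCH", payload)   => s"fetched: $payload"
    case Request(other, _)           => s"unsupported: $other"
  }
}
```

The point of the queue is that the network side never needs to know what a request means; it only moves bytes in and out.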
Kafka.scala
def main(args: Array[String]): Unit = {
try {
val serverProps = getPropsFromArgs(args)
val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
try {
if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
new LoggingSignalHandler().register()
} catch {
case e: ReflectiveOperationException =>
warn("Failed to register optional signal handler that logs a message when the process is terminated " +
s"by a signal. Reason for registration failure is: $e", e)
}
// attach shutdown handler to catch terminating signals as well as normal termination
Runtime.getRuntime().addShutdownHook(new Thread("kafka-shutdown-hook") {
override def run(): Unit = kafkaServerStartable.shutdown()
})
// start the Kafka server
kafkaServerStartable.startup()
kafkaServerStartable.awaitShutdown()
}
catch {
case e: Throwable =>
fatal("Exiting Kafka due to fatal exception", e)
Exit.exit(1)
}
Exit.exit(0)
}
}
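The pattern in main (register a shutdown hook, start the server, then block until shutdown) can be illustrated in isolation. The following is only a sketch under assumptions: `StartableSketch` and `MainSketch.launch` are hypothetical names, and a `CountDownLatch` is assumed as the mechanism that lets the main thread wait.

```scala
import java.util.concurrent.CountDownLatch

// Hypothetical stand-in for a startable server; not Kafka's KafkaServerStartable.
class StartableSketch {
  private val shutdownLatch = new CountDownLatch(1)
  @volatile private var running = false

  def startup(): Unit = {
    running = true
  }

  // Releases any thread blocked in awaitShutdown().
  def shutdown(): Unit = {
    running = false
    shutdownLatch.countDown()
  }

  // Blocks the calling (main) thread until shutdown() has been invoked.
  def awaitShutdown(): Unit = shutdownLatch.await()

  def isRunning: Boolean = running
}

object MainSketch {
  def launch(server: StartableSketch): Unit = {
    // The hook runs on JVM termination (normal exit or a terminating signal).
    Runtime.getRuntime.addShutdownHook(new Thread("sketch-shutdown-hook") {
      override def run(): Unit = server.shutdown()
    })
    server.startup()
    server.awaitShutdown()
  }
}
```

Calling `MainSketch.launch(new StartableSketch)` would block until the JVM is asked to terminate, which is the behavior one wants from a long-running server process.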
kafkaServerStartable.startup()
// start the Kafka server side
def startup() {
try server.startup()
catch {
case _: Throwable =>
// KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
fatal("Exiting Kafka.")
Exit.exit(1)
}
}
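KafkaServer: the Kafka server side (important)
Its startup() method starts many important components, and these are what later tie Kafka's producers and consumers together.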
/**
* Start up API for bringing up a single instance of the Kafka server.
* Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
*/
def startup() {
try {
info("starting")
if (isShuttingDown.get)
throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
if (startupComplete.get)
return
val canStartup = isStartingUp.compareAndSet(false, true)
if (canStartup) {
brokerState.newState(Starting)
/* setup zookeeper */
// initialize the ZooKeeper client
initZkClient(time)
/* Get or create cluster_id */
_clusterId = getOrGenerateClusterId(zkClient)
info(s"Cluster ID = $clusterId")
/* generate brokerId */
val (brokerId, initialOfflineDirs) = getBrokerIdAndOfflineDirs
config.brokerId = brokerId
logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ")
this.logIdent = logContext.logPrefix
// initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
// applied after DynamicConfigManager starts.
// initialize the dynamic broker configs from ZooKeeper
config.dynamicConfig.initialize(zkClient)
/* start scheduler */
// start the Kafka scheduler for background tasks
kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
kafkaScheduler.startup()
/* create and configure metrics */
val reporters = new util.ArrayList[MetricsReporter]
reporters.add(new JmxReporter(jmxPrefix))
val metricConfig = KafkaServer.metricConfig(config)
metrics = new Metrics(metricConfig, reporters, time, true)
/* register broker metrics */
_brokerTopicStats = new BrokerTopicStats
quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
notifyClusterListeners(kafkaMetricsReporters ++ metrics.reporters.asScala)
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
/* start log manager */
// start the log manager
logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
logManager.startup()
metadataCache = new MetadataCache(config.brokerId)
// Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
// This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
// Create and start the socket server acceptor threads so that the bound port is known.
// Delay starting processors until the end of the initialization sequence to ensure
// that credentials have been loaded before processing authentications.
// start the SocketServer (important)
socketServer = new SocketServer(config, metrics, time, credentialProvider)
socketServer.startup(startupProcessors = false)
/* start replica manager */
// start the replica manager
replicaManager = createReplicaManager(isShuttingDown)
replicaManager.startup()
val brokerInfo = createBrokerInfo
zkClient.registerBrokerInZk(brokerInfo)
// Now that the broker id is successfully registered, checkpoint it
checkpointBrokerId(config.brokerId)
/* start token manager */
// start the delegation token manager
tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)
tokenManager.startup()
/* start kafka controller */
// start the Kafka controller
kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, tokenManager, threadNamePrefix)
kafkaController.startup()
adminManager = new AdminManager(config, metrics, metadataCache, zkClient)
/* start group coordinator */
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
// start the group coordinator
groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM)
groupCoordinator.startup()
/* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
// start the transaction coordinator
transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkClient, metrics, metadataCache, Time.SYSTEM)
transactionCoordinator.startup()
/* Get the authorizer and initialize it if one is specified.*/
authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
authZ.configure(config.originals())
authZ
}
val fetchManager = new FetchManager(Time.SYSTEM,
new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
/* start processing requests */
// create the KafkaApis object
apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager)
// create the Kafka request handler pool
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
config.numIoThreads)
Mx4jLoader.maybeLoad()
/* Add all reconfigurables for config change notification before starting config handlers */
config.dynamicConfig.addReconfigurables(this)
/* start dynamic config manager */
dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers),
ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
// Create the config manager. start listening to notifications
// start the dynamic config manager
dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
dynamicConfigManager.startup()
// start the processors
socketServer.startProcessors()
brokerState.newState(RunningAsBroker)
shutdownLatch = new CountDownLatch(1)
// mark startup as complete
startupComplete.set(true)
isStartingUp.set(false)
AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString, metrics)
info("started")
}
}
catch {
case e: Throwable =>
fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
isStartingUp.set(false)
shutdown()
throw e
}
}
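The guard at the top of startup() (refuse to start while shutting down, return early if already started, and let only one caller perform the initialization) can be isolated as below. This is a simplified sketch: `GuardedServerSketch` and `startCount` are hypothetical, and a counter stands in for the real component startup.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical sketch of the startup guard; the counter replaces the real initialization.
class GuardedServerSketch {
  private val isShuttingDown = new AtomicBoolean(false)
  private val isStartingUp = new AtomicBoolean(false)
  private val startupComplete = new AtomicBoolean(false)
  var startCount = 0 // how many times the initialization body actually ran

  def startup(): Unit = {
    if (isShuttingDown.get)
      throw new IllegalStateException("still shutting down, cannot re-start")
    if (startupComplete.get) return
    // Only the caller that flips false -> true performs the initialization.
    if (isStartingUp.compareAndSet(false, true)) {
      try {
        startCount += 1 // stands in for scheduler, log manager, socket server, ...
        startupComplete.set(true)
      } finally {
        isStartingUp.set(false)
      }
    }
  }
}
```

With this guard, a second call to `startup()` is a no-op rather than a second initialization.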
SocketServer
def startup(startupProcessors: Boolean = true) {
this.synchronized {
connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
if (startupProcessors) {
// important
startProcessors()
}
}
newGauge("NetworkProcessorAvgIdlePercent",
new Gauge[Double] {
def value = SocketServer.this.synchronized {
val ioWaitRatioMetricNames = processors.values.asScala.map { p =>
metrics.metricName("io-wait-ratio", "socket-server-metrics", p.metricTags)
}
ioWaitRatioMetricNames.map { metricName =>
Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
}.sum / processors.size
}
}
)
newGauge("MemoryPoolAvailable",
new Gauge[Long] {
def value = memoryPool.availableMemory()
}
)
newGauge("MemoryPoolUsed",
new Gauge[Long] {
def value = memoryPool.size() - memoryPool.availableMemory()
}
)
info("Started " + acceptors.size + " acceptor threads")
}
Starting the processors in SocketServer
def startProcessors(): Unit = synchronized {
acceptors.values.asScala.foreach { _.startProcessors() }
info(s"Started processors for ${acceptors.size} acceptors")
}
Starting the processors (called on each acceptor)
private[network] def startProcessors(): Unit = synchronized {
if (!processorsStarted.getAndSet(true)) {
startProcessors(processors)
}
}
Continuing the startup
private def startProcessors(processors: Seq[Processor]): Unit = synchronized {
processors.foreach { processor =>
KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
processor).start()
}
}
Processor
private[kafka] class Processor(val id: Int,
time: Time,
maxRequestSize: Int,
requestChannel: RequestChannel,
connectionQuotas: ConnectionQuotas,
connectionsMaxIdleMs: Long,
listenerName: ListenerName,
securityProtocol: SecurityProtocol,
config: KafkaConfig,
metrics: Metrics,
credentialProvider: CredentialProvider,
memoryPool: MemoryPool,
logContext: LogContext) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
import Processor._
private object ConnectionId {
def fromString(s: String): Option[ConnectionId] = s.split("-") match {
case Array(local, remote, index) => BrokerEndPoint.parseHostPort(local).flatMap { case (localHost, localPort) =>
BrokerEndPoint.parseHostPort(remote).map { case (remoteHost, remotePort) =>
ConnectionId(localHost, localPort, remoteHost, remotePort, Integer.parseInt(index))
}
}
case _ => None
}
}
private[network] case class ConnectionId(localHost: String, localPort: Int, remoteHost: String, remotePort: Int, index: Int) {
override def toString: String = s"$localHost:$localPort-$remoteHost:$remotePort-$index"
}
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
private[kafka] val metricTags = mutable.LinkedHashMap(
ListenerMetricTag -> listenerName.value,
NetworkProcessorMetricTag -> id.toString
).asJava
newGauge(IdlePercentMetricName,
new Gauge[Double] {
def value = {
Option(metrics.metric(metrics.metricName("io-wait-ratio", "socket-server-metrics", metricTags)))
.fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
}
},
// for compatibility, only add a networkProcessor tag to the Yammer Metrics alias (the equivalent Selector metric
// also includes the listener name)
Map(NetworkProcessorMetricTag -> id.toString)
)
private val selector = createSelector(
ChannelBuilders.serverChannelBuilder(listenerName,
listenerName == config.interBrokerListenerName,
securityProtocol,
config,
credentialProvider.credentialCache,
credentialProvider.tokenCache))
// Visible to override for testing
protected[network] def createSelector(channelBuilder: ChannelBuilder): KSelector = {
channelBuilder match {
case reconfigurable: Reconfigurable => config.addReconfigurable(reconfigurable)
case _ =>
}
new KSelector(
maxRequestSize,
connectionsMaxIdleMs,
metrics,
time,
"socket-server",
metricTags,
false,
true,
channelBuilder,
memoryPool,
logContext)
}
// Connection ids have the format `localAddr:localPort-remoteAddr:remotePort-index`. The index is a
// non-negative incrementing value that ensures that even if remotePort is reused after a connection is
// closed, connection ids are not reused while requests from the closed connection are being processed.
private var nextConnectionIndex = 0
// override the run method
override def run() {
// signal that startup is complete
startupComplete()
try {
while (isRunning) {
try {
// setup any new connections that have been queued up
configureNewConnections()
// register any new responses for writing
// process new responses
processNewResponses()
// poll the selector
poll()
// process completed receives
processCompletedReceives()
// process completed sends
processCompletedSends()
processDisconnected()
} catch {
// We catch all the throwables here to prevent the processor thread from exiting. We do this because
// letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
// reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would
// be either associated with a specific socket channel or a bad request. These exceptions are caught and
// processed by the individual methods above which close the failing channel and continue processing other
// channels. So this catch block should only ever see ControlThrowables.
case e: Throwable => processException("Processor got uncaught exception.", e)
}
}
} finally {
debug("Closing selector - processor " + id)
CoreUtils.swallow(closeAll(), this, Level.ERROR)
shutdownComplete()
}
}
private def processException(errorMessage: String, throwable: Throwable) {
throwable match {
case e: ControlThrowable => throw e
case e => error(errorMessage, e)
}
}
private def processChannelException(channelId: String, errorMessage: String, throwable: Throwable) {
if (openOrClosingChannel(channelId).isDefined) {
error(s"Closing socket for $channelId because of error", throwable)
close(channelId)
}
processException(errorMessage, throwable)
}
// process new responses
private def processNewResponses() {
var curr: RequestChannel.Response = null
while ({curr = dequeueResponse(); curr != null}) {
val channelId = curr.request.context.connectionId
try {
curr.responseAction match {
case RequestChannel.NoOpAction =>
// There is no response to send to the client, we need to read more pipelined requests
// that are sitting in the server's socket buffer
updateRequestMetrics(curr)
trace("Socket server received empty response to send, registering for read: " + curr)
openOrClosingChannel(channelId).foreach(c => selector.unmute(c.id))
case RequestChannel.SendAction =>
val responseSend = curr.responseSend.getOrElse(
throw new IllegalStateException(s"responseSend must be defined for SendAction, response: $curr"))
sendResponse(curr, responseSend)
case RequestChannel.CloseConnectionAction =>
updateRequestMetrics(curr)
trace("Closing socket connection actively according to the response code.")
close(channelId)
}
} catch {
case e: Throwable =>
processChannelException(channelId, s"Exception while processing response for $channelId", e)
}
}
}
/* `protected` for test usage */
// send a response
protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
val connectionId = response.request.context.connectionId
trace(s"Socket server received response to send to $connectionId, registering for write and sending data: $response")
// `channel` can be None if the connection was closed remotely or if selector closed it for being idle for too long
if (channel(connectionId).isEmpty) {
warn(s"Attempting to send response via channel for which there is no open connection, connection id $connectionId")
response.request.updateRequestMetrics(0L, response)
}
// Invoke send for closingChannel as well so that the send is failed and the channel closed properly and
// removed from the Selector after discarding any pending staged receives.
// `openOrClosingChannel` can be None if the selector closed the connection because it was idle for too long
if (openOrClosingChannel(connectionId).isDefined) {
selector.send(responseSend)
inflightResponses += (connectionId -> response)
}
}
// poll the selector
private def poll() {
try selector.poll(300)
catch {
case e @ (_: IllegalStateException | _: IOException) =>
// The exception is not re-thrown and any completed sends/receives/connections/disconnections
// from this poll will be processed.
error(s"Processor $id poll failed due to illegal state or IO exception")
}
}
// process completed receives
private def processCompletedReceives() {
selector.completedReceives.asScala.foreach { receive =>
try {
openOrClosingChannel(receive.source) match {
case Some(channel) =>
val header = RequestHeader.parse(receive.payload)
val context = new RequestContext(header, receive.source, channel.socketAddress,
channel.principal, listenerName, securityProtocol)
val req = new RequestChannel.Request(processor = id, context = context,
startTimeNanos = time.nanoseconds, memoryPool, receive.payload, requestChannel.metrics)
requestChannel.sendRequest(req)
selector.mute(receive.source)
case None =>
// This should never happen since completed receives are processed immediately after `poll()`
throw new IllegalStateException(s"Channel ${receive.source} removed from selector before processing completed receive")
}
} catch {
// note that even though we got an exception, we can assume that receive.source is valid.
// Issues with constructing a valid receive object were handled earlier
case e: Throwable =>
processChannelException(receive.source, s"Exception while processing request from ${receive.source}", e)
}
}
}
// process completed sends
private def processCompletedSends() {
selector.completedSends.asScala.foreach { send =>
try {
val resp = inflightResponses.remove(send.destination).getOrElse {
throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
}
updateRequestMetrics(resp)
selector.unmute(send.destination)
} catch {
case e: Throwable => processChannelException(send.destination,
s"Exception while processing completed send to ${send.destination}", e)
}
}
}
private def updateRequestMetrics(response: RequestChannel.Response) {
val request = response.request
val networkThreadTimeNanos = openOrClosingChannel(request.context.connectionId).fold(0L)(_.getAndResetNetworkThreadTimeNanos())
request.updateRequestMetrics(networkThreadTimeNanos, response)
}
private def processDisconnected() {
selector.disconnected.keySet.asScala.foreach { connectionId =>
try {
val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
}.remoteHost
inflightResponses.remove(connectionId).foreach(updateRequestMetrics)
// the channel has been closed by the selector but the quotas still need to be updated
connectionQuotas.dec(InetAddress.getByName(remoteHost))
} catch {
case e: Throwable => processException(s"Exception while processing disconnection of $connectionId", e)
}
}
}
/**
* Close the connection identified by `connectionId` and decrement the connection count.
* The channel will be immediately removed from the selector's `channels` or `closingChannels`
* and no further disconnect notifications will be sent for this channel by the selector.
* If responses are pending for the channel, they are dropped and metrics is updated.
* If the channel has already been removed from selector, no action is taken.
*/
private def close(connectionId: String): Unit = {
openOrClosingChannel(connectionId).foreach { channel =>
debug(s"Closing selector connection $connectionId")
val address = channel.socketAddress
if (address != null)
connectionQuotas.dec(address)
selector.close(connectionId)
inflightResponses.remove(connectionId).foreach(response => updateRequestMetrics(response))
}
}
/**
* Queue up a new connection for reading
*/
def accept(socketChannel: SocketChannel) {
newConnections.add(socketChannel)
wakeup()
}
/**
* Register any new connections that have been queued up
*/
private def configureNewConnections() {
while (!newConnections.isEmpty) {
val channel = newConnections.poll()
try {
debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
selector.register(connectionId(channel.socket), channel)
} catch {
// We explicitly catch all exceptions and close the socket to avoid a socket leak.
case e: Throwable =>
val remoteAddress = channel.socket.getRemoteSocketAddress
// need to close the channel here to avoid a socket leak.
close(channel)
processException(s"Processor $id closed connection from $remoteAddress", e)
}
}
}
/**
* Close the selector and all open connections
*/
private def closeAll() {
selector.channels.asScala.foreach { channel =>
close(channel.id)
}
selector.close()
removeMetric(IdlePercentMetricName, Map(NetworkProcessorMetricTag -> id.toString))
}
// 'protected` to allow override for testing
protected[network] def connectionId(socket: Socket): String = {
val localHost = socket.getLocalAddress.getHostAddress
val localPort = socket.getLocalPort
val remoteHost = socket.getInetAddress.getHostAddress
val remotePort = socket.getPort
val connId = ConnectionId(localHost, localPort, remoteHost, remotePort, nextConnectionIndex).toString
nextConnectionIndex = if (nextConnectionIndex == Int.MaxValue) 0 else nextConnectionIndex + 1
connId
}
private[network] def enqueueResponse(response: RequestChannel.Response): Unit = {
responseQueue.put(response)
wakeup()
}
private def dequeueResponse(): RequestChannel.Response = {
val response = responseQueue.poll()
if (response != null)
response.request.responseDequeueTimeNanos = Time.SYSTEM.nanoseconds
response
}
private[network] def responseQueueSize = responseQueue.size
// Only for testing
private[network] def inflightResponseCount: Int = inflightResponses.size
// Visible for testing
// Only methods that are safe to call on a disconnected channel should be invoked on 'openOrClosingChannel'.
private[network] def openOrClosingChannel(connectionId: String): Option[KafkaChannel] =
Option(selector.channel(connectionId)).orElse(Option(selector.closingChannel(connectionId)))
/* For test usage */
private[network] def channel(connectionId: String): Option[KafkaChannel] =
Option(selector.channel(connectionId))
// Visible for testing
private[network] def numStagedReceives(connectionId: String): Int =
openOrClosingChannel(connectionId).map(c => selector.numStagedReceives(c)).getOrElse(0)
/**
* Wakeup the thread for selection.
*/
override def wakeup() = selector.wakeup()
override def shutdown(): Unit = {
super.shutdown()
removeMetric("IdlePercent", Map("networkProcessor" -> id.toString))
}
}
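The connection id format used by Processor, `localHost:localPort-remoteHost:remotePort-index`, together with the wrap-around of the index, can be tried out with a small standalone sketch. The names `ConnId`, `parseHostPort` and `nextIndex` are hypothetical; in particular `parseHostPort` is a simplified stand-in and not Kafka's `BrokerEndPoint.parseHostPort`.

```scala
import scala.util.Try

// Simplified sketch of the connection id format shown in Processor above.
final case class ConnId(localHost: String, localPort: Int,
                        remoteHost: String, remotePort: Int, index: Int) {
  override def toString: String = s"$localHost:$localPort-$remoteHost:$remotePort-$index"
}

object ConnId {
  // Hypothetical helper: splits "host:port" at the last colon.
  private def parseHostPort(s: String): Option[(String, Int)] = {
    val i = s.lastIndexOf(':')
    if (i <= 0) None
    else Try(s.substring(i + 1).toInt).toOption.map(port => (s.substring(0, i), port))
  }

  // Parses the three dash-separated parts back into a ConnId, or None if malformed.
  def fromString(s: String): Option[ConnId] = s.split("-") match {
    case Array(local, remote, index) =>
      parseHostPort(local).flatMap { case (lh, lp) =>
        parseHostPort(remote).flatMap { case (rh, rp) =>
          Try(index.toInt).toOption.map(idx => ConnId(lh, lp, rh, rp, idx))
        }
      }
    case _ => None
  }

  // Wraps back to 0 instead of overflowing to a negative index.
  def nextIndex(current: Int): Int = if (current == Int.MaxValue) 0 else current + 1
}
```

Because the string is split on `-`, this sketch only round-trips hosts that contain no dash. That fits the code above, which builds the id from `getHostAddress` (an IP address string) rather than from a host name.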
At this point Kafka has started. What remains is the important business-logic handling in KafkaApis.
KafkaApis
/**
* Top-level method that handles all requests and multiplexes to the right api
*/
def handle(request: RequestChannel.Request) {
try {
// use Scala pattern matching to route each request to its corresponding handler
trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
request.header.apiKey match {
// produce request
case ApiKeys.PRODUCE => handleProduceRequest(request)
// fetch request (reading messages)
case ApiKeys.FETCH => handleFetchRequest(request)
// list offsets request
case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
// topic metadata request
case ApiKeys.METADATA => handleTopicMetadataRequest(request)
// leader and ISR request (ISR: in-sync replicas; AR: assigned replicas, i.e. all replicas of a partition)
case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
// stop replica request
case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
// update metadata request
case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
// controlled shutdown request
case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
// offset commit request
case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
// offset fetch request
case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
// find coordinator request
case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
// join group request
case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
// heartbeat request
case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
// leave group request
case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
// sync group request
case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
// describe groups request
case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
// list groups request
case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
// SASL handshake request
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
// create topics request
case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
// delete topics request
case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
// delete records request
case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
// init producer id request
case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
// offset for leader epoch request
case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
// add partitions to transaction request
case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
// add offsets to transaction request
case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
case ApiKeys.END_TXN => handleEndTxnRequest(request)
case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
// alter replica log dirs request
case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
// describe log dirs request
case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
// create partitions request
case ApiKeys.CREATE_PARTITIONS => handleCreatePartitionsRequest(request)
case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
// delete groups request
case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
}
} catch {
case e: FatalExitError => throw e
case e: Throwable => handleError(request, e)
} finally {
request.apiLocalCompleteTimeNanos = time.nanoseconds
}
}
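The dispatch style of handle() can be reduced to a few lines to make the idea easier to see. This is a hypothetical miniature and not the real API: `ApiKey`, `Req`, `DispatchSketch` and the three handlers are invented for illustration, and the real ApiKeys set and handlers are far larger.

```scala
// Hypothetical miniature of the dispatch idea in handle().
sealed trait ApiKey
case object Produce extends ApiKey
case object Fetch extends ApiKey
case object Metadata extends ApiKey

final case class Req(apiKey: ApiKey, body: String)

object DispatchSketch {
  private def handleProduce(r: Req): String = s"produce(${r.body})"
  private def handleFetch(r: Req): String = s"fetch(${r.body})"
  private def handleMetadata(r: Req): String =
    if (r.body.isEmpty) throw new IllegalArgumentException("empty topic") else s"metadata(${r.body})"

  // A failing handler produces an error response instead of killing the calling thread.
  def handle(r: Req): String =
    try {
      r.apiKey match {
        case Produce  => handleProduce(r)
        case Fetch    => handleFetch(r)
        case Metadata => handleMetadata(r)
      }
    } catch {
      case e: Exception => s"error: ${e.getMessage}"
    }
}
```

Because `ApiKey` is a sealed trait here, the Scala compiler can warn when a case is missing from the match, which is one reason pattern matching suits this kind of request routing.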
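Even so, we still have not seen how broker storage relates to producers and consumers. From what we know, that relationship must go through the log files and index files, yet neither has appeared so far.
To study how storage relates to producers and consumers, we have to study Log and the log-related code. As we understand it, a topic has multiple partitions; on a broker, each partition is stored as a Log in its own log directory; that directory holds log (data) files, offset index files and timestamp index files, alongside recovery-related files; and one Log is made up of multiple LogSegments.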
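The hierarchy just described (topic, partition, Log, LogSegment) can be modeled in a few lines. Everything here is an assumption made for illustration: the names `SegmentSketch` and `LogSketch`, the `<topic>-<partition>` directory naming, the 20-digit zero-padded file names, and the lookup of a segment by the greatest base offset not exceeding the requested offset. It is a sketch of the structure, not Kafka's Log implementation.

```scala
import java.util.TreeMap

// Hypothetical model of "one Log = many LogSegments"; names and fields are illustrative.
final case class SegmentSketch(baseOffset: Long) {
  // Assumption: file names are the base offset zero-padded to 20 digits plus a suffix.
  private def prefix: String = f"$baseOffset%020d"
  def logFile: String = s"$prefix.log"
  def offsetIndexFile: String = s"$prefix.index"
  def timeIndexFile: String = s"$prefix.timeindex"
}

class LogSketch(val topic: String, val partition: Int) {
  // Segments sorted by base offset.
  private val segments = new TreeMap[java.lang.Long, SegmentSketch]()

  // Assumption: the directory of a partition's log is named `<topic>-<partition>`.
  def dirName: String = s"$topic-$partition"

  def addSegment(baseOffset: Long): Unit = {
    segments.put(java.lang.Long.valueOf(baseOffset), SegmentSketch(baseOffset))
  }

  // The segment holding `offset` is the one with the greatest base offset <= offset.
  def segmentFor(offset: Long): Option[SegmentSketch] =
    Option(segments.floorEntry(java.lang.Long.valueOf(offset))).map(_.getValue)
}
```

A consumer asking for a given offset would, in this model, first locate the segment with `segmentFor` and then use that segment's index file to find the position inside its log file.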