Kafka Producer整体架构概述及源码分析

2022-05-17 15:44:28 浏览数 (1)

整体架构

「线程」

  • 整个 Kafka 客户端由两个线程协调运行,即Main线程和Sender线程。
  • 在Main线程中由KafkaProducer创建消息,然后通过Interceptor、Serializer和Partitioner之后缓存到RecordAccumulator(消息累加器)中。
  • Sender线程 负责从RecordAccumulator中获取消息并发送到Kafka中。

「RecordAccumulator」

  • RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。
  • RecordAccumulator缓存的大小可以通过生产者客户端参数buffer.memory进行配置,默认值是32MB。如果生产者发送消息的速度超过了发送到客户端的速度,则会导致生产者空间不足,此时KafkaProducer send()方法的调用要么会被阻塞,要么抛出异常。
  • KafkaProducer发送消息的速度可以有参数max.block.ms进行配置,此参数默认值为60秒。

「ProducerBatch」

  • Main线程发送过来的消息会被追加到RecordAccumulator的Deque(双端队列)中,在RecordAccumulator的内部每个Partition都维护了一个Deque,Deque中的内容就是ProducerBatch,即:Deque。
  • 消息被写入缓存时,会被追加到Deque的尾部。Sender读取消息时,会从Deque的头部进行读取。
  • ProducerBatch中可以包含一到多个ProducerRecord(生产者创建的消息),这样可以使字节的使用更加紧凑。同时,将娇小的ProducerRecord拼成一个较大的ProducerBatch也可以减少网络请求的次数以提高整体的吞吐量。
  • 如果生产者需要向多个分区发送消息,则可以将buffer.memory参数适当调大以增加整体的吞吐量。

「BufferPool」

  • 消息在网络上都是以字节进行传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中通过java.io.ByteBuffer实现消息的创建和释放,不过频繁的创建和释放比较消耗资源,在RecordAccumulator的内部还有一个BufferPool,它主要用来试验ByteBuffer的复用,已实现缓存的高效利用。
  • 但是BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会进入BufferPool。此特定值的大小可以通过参数batch.size进行配置以实现缓存不同大小的消息。

「ProducerBatch与batch.size关系」

  • 当一条消息ProducerRecord进入RecordAccumulator中时,会先寻找与消息分区所对应的的Deque(如果没有则新创建),在从这个Deque的尾部获取一个ProducerBatch(如果没有则新创建),查看ProducerBatch中是否还可以写入这个ProducerRecord,如果可以则写入,否则需要创建一个新的ProducerBatch。
  • 在新建ProducerBatch时需要评估这条消息的大小是否超过batch.size,如果不超过,就以batch.size的大小来创建这个ProducerBatch,这样在使用完后还可以通过BufferPool的管理进行复用。若果超过,则以消息的大小来创建ProducerBatch,此内存区域不会被复用。

「Sender」

  • Sender从RecordAccumulator中获取缓存的消息后,会进一步将原本<TopicPartition, Deque>的保存形式进一步转换为<Node,List>的形式,其中Node表示Kafka集群中的Broker节点。
  • 对于网络连接来说,生产者客户端与具体的Broker节点建立连接,也就是向具体的Broker节点发送消息,而并不关心消息属于哪个分区;对于KafkaProducer的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以这里需要做一个应用逻辑层到网络I/O层面的转换。
  • 在转换成<Node,List>的形式之后,Sender还会进一步封装成<Node,List>的形式,这样就可以将Request请求发送到各个Node。

「InFlightRequests」

  • 请求从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式是Map<NodeId, Deque>,其主要作用是缓存已经发出去但还没有收到响应的请求。与此同时,InFlightRequests还提供了趣多管理类的方法,并且通过配置参数还可以限制每个连接(即客户端与Node之间的连接)最多缓存的请求数。此参数为max.in.flight.requests.per.connection,默认值是5。超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应。
  • 通过比较Deque的size与配置的最大连接数可以判断对应的node是否已经堆积了很多未响应的请求。如果已有较大未响应请求的堆积,那么说明这个Node节点负载较大或者网络连接有问题,再继续向其发送请求会增大请求超时的可能。

源码分析及图解原理

RecordAccumulator

在RecordAccumulator中,最核心的参数就是:

代码语言:javascript复制
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

它是一个ConcurrentMap,key是TopicPartition类,代表一个topic的一个partition。value是一个包含ProducerBatch的双端队列。等待Sender线程发送给broker。画张图来看下:

「再从源码角度来看如何添加到缓冲区队列里的,主要看这个方法:org.apache.kafka.clients.producer.internals.RecordAccumulator#append:」

代码语言:javascript复制
/**
* Add a record to the accumulator, return the append result
* <p>
* The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created
* <p>
*
* @param tp The topic/partition to which this record is being sent
* @param timestamp The timestamp of the record
* @param key The key for the record
* @param value The value for the record
* @param headers the Headers for the record
* @param callback The user-supplied callback to execute when the request is complete
* @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
*/
public RecordAppendResult append(TopicPartition tp,
                               long timestamp,
                               byte[] key,
                               byte[] value,
                               Header[] headers,
                               Callback callback,
                               long maxTimeToBlock) throws InterruptedException {
  // We keep track of the number of appending thread to make sure we do not miss batches in
  // abortIncompleteBatches().
  appendsInProgress.incrementAndGet();
  ByteBuffer buffer = null;
  if (headers == null) headers = Record.EMPTY_HEADERS;
  try {
      // check if we have an in-progress batch
      // 其实就是一个putIfAbsent操作的方法,不展开分析
      Deque<ProducerBatch> dq = getOrCreateDeque(tp);
      // batches是线程安全的,但是Deque不是线程安全的
      // 已有在处理中的batch
      synchronized (dq) {
          if (closed)
              throw new IllegalStateException("Cannot send after the producer is closed.");
          RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
          if (appendResult != null)
              return appendResult;
      }

      // we don't have an in-progress record batch try to allocate a new batch
      // 创建一个新的ProducerBatch
      byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
      // 分配一个内存
      int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
      log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
      // 申请不到内存
      buffer = free.allocate(size, maxTimeToBlock);
      synchronized (dq) {
          // Need to check if producer is closed again after grabbing the dequeue lock.
          if (closed)
              throw new IllegalStateException("Cannot send after the producer is closed.");

          // 再次尝试添加,因为分配内存的那段代码并不在synchronized块中
          // 有可能这时候其他线程已经创建好RecordBatch了,finally会把分配好的内存还回去
          RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
          if (appendResult != null) {
              // 作者自己都说了,希望不要总是发生,多个线程都去申请内存,到时候还不是要还回去?
              // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
              return appendResult;
          }

          // 创建ProducerBatch
          MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
          ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
          FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

          dq.addLast(batch);
          // incomplete是一个Set集合,存放不完整的batch
          incomplete.add(batch);

          // Don't deallocate this buffer in the finally block as it's being used in the record batch
          buffer = null;

          // 返回记录添加结果类
          return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
      }
  } finally {
      // 释放要还的内存
      if (buffer != null)
          free.deallocate(buffer);
      appendsInProgress.decrementAndGet();
  }
}

附加tryAppend()方法,不多说,都在代码注释里:

代码语言:javascript复制
  /**
 *  Try to append to a ProducerBatch.
 *
 *  If it is full, we return null and a new batch is created. We also close the batch for record appends to free up
 *  resources like compression buffers. The batch will be fully closed (ie. the record batch headers will be written
 *  and memory records built) in one of the following cases (whichever comes first): right before send,
 *  if it is expired, or when the producer is closed.
 */
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, Deque<ProducerBatch> deque) {
    // 获取最新加入的ProducerBatch
    ProducerBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
        if (future == null)
            last.closeForRecordAppends();
        else
            // 记录添加结果类包含future、batch是否已满的标记、是否是新batch创建的标记
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
    }
    // 如果这个Deque没有ProducerBatch元素,或者已经满了不足以加入本条消息则返回null
    return null;
}

以上代码见图解:

Sender

Sender里最重要的方法莫过于run()方法,其中比较核心的方法是org.apache.kafka.clients.producer.internals.Sender#sendProducerData

「其中pollTimeout需要认真读注释,意思是最长阻塞到至少有一个通道在你注册的事件就绪了。返回0则表示走起发车了」

代码语言:javascript复制
private long sendProducerData(long now) {
  // 获取当前集群的所有信息
  Cluster cluster = metadata.fetch();
  // get the list of partitions with data ready to send
  // @return ReadyCheckResult类的三个变量解释
  // 1.Set<Node> readyNodes 准备好发送的节点
  // 2.long nextReadyCheckDelayMs 下次检查节点的延迟时间
  // 3.Set<String> unknownLeaderTopics 哪些topic找不到leader节点
  RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
  // if there are any partitions whose leaders are not known yet, force metadata update
  // 如果有些topic不知道leader信息,更新metadata
  if (!result.unknownLeaderTopics.isEmpty()) {
      // The set of topics with unknown leader contains topics with leader election pending as well as
      // topics which may have expired. Add the topic again to metadata to ensure it is included
      // and request metadata update, since there are messages to send to the topic.
      for (String topic : result.unknownLeaderTopics)
          this.metadata.add(topic);
      this.metadata.requestUpdate();
  }

  // 去除不能发送信息的节点
  // remove any nodes we aren't ready to send to
  Iterator<Node> iter = result.readyNodes.iterator();
  long notReadyTimeout = Long.MAX_VALUE;
  while (iter.hasNext()) {
      Node node = iter.next();
      if (!this.client.ready(node, now)) {
          iter.remove();
          notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
      }
  }

  // 获取将要发送的消息
  // create produce requests
  Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
          this.maxRequestSize, now);

  // 保证发送消息的顺序
  if (guaranteeMessageOrder) {
      // Mute all the partitions drained
      for (List<ProducerBatch> batchList : batches.values()) {
          for (ProducerBatch batch : batchList)
              this.accumulator.mutePartition(batch.topicPartition);
      }
  }

  // 过期的batch
  List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
  boolean needsTransactionStateReset = false;
  // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
  // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
  // we need to reset the producer id here.
  if (!expiredBatches.isEmpty())
      log.trace("Expired {} batches in accumulator", expiredBatches.size());
  for (ProducerBatch expiredBatch : expiredBatches) {
      failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException());
      if (transactionManager != null && expiredBatch.inRetry()) {
          needsTransactionStateReset = true;
      }
      this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
  }
  if (needsTransactionStateReset) {
      transactionManager.resetProducerId();
      return 0;
  }
  sensors.updateProduceRequestMetrics(batches);
  // If we have any nodes that are ready to send   have sendable data, poll with 0 timeout so this can immediately
  // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
  // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
  // with sendable data that aren't ready to send since they would cause busy looping.
  // 1.The amount of time to block if there is nothing to do
  // 2.waiting for a channel to become ready; if zero, block indefinitely;
  long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
  if (!result.readyNodes.isEmpty()) {
      log.trace("Nodes with data ready to send: {}", result.readyNodes);
      // if some partitions are already ready to be sent, the select time would be 0;
      // otherwise if some partition already has some data accumulated but not ready yet,
      // the select time will be the time difference between now and its linger expiry time;
      // otherwise the select time will be the time difference between now and the metadata expiry time;
      pollTimeout = 0;
  }

  // 发送消息
  // 最后调用client.send() 把ProducerBatch转换为对应的ProduceRequest,并调用NetworkClient将消息写入网络发送出去
  sendProduceRequests(batches, now);
  return pollTimeout;
}

sendProduceRequests()详解

代码语言:javascript复制
 private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
      if (batches.isEmpty())
          return;

      Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
      final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

      // find the minimum magic version used when creating the record sets
      byte minUsedMagic = apiVersions.maxUsableProduceMagic();
      for (ProducerBatch batch : batches) {
          if (batch.magic() < minUsedMagic)
              minUsedMagic = batch.magic();
      }

      for (ProducerBatch batch : batches) {
          TopicPartition tp = batch.topicPartition;
          MemoryRecords records = batch.records();

          // down convert if necessary to the minimum magic used. In general, there can be a delay between the time
          // that the producer starts building the batch and the time that we send the request, and we may have
          // chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
          // the new message format, but found that the broker didn't support it, so we need to down-convert on the
          // client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
          // not all support the same message format version. For example, if a partition migrates from a broker
          // which is supporting the new magic version to one which doesn't, then we will need to convert.
          if (!records.hasMatchingMagic(minUsedMagic))
              records = batch.records().downConvert(minUsedMagic, 0, time).records();
          produceRecordsByPartition.put(tp, records);
          recordsByPartition.put(tp, batch);
      }

      String transactionalId = null;
      if (transactionManager != null && transactionManager.isTransactional()) {
          transactionalId = transactionManager.transactionalId();
      }
      // 将ProducerBatch转换为ProduceRequest
      ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
              produceRecordsByPartition, transactionalId);
      RequestCompletionHandler callback = new RequestCompletionHandler() {
          public void onComplete(ClientResponse response) {
              handleProduceResponse(response, recordsByPartition, time.milliseconds());
          }
      };

      String nodeId = Integer.toString(destination);
      // 将ProduceRequest转换为clientRequest
      ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
              requestTimeoutMs, callback);
      // 调用NetworkClient将消息写入网络发送出去
      client.send(clientRequest, now);
      log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
  }

其中也需要了解这个方法:org.apache.kafka.clients.producer.internals.RecordAccumulator#ready。返回的类中3个关键参数的解释都在注释里。

代码语言:javascript复制
/**
* Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
* partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
* partition batches.
* <p>
* A destination node is ready to send data if:
* <ol>
* <li>There is at least one partition that is not backing off its send
* <li><b>and</b> those partitions are not muted (to prevent reordering if
*   {@value org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}
*   is set to one)</li>
* <li><b>and <i>any</i></b> of the following are true</li>
* <ul>
*     <li>The record set is full</li>
*     <li>The record set has sat in the accumulator for at least lingerMs milliseconds</li>
*     <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions
*     are immediately considered ready).</li>
*     <li>The accumulator has been closed</li>
* </ul>
* </ol>
*/
/**
* @return ReadyCheckResult类的三个变量解释
* 1.Set<Node> readyNodes 准备好发送的节点
* 2.long nextReadyCheckDelayMs 下次检查节点的延迟时间
* 3.Set<String> unknownLeaderTopics 哪些topic找不到leader节点
*
* 一个节点满足以下任一条件则表示可以发送数据
* 1.batch满了
* 2.batch没满,但是等了lingerMs的时间
* 3.accumulator满了
* 4.accumulator关了
*/
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
  Set<Node> readyNodes = new HashSet<>();
  long nextReadyCheckDelayMs = Long.MAX_VALUE;
  Set<String> unknownLeaderTopics = new HashSet<>();
  boolean exhausted = this.free.queued() > 0;
  for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
      TopicPartition part = entry.getKey();
      Deque<ProducerBatch> deque = entry.getValue();
      Node leader = cluster.leaderFor(part);
      synchronized (deque) {
          // leader没有且队列非空则添加unknownLeaderTopics
          if (leader == null && !deque.isEmpty()) {
              // This is a partition for which leader is not known, but messages are available to send.
              // Note that entries are currently not removed from batches when deque is empty.
              unknownLeaderTopics.add(part.topic());

              // 如果readyNodes不包含leader且muted不包含part
              // mute这个变量跟producer端的一个配置有关系:max.in.flight.requests.per.connection=1
              // 主要防止topic同分区下的消息乱序问题,限制了producer在单个broker连接上能够发送的未响应请求的数量
              // 如果设置为1,则producer在收到响应之前无法再给该broker发送该topic的PRODUCE请求
          } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
              ProducerBatch batch = deque.peekFirst();
              if (batch != null) {
                  long waitedTimeMs = batch.waitedTimeMs(nowMs);
                  boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                  // 等待时间
                  long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                  // batch满了
                  boolean full = deque.size() > 1 || batch.isFull();
                  // batch过期
                  boolean expired = waitedTimeMs >= timeToWaitMs;
                  boolean sendable = full || expired || exhausted || closed || flushInProgress();
                  if (sendable && !backingOff) {
                      readyNodes.add(leader);
                  } else {
                      long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                      // Note that this results in a conservative estimate since an un-sendable partition may have
                      // a leader that will later be found to have sendable data. However, this is good enough
                      // since we'll just wake up and then sleep again for the remaining time.
                      // 目前还没有leader,下次重试
                      nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                  }
              }
          }
      }
  }
  return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

还有一个方法就是org.apache.kafka.clients.producer.internals.RecordAccumulator#drain,从accumulator缓冲区获取要发送的数据,最大一次性发max.request.size大小的数据(最上面的配置参数里有):

代码语言:javascript复制
/**
* Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified
* size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over.
*
* @param cluster The current cluster metadata
* @param nodes The list of node to drain
* @param maxSize The maximum number of bytes to drain
* maxSize也就是producer端配置参数max.request.size来控制的,一次最多发多少
* @param now The current unix time in milliseconds
* @return A list of {@link ProducerBatch} for each node specified with total size less than the requested maxSize.
*/
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
  if (nodes.isEmpty())
      return Collections.emptyMap();
  Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
  for (Node node : nodes) {
      // for循环获取要发的batch
      List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
      batches.put(node.id(), ready);
  }
  return batches;
}

private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
  int size = 0;
  // 获取node的partition
  List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
  List<ProducerBatch> ready = new ArrayList<>();
  /* to make starvation less likely this loop doesn't start at 0 */
  // 避免每次都从一个partition取,要雨露均沾
  int start = drainIndex = drainIndex % parts.size();
  do {
      PartitionInfo part = parts.get(drainIndex);
      TopicPartition tp = new TopicPartition(part.topic(), part.partition());
      this.drainIndex = (this.drainIndex   1) % parts.size();

      // Only proceed if the partition has no in-flight batches.
      if (isMuted(tp, now))
          continue;

      Deque<ProducerBatch> deque = getDeque(tp);
      if (deque == null)
          continue;

      // 加锁,不用说了吧
      synchronized (deque) {
          // invariant: !isMuted(tp,now) && deque != null
          ProducerBatch first = deque.peekFirst();
          if (first == null)
              continue;

          // first != null
          // 查看是否在backoff期间
          boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
          // Only drain the batch if it is not during backoff period.
          if (backoff)
              continue;

          // 超过maxSize且ready里有东西
          if (size   first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
              // there is a rare case that a single batch size is larger than the request size due to
              // compression; in this case we will still eventually send this batch in a single request
              // 有一种特殊的情况,batch的大小超过了maxSize,且batch是空的。也就是一个batch大小直接大于一次发送的maxSize
              // 这种情况下最终还是会发送这个batch在一次请求
              break;
          } else {
              if (shouldStopDrainBatchesForPartition(first, tp))
                  break;

              boolean isTransactional = transactionManager != null ? transactionManager.isTransactional() : false;
              ProducerIdAndEpoch producerIdAndEpoch =
                  transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
              ProducerBatch batch = deque.pollFirst();
              if (producerIdAndEpoch != null && !batch.hasSequence()) {
                  // If the batch already has an assigned sequence, then we should not change the producer id and
                  // sequence number, since this may introduce duplicates. In particular, the previous attempt
                  // may actually have been accepted, and if we change the producer id and sequence here, this
                  // attempt will also be accepted, causing a duplicate.
                  //
                  // Additionally, we update the next sequence number bound for the partition, and also have
                  // the transaction manager track the batch so as to ensure that sequence ordering is maintained
                  // even if we receive out of order responses.
                  batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
                  transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
                  log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence "  
                          "{} being sent to partition {}", producerIdAndEpoch.producerId,
                      producerIdAndEpoch.epoch, batch.baseSequence(), tp);

                  transactionManager.addInFlightBatch(batch);
              }
              // 添加batch,并且close
              batch.close();
              size  = batch.records().sizeInBytes();
              ready.add(batch);

              batch.drained(now);
          }
      }
  } while (start != drainIndex);
  return ready;
}

总结

以上几个方法主要做了如下几件事:

  • 从RecordAccumulator中读取ProducerBatch,获取node列表,并将ProducerBatch与node建立对应关系;
  • 将ProducerBatch转换为ProducerRequest,再进一步转换为ClientRequest;
  • 调用NetWorkClient的send方法将消息发送出去

0 人点赞