flink源码分析之kafka consumer的执行流程

2021-04-29 17:40:08 浏览数 (1)

背景

线上flink任务稳定运行了两个多月了,突然之间收到了消息堆积较多的报警,kafka上看到的现象是消息堆积较多。问过业务人员得知,对应的流表在前一天重新刷了一遍数据,在我们的这个任务中有两次维表关联,而且内层有一个split操作会造成外层维表关联的数据量膨胀(最大可能为80倍,即split之后产生了80条新记录)。开始了问题分析之路。

问题

查看taskmanager的log时发现有如下报警信息:

代码语言:javascript复制
WARN  org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher  - Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity.

问题是说在flink执行checkpoint的间隔内,从kafka中拉取到的数据还没有处理完成,导致offset没办法提交,而下一次的checkpoint已经开始了,这样flink会跳过对offset的提交。

分析

我们的场景是业务刷了大量的数据,导致短时间内生产了大量的数据,flink从kafka拉取的第一批还没有处理完成时,下一次checkpoint开始了,此时检查到上一次的checkpoint还未提交就会报这个警告并跳过当前这次checkpoint时对offset的提交。由于kafka中堆积的数据量足够,下一批还是会拉取一批数据在我们这里是500条(外层膨胀后会有几万条),然后仍然会处理超时,长此以往会频繁跳过offfset的提交,在kafka控制台上看到的结果是该消费者对应的consumer Group消息堆积越来越多(实际上是有消费,只是消费得比较慢)。为什么慢?库里同时有大量写入的操作,维表关联的性能急剧下降。这里不讨论维表性能的优化,我们主要基于问题来分析下flink中消费kafka的源码流程。

针对kafka消费的流程,我们来从头分析一下:

Task角度

Task是一个Runnable对象,它的run方法定义如下:

代码语言:javascript复制
    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
        try {
            doRun();
        } finally {
            terminationFuture.complete(executionState);
        }
    }

在它的doRun方法中会真正执行StreamTask的逻辑,StreamTask同时也是AbstractInvokable的子类。Task的doRun方法的部分代码如下:

它会初始化invokable实例并调用invokable的invoke方法。invokable实例是StreamTask类型的。

StreamTask角度

kafka源对应的StreamTask为SourceStreamTask,它的结构为:

我们来看SourceStreamTask的invokable过程,它的invokable方法为org.apache.flink.streaming.runtime.tasks.StreamTask#invoke:

代码语言:javascript复制
try {
            beforeInvoke();
            // final check to exit early before starting to run
            if (canceled) {
                throw new CancelTaskException();
            }
            // let the task do its work
            runMailboxLoop();
            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }
            afterInvoke();
        }
        //----------省略部分代码

在这里我们不深究StreamTask的完整的初始化流程,只关注下我们本文要关注的重点,其他内容后面再在专门的篇幅中具体分析。

beforeInvoke方法

我们来看beforeInvoke方法:

我们主要关注它的两个动作:

•执行SourceStreamTask的init方法。在init方法中主要执行一些和checkpoint和operator的udf相关的信息。•执行operatorChain.initializeStateAndOpenOperators方法。在org.apache.flink.streaming.runtime.tasks.OperatorChain#initializeStateAndOpenOperators方法中主要执行operator的open操作,代码如下:

代码语言:javascript复制
      protected void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
          for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
              StreamOperator<?> operator = operatorWrapper.getStreamOperator();
              operator.initializeState(streamTaskStateInitializer);
              operator.open();
          }
      }

kafka Source对应的Operator为StreamSource类型的,是AbstractUdfStreamOperator的一个子类,它的open方法代码如下:

代码语言:javascript复制
      @Override
      public void open() throws Exception {
          super.open();
          FunctionUtils.openFunction(userFunction, new Configuration());
      }

需要注意的是,这个userFunction是FlinkKafkaConsumer的一个实例。FlinkKafkaConsumer是FlinkKafkaConsumerBase类型的,openFunction方法会调用到org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#open方法,在该方法中会使用partitionDiscoverer获取到分区信息,然后尝试去state中获取,如果restoreState不为空则将partition信息与restoreState进行同步,将放入到subscribedPartitionsToStartOffsets容器中;如果restoreState为空则根据StartupMode来按照相应的模式处理partition列表中的信息。

在FlinkKafkaConsumerBase的open方法中还有一点需要注意的是,对于offset的处理逻辑。我们来看下第一行代码:

代码语言:javascript复制
  // determine the offset commit mode
          this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
              getIsAutoCommitEnabled(),
              enableCommitOnCheckpoints,
              ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

OffsetCommitModes.fromConfiguration方法的参数定义为:第一个参数为是否启动了自动提交、第二个参数为是否允许在checkpoint的时候进行offset的提交、第三个参数为是否启动了checkpoint。关于这一段在flink官方文档中也有说明,地址为:https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration,我们直接从源码上来分析:

•如果启用了checkpoint,直接通过是否启动了enableCommitOnCheckpoint来决定提交的模式,enableCommitOnCheckpoint默认为true,可以通过org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#setCommitOffsetsOnCheckpoints方法来修改。如果enableCommitOnCheckpoint为false则不进行offset的提交。•如果禁用了checkpoint,则根据是否启动了自动提交来判断,如果没有启动则不进行offset提交。

runMailboxLoop方法

我们直接来看org.apache.flink.streaming.runtime.tasks.StreamTask#runMailboxLoop代码:

代码语言:javascript复制
  public void runMailboxLoop() throws Exception {
          mailboxProcessor.runMailboxLoop();
      }

我们先来看下在StreamTask的构造方法中对mailboxProcessor的定义:

代码语言:javascript复制
  // 创建 mailboxProcessor
          this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);

第一个入参为MailboxDefaultAction,第二个入参为一个mailbox队列,第三人入参为线程执行器。其中MailboxDefaultAction对象为lambda表达式。

接下来我们来看org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor#runMailboxLoop方法:

代码语言:javascript复制
      /**
       * Runs the mailbox processing loop. This is where the main work is done.
       */
      public void runMailboxLoop() throws Exception {
          // 邮箱
          final TaskMailbox localMailbox = mailbox;

          Preconditions.checkState(
              localMailbox.isMailboxThread(),
              "Method must be executed by declared mailbox thread!");

          assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";
          // 邮箱controller,与processor绑定
          final MailboxController defaultActionContext = new MailboxController(this);
          // 当mailbox循环处于运行状态时,会一直消费Mailbox中的message(实际上是一个FIFO的队列)
          while (isMailboxLoopRunning()) {
              // The blocking `processMail` call will not return until default action is available.
              processMail(localMailbox, false);
              if (isMailboxLoopRunning()) {
                  // 进行 task 的 default action,也就是调用 processInput()
                  // 这里的defaultAction是在StreamTask的构造方法中的this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor)中的this::processInput。
                  mailboxDefaultAction.runDefaultAction(defaultActionContext); // lock is acquired inside default action as needed
              }
          }
      }

可以看到在该方法中会先循环执行processMail方法,然后执行mailboxDefaultAction.runDefaultAction(defaultActionContext)方法。这里我们主要关心后者,通过上文我们知道mailboxDefaultAction初始化为一个lambda表达式,在执行runDefaultAction时实际调用的是org.apache.flink.streaming.runtime.tasks.StreamTask#processInput方法。在我们本文的分析中它对应的是org.apache.flink.streaming.runtime.tasks.SourceStreamTask#processInput方法:

这里会启动sourceThread线程,sourceThread线程为LegacySourceFunctionThread类型的,我们来看下它run方法中的运行逻辑:

mainOperator的run方法的调用链为:

org.apache.flink.streaming.api.operators.StreamSource#run(java.lang.Object, org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer, org.apache.flink.streaming.runtime.tasks.OperatorChain) 到org.apache.flink.streaming.api.operators.StreamSource#run(java.lang.Object, org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer, org.apache.flink.streaming.api.operators.Output<org.apache.flink.streaming.runtime.streamrecord.StreamRecord>, org.apache.flink.streaming.runtime.tasks.OperatorChain),然后调用里面的userFunction.run(ctx)方法,继而调用到org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#run方法。

FlinkKafkaConsumerBase的run方法部分代码如下:

主要有两种操作:

1.创建KafkaFetcher;2.执行KafkaFetcher的loop操作,最多是增加一路partition discovery的操作。

KafkaFetcher

构造方法的部分代码如下:

主要的操作是设置上下文信息、watermark信息、checkpoint信息、checkpointLock、类加载器、handover缓存、consumerThread、kafkaCollector等。这里我们来关注下对于consumerThread的初始化操作:

代码语言:javascript复制
this.consumerThread = new KafkaConsumerThread(
            LOG,
            handover,
            kafkaProperties,
            unassignedPartitionsQueue,
            getFetcherName()   " for "   taskNameWithSubtasks,
            pollTimeout,
            useMetrics,
            consumerMetricGroup,
            subtaskMetricGroup);

这里我们主要关注下unassignedPartitionsQueue,它是在AbstractFetcher中初始化的,AbstractFetcher的构造方法部分代码为:

也就是说在初始化Fetcher时会将所有的partition信息放到unassignedPartitionsQueue中,意思是未分配的partition队列。

我们接着来看KafkaFetcher的runFetchLoop方法:

主要操作是启动consumerThread,然后执行loop操作从handover中消费数据。

拉取数据与提交offset的核心逻辑在ConsumerThread中,我们来看对应代码:

1.run方法的初始部分

代码语言:javascript复制
   public void run() {
           // early exit check
           if (!running) {
               return;
           }

           // this is the means to talk to FlinkKafkaConsumer's main thread
           final Handover handover = this.handover;

           // This method initializes the KafkaConsumer and guarantees it is torn down properly.
           // This is important, because the consumer has multi-threading issues,
           // including concurrent 'close()' calls.
           try {
               // 获取consumer,每次来获取都是new一个新的KafkaConsumer
               this.consumer = getConsumer(kafkaProperties);
           }
           catch (Throwable t) {
               handover.reportError(t);
               return;
           }
           // -------------省略-------

•初始化实例的handover对象;•获取一个新的consumer实例(因为kafkaConsumer是线程不安全的)。

1.fetch loop

代码语言:javascript复制
       // -------省略代码---------------
       // main fetch loop
               while (running) {

                   // check if there is something to commit
                   if (!commitInProgress) {// 默认为false
                       // get and reset the work-to-be committed, so we don't repeatedly commit the same
                       final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback =
                               nextOffsetsToCommit.getAndSet(null);

                       if (commitOffsetsAndCallback != null) {// 如果不为空,代表上次的提交没有完成,这里会继续进行异步提交
                           log.debug("Sending async offset commit request to Kafka broker");

                           // also record that a commit is already in progress
                           // the order here matters! first set the flag, then send the commit command.
                           commitInProgress = true;
                           consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
                       }
                   }
                   try {
                       if (hasAssignedPartitions) {
                           // 如果已经分配好分区,查看是否有未分配的partition,分配分区的
                           newPartitions = unassignedPartitionsQueue.pollBatch();
                       }
                       else {
                           // if no assigned partitions block until we get at least one
                           // instead of hot spinning this loop. We rely on a fact that
                           // unassignedPartitionsQueue will be closed on a shutdown, so
                           // we don't block indefinitely
                           newPartitions = unassignedPartitionsQueue.getBatchBlocking();
                       }
                       if (newPartitions != null) {
                           reassignPartitions(newPartitions);
                       }
                   } catch (AbortedReassignmentException e) {
                       continue;
                   }
                   if (!hasAssignedPartitions) {
                       // Without assigned partitions KafkaConsumer.poll will throw an exception
                       continue;
                   }
                   // get the next batch of records, unless we did not manage to hand the old batch over
                   if (records == null) {
                       try {
                           records = consumer.poll(pollTimeout);
                       }
                       catch (WakeupException we) {
                           continue;
                       }
                   }
                   try {
                       handover.produce(records);
                       records = null;
                   }
                   //---------省略代码

这个部分需要注意以下几点:

•commitInProgress的状态变化:默认为false,在执行consumer.commitAsync之前会置为true,在consumer.commitAsync操作callback通知后会置为false以便进行下一次的consumer.commitAsync操作;•nextOffsetsToCommit变量的变化点在org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread#setOffsetsToCommit方法中,该方法的调用链路为:

这个发生在notifyCheckpointComplete方法的调用中,也就是说在一次checkpoint完成后会执行setOffsetsToCommit方法。同时这里提一点题外话,这个过程是在processMail中执行的,也证明了flink在处理event processing、Processing-Time的定时器和checkpoint使用mailbox后的改进(题外话,可以参考之前写过的StreamTask线程模型分析的文章)。同时在执行一次consumer.commitAsync操作后会将nextOffsetsToCommit的值置为null。

我们来看下setOffsetsToCommit的一段代码:

代码语言:javascript复制
    // record the work to be committed by the main consumer thread and make sure the consumer notices that
        if (nextOffsetsToCommit.getAndSet(Tuple2.of(offsetsToCommit, commitCallback)) != null) {//设置新值,返回老值,老值是否为null
            log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. "  
                    "Skipping commit of previous offsets because newer complete checkpoint offsets are available. "  
                    "This does not compromise Flink's checkpoint integrity.");
        }

如果执行checkpoint操作时,上一次拉取的数据的offset还没有提交,这里会抛出警告并跳过本次offset的提交。这里需要注意的是consumer每次拉取数据会自己维护offset的变化,不依赖于kafka broker上当前消费者组的offset(如下图所示),但是在consumer重新初始化时会依赖这个。

•newPartitions的初始化,第一次进入时hasAssignedPartitions为false,会依赖unassignedPartitionsQueue.getBatchBlocking()方法进行初始化,并进入reassignPartitions方法进行分区的分配逻辑,将hasAssignedPartitions置为true,后面loop到这段代码时会执行 unassignedPartitionsQueue.pollBatch(),将一些新加入的或者之前分配失败的分区进行分配。•consumer.poll 执行kafkaConsumer的拉取数据的操作。•handover.produce:将数据放入到handover中,这里的数据会被KafkaFetcher中的Loop操作消费掉。

0 人点赞