Solving Frequent Checkpoint Failures When Flink Writes to Elasticsearch

2022-06-17 14:01:05

For a recent requirement, a single Flink program splits its input by the IP field in each record, writing every sub-stream to its own Elasticsearch index (14 indexes in total), with checkpointing enabled.

At runtime it reported errors: some of the sinks consistently caused checkpoints to fail, even though the data volume was only about 100 records.

After hitting this problem, I removed checkpointing, and the program wrote to Elasticsearch normally with no errors.

Running without checkpoints is obviously not acceptable, so I re-enabled them and read the source of ElasticsearchSinkBase. The class implements the CheckpointedFunction interface and overrides snapshotState, which checks the flushOnCheckpoint member variable to decide whether to flush pending requests:

public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends RichSinkFunction<T> implements CheckpointedFunction {

 /** If true, the producer will wait until all outstanding action requests have been sent to Elasticsearch. */
 private boolean flushOnCheckpoint = true;

 /**
  * Disable flushing on checkpoint. When disabled, the sink will not wait for all
  * pending action requests to be acknowledged by Elasticsearch on checkpoints.
  *
  * <p>NOTE: If flushing on checkpoint is disabled, the Flink Elasticsearch Sink does NOT
  * provide any strong guarantees for at-least-once delivery of action requests.
  */
 public void disableFlushOnCheckpoint() {
  this.flushOnCheckpoint = false;
 }

 @Override
 public void snapshotState(FunctionSnapshotContext context) throws Exception {
  checkAsyncErrorsAndRequests();

  if (flushOnCheckpoint) {
   while (numPendingRequests.get() != 0) {
    bulkProcessor.flush();
    checkAsyncErrorsAndRequests();
   }
  }
 }
}
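To see why this snapshotState implementation stalls checkpoints, here is a minimal, self-contained simulation (plain Java, not Flink code; the class FlushLoopDemo, the method flushUntilEmpty, and the maxAttempts cap are all hypothetical names for illustration): if flush() never manages to drain the pending requests, the while loop above spins until the checkpoint times out.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the snapshotState loop shown above. flush() either
// drains the pending requests (a healthy sink) or leaves them pending (a
// request Elasticsearch keeps rejecting). A maxAttempts cap is added so this
// demo terminates instead of hanging the way the real checkpoint does.
public class FlushLoopDemo {
    static boolean flushUntilEmpty(AtomicLong numPendingRequests,
                                   boolean flushSucceeds, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (numPendingRequests.get() == 0) {
                return true;               // all requests acknowledged: checkpoint can complete
            }
            if (flushSucceeds) {
                numPendingRequests.set(0); // bulk flush acknowledged everything
            }
            // if the flush fails, the pending count never drops; the real
            // snapshotState would keep spinning here until the checkpoint times out
        }
        return numPendingRequests.get() == 0;
    }

    public static void main(String[] args) {
        boolean healthy = flushUntilEmpty(new AtomicLong(100), true, 10);
        boolean stuck   = flushUntilEmpty(new AtomicLong(100), false, 10);
        System.out.println("healthy sink checkpoint completes: " + healthy); // true
        System.out.println("stuck sink checkpoint completes:   " + stuck);   // false
    }
}
```

This matches the symptom above: it is not the data volume (only about 100 records) but a request that never gets acknowledged that makes the checkpoint fail.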

This is clearly the cause: with flushOnCheckpoint set to true, snapshotState blocks until every pending request is acknowledged, so any request that Elasticsearch fails to acknowledge makes the checkpoint hang and eventually fail. Calling disableFlushOnCheckpoint turns the flush off:

ElasticsearchSink<Row> elasticsearchSink = esSinkBuilder.build();
// disable the checkpoint-triggered flush
elasticsearchSink.disableFlushOnCheckpoint();
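With the checkpoint-triggered flush disabled, data only reaches Elasticsearch when the sink's internal BulkProcessor flushes on its own. A hedged sketch of tuning those thresholds on the builder before build() is called (the setBulkFlush* setters exist on ElasticsearchSink.Builder in the flink-connector-elasticsearch modules; the concrete values below are illustrative assumptions, not recommendations):

```java
// Illustrative values; esSinkBuilder is the ElasticsearchSink.Builder<Row>
// used above. Configure these before calling build().
esSinkBuilder.setBulkFlushMaxActions(1000);  // flush once 1000 actions are buffered
esSinkBuilder.setBulkFlushMaxSizeMb(5);      // ...or once 5 MB of data is buffered
esSinkBuilder.setBulkFlushInterval(1000L);   // ...or at least every 1000 ms
```

Keeping the flush interval small limits how much unacknowledged data can be lost on failure, since the at-least-once guarantee is gone once disableFlushOnCheckpoint is called.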

Problem solved. Note the trade-off spelled out in the Javadoc quoted above: with flushing on checkpoint disabled, the sink no longer provides at-least-once delivery guarantees, because a checkpoint can complete while action requests are still in flight.
