A recent requirement: in a Flink program, split the stream according to the IP field in each record, with every split written to its own Elasticsearch index (14 indexes in total), and checkpointing enabled.
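The routing idea can be illustrated with a minimal, Flink-free sketch. The prefixes and index names below are hypothetical placeholders (the post does not show the actual mapping); in a real Flink job this lookup would live in the sink's ElasticsearchSinkFunction or drive side outputs.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: choose an Elasticsearch index name from a record's IP.
// The prefix-to-index mapping here is invented for illustration.
public class IpRouter {
    private static final Map<String, String> PREFIX_TO_INDEX = new HashMap<>();
    static {
        PREFIX_TO_INDEX.put("10.0", "index-datacenter-a");
        PREFIX_TO_INDEX.put("10.1", "index-datacenter-b");
    }

    static String indexFor(String ip) {
        // Route on the first two octets; unknown IPs fall into a default index.
        String[] octets = ip.split("\\.");
        String prefix = octets[0] + "." + octets[1];
        return PREFIX_TO_INDEX.getOrDefault(prefix, "index-default");
    }

    public static void main(String[] args) {
        System.out.println(indexFor("10.0.3.7"));    // index-datacenter-a
        System.out.println(indexFor("192.168.1.1")); // index-default
    }
}
```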
At runtime it failed: some of the sinks consistently caused checkpoints to fail, even though the data volume was only about 100 records.
After hitting this, I removed checkpointing, and the program wrote to Elasticsearch normally with no errors.
Running without checkpoints is obviously not an option, so I re-enabled them and read the source of the ElasticsearchSinkBase class. It implements the CheckpointedFunction interface and overrides snapshotState, which checks the flushOnCheckpoint field to decide whether to flush pending requests:
```java
public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends RichSinkFunction<T> implements CheckpointedFunction {

    /** If true, the producer will wait until all outstanding action requests have been sent to Elasticsearch. */
    private boolean flushOnCheckpoint = true;

    /**
     * Disable flushing on checkpoint. When disabled, the sink will not wait for all
     * pending action requests to be acknowledged by Elasticsearch on checkpoints.
     *
     * <p>NOTE: If flushing on checkpoint is disabled, the Flink Elasticsearch Sink does NOT
     * provide any strong guarantees for at-least-once delivery of action requests.
     */
    public void disableFlushOnCheckpoint() {
        this.flushOnCheckpoint = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkAsyncErrorsAndRequests();

        if (flushOnCheckpoint) {
            while (numPendingRequests.get() != 0) {
                bulkProcessor.flush();
                checkAsyncErrorsAndRequests();
            }
        }
    }
}
```
Clearly this is where the problem comes from: snapshotState loops until every pending request is acknowledged, so requests that cannot be acknowledged block the checkpoint until it times out. Calling disableFlushOnCheckpoint turns the flush off:
```java
ElasticsearchSink<Row> elasticsearchSink = esSinkBuilder.build();
// turn off checkpoint-triggered flushing
elasticsearchSink.disableFlushOnCheckpoint();
```
Problem solved. Note, though, the warning in the Javadoc above: with flush-on-checkpoint disabled, the sink no longer provides at-least-once delivery guarantees, so action requests still buffered at checkpoint time can be lost on failure.