Symptom
source:
Oracle CDC data; a distinguishing feature of this table is the large number of delete operations.
sink:
a Kudu table, written with a connector jar modified from Apache Bahir's kudu-connector.
transform:
basic field filtering and standardization, plus some Kafka metadata added to help with troubleshooting (a sketch of such metadata columns follows).
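The Kafka metadata mentioned above can be exposed with Flink 1.12's metadata columns on the Kafka source table. A minimal sketch, assuming hypothetical table, topic, field names and format; only the METADATA column syntax and the metadata keys topic/offset are the actual Flink 1.12 Kafka connector API:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KafkaMetadataColumnsSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // Hypothetical source table: the kafka_* columns carry Kafka metadata along with the
        // business fields so a problematic record can be traced back to its topic and offset.
        tEnv.executeSql(
            "CREATE TABLE ora_cdc_source (\n"
                + "  appl_seq BIGINT,\n"
                + "  amount INT,\n"
                + "  kafka_topic STRING METADATA FROM 'topic' VIRTUAL,\n"
                + "  kafka_offset BIGINT METADATA FROM 'offset' VIRTUAL\n"
                + ") WITH (\n"
                + "  'connector' = 'kafka',\n"
                + "  'topic' = 'ora_cdc_topic',\n"                 // hypothetical topic name
                + "  'properties.bootstrap.servers' = 'xxxxxx',\n" // placeholder, as elsewhere in this post
                + "  'format' = 'canal-json'\n"                    // assumption: the real CDC format is not shown
                + ")");
    }
}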
Error message:
java.io.IOException: Error while sending value.
Row error for primary key="202011110000493806x00x0010", tablet=null, server=01bc92eab5d342488d5b7793b04d82ee, status=Not found: key not found (error 0)
Row error for primary key="202106030000437663x00x003", tablet=null, server=5f50378b13ed437aa2f9cecdbd3d5d57, status=Not found: key not found (error 0)
Row error for primary key="202106030000437663x00x0011", tablet=null, server=980cd7ad47a848689bb025310558ea07, status=Not found: key not found (error 0)
Row error for primary key="202106030000437663x00x008", tablet=null, server=ec8da9d650834ba48f22db70ea8bba66, status=Not found: key not found (error 0)
Row error for primary key="202106030000437663x00x004", tablet=null, server=5f50378b13ed437aa2f9cecdbd3d5d57, status=Not found: key not found (error 0)
Row error for primary key="202106030000437663x00x0012", tablet=null, server=ec8da9d650834ba48f22db70ea8bba66, status=Not found: key not found (error 0)
Row error for primary key="202106030000437663x00x007", tablet=null, server=5f50378b13ed437aa2f9cecdbd3d5d57, status=Not found: key not found (error 0)
Row error for primary key="202106030000437663x00x006", tablet=null, server=01bc92eab5d342488d5b7793b04d82ee, status=Not found: key not found (error 0)
Row error for primary key="202106030000437663x00x0010", tablet=null, server=01bc92eab5d342488d5b7793b04d82ee, status=Not found: key not found (error 0)
at org.colloh.flink.kudu.connector.internal.failure.DefaultKuduFailureHandler.onFailure(DefaultKuduFailureHandler.java:37) ~[rt-calc-sql-1.12.1-20210715.jar:na]
at org.colloh.flink.kudu.connector.internal.writer.KuduWriter.checkAsyncErrors(KuduWriter.java:156) ~[rt-calc-sql-1.12.1-20210715.jar:na]
at org.colloh.flink.kudu.connector.internal.writer.KuduWriter.write(KuduWriter.java:97) ~[rt-calc-sql-1.12.1-20210715.jar:na]
at org.colloh.flink.kudu.connector.table.sink.KuduSink.invoke(KuduSink.java:93) ~[rt-calc-sql-1.12.1-20210715.jar:na]
at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:49) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:72) ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at StreamExecCalc$102.processElement(Unknown Source) ~[na:na]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
Analysis
The job had been running fine before. This table is characterized by a large number of -D (delete) operations, and the exception says the primary key was not found, so the job was probably deleting rows that do not exist.
However, testing directly with the Kudu client, deleting a nonexistent row did not reproduce the problem. The test code is as follows:
import org.apache.kudu.client.Delete;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.SessionConfiguration;

public class KuduDeleteTest {

    public static void main(String[] args) {
        // Kudu master address
        String masterAddr = "xxxxxx";
        KuduClient client = new KuduClient.KuduClientBuilder(masterAddr).build();
        try {
            KuduTable table = client.openTable("rt_dwd.cdc2kudu");
            deleteRow(client, table);
        } catch (KuduException e) {
            e.printStackTrace();
        } finally {
            try {
                client.close();
            } catch (KuduException e) {
                e.printStackTrace();
            }
        }
    }

    public static void deleteRow(KuduClient client, KuduTable table) throws KuduException {
        KuduSession session = client.newSession();
        session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
        // Delete a row whose primary key does not exist in the table.
        Delete delete = table.newDelete();
        delete.getRow().addLong("appl_seq", 1);
        delete.getRow().addInt("amount", 1);
        session.apply(delete);
        // In MANUAL_FLUSH mode the row error for the missing key only comes back in the
        // List<OperationResponse> returned by flush(); it is not inspected here, so nothing is thrown.
        session.flush();
        session.close();
    }
}
After reading up on it, running various tests, and digging through the source code, it turns out this exception comes from a validation mechanism in Kudu itself. Under AUTO_FLUSH_BACKGROUND this validation is fairly strict: the row errors are thrown as an exception all the way up to the outermost user code, which then has to decide whether to ignore this kind of "deleting a row that does not exist in the database" error. When flushing manually, by contrast, the row errors come back in the OperationResponse list returned by flush() and have to be checked explicitly:
/**
 * Final flush; mainly handles the case where a delete targeted a row that does not exist.
 */
private void terminalFlush() throws KuduException {
    final java.util.List<OperationResponse> responses = session.flush();
    for (OperationResponse response : responses) {
        if (response.hasRowError()) {
            throw new SinkException("encounter key not found error. More detail "
                    + "=> table : " + response.getRowError().getOperation().getTable().getName() + " "
                    + "=> row : " + response.getRowError().getOperation().getRow().stringifyRowKey());
        }
    }
    LOG.debug("Sub task {} flushed sink records", id);
}
Solution
So I chose to modify the kudu-connector source to soften the impact of this exception.
There are two possible approaches:
1. Use MANUAL_FLUSH, which requires adding some explicit flush logic (a minimal sketch follows this list).
2. Use AUTO_FLUSH_BACKGROUND and filter this particular exception out in the error handling, while still throwing all other exceptions as before.
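For completeness, a minimal sketch of the extra flush logic approach 1 would need: the session runs in MANUAL_FLUSH mode, and the flush-and-check step (modeled on terminalFlush above) skips the not-found row errors instead of throwing. Where exactly the connector would call it (per batch, per checkpoint) is left open; this is not the connector's actual code.

// Approach 1 sketch: explicit flush on a MANUAL_FLUSH session, ignoring "key not found" row errors.
// Uses java.util.List and org.apache.kudu.client.{KuduSession, OperationResponse, RowError}.
private void manualFlushIgnoringNotFound(KuduSession session) throws IOException {
    List<OperationResponse> responses = session.flush();
    for (OperationResponse response : responses) {
        if (!response.hasRowError()) {
            continue;
        }
        RowError error = response.getRowError();
        // A delete targeting a row that is already gone surfaces as a NotFound row error; skip it.
        if (error.getErrorStatus().isNotFound()) {
            continue;
        }
        // Any other row error is still propagated.
        throw new IOException("Kudu row error: " + error);
    }
}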
I went with option 2. In KuduWriter, the write() logic is modified as follows:
public void write(T input) throws IOException {
    try {
        this.checkAsyncErrors();
    } catch (Exception e) {
        String errorString = e.getMessage();
        // Only swallow the error Kudu reports when a delete targets a row that does not exist.
        if (!errorString.contains("key not found (error 0)")) {
            throw e;
        }
        this.log.warn("encounter key not found " + errorString);
    }
    for (Operation operation : this.operationMapper.createOperations(input, this.table)) {
        try {
            this.checkErrors(this.session.apply(operation));
        } catch (Exception e) {
            String errorString = e.getMessage();
            if (!errorString.contains("key not found (error 0)")) {
                throw e;
            }
            this.log.warn("encounter key not found " + errorString);
        }
    }
}
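An alternative worth noting: instead of string-matching the exception message in write(), one could plug a custom failure handler in place of the DefaultKuduFailureHandler seen in the stack trace and filter on the typed row error status. A rough sketch, assuming the connector's KuduFailureHandler interface exposes onFailure(List<RowError>) throws IOException, as the default handler's stack frame suggests:

// Sketch of a failure handler that ignores NotFound row errors and rethrows everything else.
// KuduFailureHandler and its signature are assumed from the bahir-based connector; RowError and
// Status are the real org.apache.kudu.client API. Uses java.util.List and java.util.stream.Collectors.
public class IgnoreNotFoundFailureHandler implements KuduFailureHandler {

    @Override
    public void onFailure(List<RowError> failures) throws IOException {
        List<RowError> realFailures = failures.stream()
                .filter(error -> !error.getErrorStatus().isNotFound())
                .collect(Collectors.toList());
        if (!realFailures.isEmpty()) {
            String details = realFailures.stream()
                    .map(RowError::toString)
                    .collect(Collectors.joining(System.lineSeparator()));
            throw new IOException("Error while sending value." + System.lineSeparator() + details);
        }
    }
}

Checking error.getErrorStatus().isNotFound() is more robust than matching the "key not found (error 0)" text, since the message format is not part of the client's contract.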
Retrospective
The write mechanism of the Kudu client can basically be understood from this class.
Combined with Flink, the logic of writing to Kudu is roughly summarized in the figure below.
What deserves special attention here is that Flink subtasks and mutation buffers should be in a 1:1 relationship; otherwise the ordering of the data cannot be guaranteed (a bare-bones sketch of this per-subtask pattern follows).
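To make the 1:1 relationship concrete, here is a bare-bones sketch (not the bahir connector's actual classes) of the pattern the figure summarizes: each parallel sink subtask opens its own KuduClient and KuduSession in open(), so each subtask owns exactly one mutation buffer and the operations it applies keep their order. The master address is a placeholder and the element type Operation is just for illustration.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.SessionConfiguration;

public class PerSubtaskKuduSink extends RichSinkFunction<Operation> {

    private transient KuduClient client;
    private transient KuduSession session;

    @Override
    public void open(Configuration parameters) {
        // Called once per parallel subtask: each subtask gets its own client, session and buffer.
        client = new KuduClient.KuduClientBuilder("xxxxxx").build();
        session = client.newSession();
        session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
    }

    @Override
    public void invoke(Operation operation, Context context) throws Exception {
        // Operations applied by this subtask go into this subtask's buffer, preserving their order.
        session.apply(operation);
    }

    @Override
    public void close() throws Exception {
        if (session != null) {
            session.close();
        }
        if (client != null) {
            client.close();
        }
    }
}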
After adjusting the earlier test, the original problem can be reproduced with the Kudu client alone; the code is as follows:
/**
 * Verify that in FlushMode.AUTO_FLUSH_BACKGROUND, deleting a row whose primary key
 * does not exist raises the "key not found" error.
 *
 * Condition for the pending-error buffer to contain entries:
 * AUTO_FLUSH_BACKGROUND
*
* @param client
* @param table
* @throws KuduException
*/
public static void testCDC_ID(KuduClient client, KuduTable table) throws KuduException, InterruptedException {
KuduSession session = client.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
session.setMutationBufferSpace(1);
for (int i = 0; i < 1; i++) {
Delete delete = table.newDelete();
delete.getRow().addLong("appl_seq", i % 1);
delete.getRow().addInt("amount", i % 1);
session.apply(delete);
}
// Wait a moment here: in AUTO_FLUSH_BACKGROUND mode the Kudu client submits the operations
// asynchronously, and only once they reach the Kudu servers is it known whether the primary key exists.
// After the returned row errors have been placed in the error buffer, the current thread's call to
// countPendingErrors() will then see the error count.
Thread.sleep(1000);
System.out.println(session.countPendingErrors());
session.close();
}