【Flink】Part 12: A "key not found" exception when the kudu-connector writes the -D (delete) records of CDC data

2022-03-31 11:07:05

Symptom

source:

Oracle CDC data. The distinguishing feature of this table is a large number of delete operations.

sink:

A Kudu table, written with a connector jar modified from the Apache Bahir kudu-connector.

transform:

Basic field selection and standardization, plus some Kafka metadata added to help with troubleshooting.

Error message:

java.io.IOException: Error while sending value. 
Row error for primary key="202011110000493806x00x0010", tablet=null, server=01bc92eab5d342488d5b7793b04d82ee, status=Not found: key not found (error 0)
Row error for primary key="202106030000437663x00x003", tablet=null, server=5f50378b13ed437aa2f9cecdbd3d5d57, status=Not found: key not found (error 0)
Row error for primary key="202106030000437663x00x0011", tablet=null, server=980cd7ad47a848689bb025310558ea07, status=Not found: key not found (error 0)
Row error for primary key="202106030000437663x00x008", tablet=null, server=ec8da9d650834ba48f22db70ea8bba66, status=Not found: key not found (error 0)
Row error for primary key="202106030000437663x00x004", tablet=null, server=5f50378b13ed437aa2f9cecdbd3d5d57, status=Not found: key not found (error 0)
Row error for primary key="202106030000437663x00x0012", tablet=null, server=ec8da9d650834ba48f22db70ea8bba66, status=Not found: key not found (error 0)
Row error for primary key="202106030000437663x00x007", tablet=null, server=5f50378b13ed437aa2f9cecdbd3d5d57, status=Not found: key not found (error 0)
Row error for primary key="202106030000437663x00x006", tablet=null, server=01bc92eab5d342488d5b7793b04d82ee, status=Not found: key not found (error 0)
Row error for primary key="202106030000437663x00x0010", tablet=null, server=01bc92eab5d342488d5b7793b04d82ee, status=Not found: key not found (error 0)

at org.colloh.flink.kudu.connector.internal.failure.DefaultKuduFailureHandler.onFailure(DefaultKuduFailureHandler.java:37) ~[rt-calc-sql-1.12.1-20210715.jar:na]
at org.colloh.flink.kudu.connector.internal.writer.KuduWriter.checkAsyncErrors(KuduWriter.java:156) ~[rt-calc-sql-1.12.1-20210715.jar:na]
at org.colloh.flink.kudu.connector.internal.writer.KuduWriter.write(KuduWriter.java:97) ~[rt-calc-sql-1.12.1-20210715.jar:na]
at org.colloh.flink.kudu.connector.table.sink.KuduSink.invoke(KuduSink.java:93) ~[rt-calc-sql-1.12.1-20210715.jar:na]
at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:49) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:72) ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at StreamExecCalc$102.processElement(Unknown Source) ~[na:na]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241) ~[flink-dist_2.11-1.12.1.jar:1.12.1]

Analysis

The job had been running fine before. This table has a lot of -D (delete) operations, and the exception says the primary key was not found, so the likely cause is deleting rows that do not exist.

However, when testing with the kudu client directly, deleting non-existent rows did not reproduce the problem. The test code is as follows:

public static void main(String[] args) {
    // Kudu master addresses
    String masterAddr = "xxxxxx";
    KuduClient client = new KuduClient.KuduClientBuilder(masterAddr).build();
    try {
        KuduTable table = client.openTable("rt_dwd.cdc2kudu");
        deleteRow(client, table);
    } catch (KuduException e) {
        e.printStackTrace();
    } finally {
        try {
            client.close();
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }
}

public static void deleteRow(KuduClient client, KuduTable table) throws KuduException {
    KuduSession session = client.newSession();
    session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
    // delete a row whose primary key does not exist in the table
    Delete delete = table.newDelete();
    delete.getRow().addLong("appl_seq", 1);
    delete.getRow().addInt("amount", 1);
    session.apply(delete);
    session.flush();
    session.close();
}
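A likely reason the test above shows nothing: under MANUAL_FLUSH the row error is carried back in the OperationResponse list returned by session.flush() rather than thrown, and the snippet simply ignores that return value. A minimal sketch of checking the responses, assuming the same session and delete as in deleteRow above:

// Sketch: inspect the responses of a manual flush instead of discarding them.
// The "key not found" status shows up here rather than as an exception.
java.util.List<OperationResponse> responses = session.flush();
for (OperationResponse response : responses) {
    if (response.hasRowError()) {
        // e.g. "Not found: key not found (error 0)" for a DELETE of a missing key
        System.out.println(response.getRowError());
    }
}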

After consulting the documentation, running various tests, and reading the source code, it turns out this exception comes from Kudu's own validation mechanism. Under AUTO_FLUSH_BACKGROUND the check is fairly strict: the error is propagated all the way up to the outermost user code, leaving it to the user to decide whether to ignore this kind of "deleting a row that does not exist in the database" error.

/**
 * Final flush; mainly handles the case of deleting rows that do not exist.
 */
private void terminalFlush() throws KuduException {
    final java.util.List<OperationResponse> responses = session.flush();
    for (OperationResponse response : responses) {
        if (response.hasRowError()) {
            throw new SinkException("encounter key not found error. More detail "
                    + "=> table : " + response.getRowError().getOperation().getTable().getName()
                    + " => row : " + response.getRowError().getOperation().getRow().stringifyRowKey());
        }
    }
    LOG.debug("Sub task {} flushed sink records", id);
}
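Under AUTO_FLUSH_BACKGROUND, by contrast, apply() returns immediately and row errors accumulate in the session's error collector; the connector's checkAsyncErrors call in the stack trace is where they surface. A rough sketch of draining those pending errors with the standard Kudu Java client API:

// Sketch: draining row errors in AUTO_FLUSH_BACKGROUND mode.
// getPendingErrors() empties the session's error collector.
if (session.countPendingErrors() > 0) {
    RowErrorsAndOverflowStatus pending = session.getPendingErrors();
    if (pending.isOverflowed()) {
        // the error buffer overflowed and some row errors were dropped
    }
    for (RowError rowError : pending.getRowErrors()) {
        if (rowError.getErrorStatus().isNotFound()) {
            // a DELETE whose key is already gone, the case discussed in this post
        } else {
            throw new IOException("Kudu row error: " + rowError);
        }
    }
}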

Solution

So I chose to modify the kudu-connector source code to reduce the impact of this exception.

There are two possible approaches:

1. Switch to MANUAL_FLUSH. This requires adding some extra logic to flush proactively (a rough sketch of this option follows the list).

2. Keep AUTO_FLUSH_BACKGROUND and filter this specific exception out in the error handling, while still rethrowing every other exception as before.
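
For reference, option 1 would look roughly like the sketch below: keep a MANUAL_FLUSH session, flush it explicitly (for example from the sink's checkpoint path), and drop only the "key not found" row errors from the responses. flushIgnoringMissingKeys is a hypothetical helper name used for illustration, not connector code.

// Sketch of option 1 (hypothetical helper, not the connector's code):
// flush manually and ignore only the "key not found" row errors.
private void flushIgnoringMissingKeys(KuduSession session) throws IOException {
    // KuduException extends IOException, so flush() can propagate directly
    java.util.List<OperationResponse> responses = session.flush();
    for (OperationResponse response : responses) {
        if (response.hasRowError()) {
            RowError rowError = response.getRowError();
            if (rowError.getErrorStatus().isNotFound()) {
                // a CDC -D record whose key is already absent in Kudu; safe to skip
                continue;
            }
            throw new IOException("Kudu row error: " + rowError);
        }
    }
}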

I went with option 2:

KuduWriter: the modified write() logic

public void write(T input) throws IOException {
    try {
        this.checkAsyncErrors();
    } catch (Exception var7) {
        String errorString = var7.getMessage();
        // only swallow the error raised when a delete hits a key that does not exist in Kudu
        if (!errorString.contains("key not found (error 0)")) {
            throw var7;
        }
        this.log.warn("encounter key not found " + errorString);
    }

    Iterator var2 = this.operationMapper.createOperations(input, this.table).iterator();

    while (var2.hasNext()) {
        Operation operation = (Operation) var2.next();

        try {
            this.checkErrors(this.session.apply(operation));
        } catch (Exception var6) {
            String errorString = var6.getMessage();
            if (!errorString.contains("key not found (error 0)")) {
                throw var6;
            }
            this.log.warn("encounter key not found " + errorString);
        }
    }
}
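With this change, a -D record that targets a key already absent from Kudu only produces a warning log and the writer carries on, while any other Kudu error still propagates and fails the job as before.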

Retrospective


The kudu client's write mechanism can basically be understood by reading this class.

Combining this with Flink, the logic of writing to Kudu is summarized in the figure below.

One thing to pay particular attention to here: a Flink subtask and a mutation buffer should be in a 1:1 relationship, otherwise the ordering of the data cannot be guaranteed.
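A rough sketch of how that 1:1 relationship comes about, assuming the usual RichSinkFunction structure (field and variable names here are illustrative, not the connector's exact code): each parallel sink instance opens its own client and session, so one subtask feeds exactly one mutation buffer.

// Sketch (illustrative, not the connector's exact code): one KuduSession per
// parallel sink subtask, hence one mutation buffer per subtask.
@Override
public void open(Configuration parameters) throws Exception {
    // open() is called once per parallel subtask by the Flink runtime
    this.client = new KuduClient.KuduClientBuilder(masterAddresses).build();
    this.session = client.newSession();
    this.session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
}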

After adjusting the earlier test, the original problem can be reproduced. The code is as follows:

/**
 * Verify the "primary key not found" error reported when deleting rows
 * in FlushMode.AUTO_FLUSH_BACKGROUND mode.
 *
 * Condition for the error buffer to contain data:
 * AUTO_FLUSH_BACKGROUND
 *
 * @param client
 * @param table
 * @throws KuduException
 */
public static void testCDC_ID(KuduClient client, KuduTable table) throws KuduException, InterruptedException {
    KuduSession session = client.newSession();
    session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
    session.setMutationBufferSpace(1);
    for (int i = 0; i < 1; i++) {
        Delete delete = table.newDelete();
        delete.getRow().addLong("appl_seq", i % 1);
        delete.getRow().addInt("amount", i % 1);
        session.apply(delete);
    }
    // Wait a moment here: in AUTO_FLUSH_BACKGROUND mode the kudu client submits data asynchronously,
    // and only after the data reaches the Kudu servers is it known whether the primary key exists.
    // Once the error has been placed in the error buffer, countPendingErrors() called from the
    // current thread will return the number of failed rows.
    Thread.sleep(1000);
    System.out.println(session.countPendingErrors());
    session.close();
}
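With the mutation buffer limited to a single operation and a short wait for the background flush, countPendingErrors() should print 1 here: the one delete that targeted a missing key, which is exactly the "key not found (error 0)" case the job was hitting.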
