Troubleshooting an org.apache.kudu.client.NonRecoverableException

2020-06-19 17:40:11

While working on a real-time data warehouse, I ran into org.apache.kudu.client.NonRecoverableException. It took a long time to track down, so I am writing it up here.

The first step was of course to check the logs, which showed the following error:

2020-06-17 16:49:27 ERROR KeyedProcess -> Sink: Unnamed (2/10) tasks.StreamTask:671 - Error during disposal of stream operator.
        org.apache.kudu.client.NonRecoverableException:
                at org.apache.kudu.client.KuduException.transformException(KuduException.java:129)
                at org.apache.kudu.client.KuduClient.close(KuduClient.java:317)
                at bigdata.ishansong.com.util.KeyedProcessFunctionImp.close(KeyedProcessFunctionImp.java:123)
                at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
                at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:668)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:579)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
                at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
                at java.lang.Thread.run(Thread.java:748)
        Caused by: java.lang.InterruptedException
                at java.lang.Object.wait(Native Method)
                at java.lang.Object.wait(Object.java:502)
                at com.stumbleupon.async.Deferred.doJoin(Deferred.java:1136)
                at com.stumbleupon.async.Deferred.join(Deferred.java:1019)
                at org.apache.kudu.client.AsyncKuduClient.close(AsyncKuduClient.java:2441)
                at org.apache.kudu.client.KuduClient.close(KuduClient.java:315)
                ... 9 more
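
The "Caused by" section of this trace shows a thread blocked in Object.wait(), reached through Deferred.join(), being interrupted. The following is a minimal stdlib-only sketch of that mechanism, not Kudu code: the InterruptedJoinDemo and NeverCompleted classes and the runAndInterrupt method are hypothetical names, and the interrupt is issued by hand to stand in for whatever interrupted the task thread in the real job.

```java
import java.util.concurrent.atomic.AtomicReference;

public class InterruptedJoinDemo {

    /** Hypothetical stand-in for a pending result that never completes. */
    static final class NeverCompleted {
        private final Object lock = new Object();
        private boolean done = false;

        /** Blocks on a monitor until a result arrives, similar in spirit to a blocking join. */
        void join() throws InterruptedException {
            synchronized (lock) {
                while (!done) {
                    lock.wait(); // throws InterruptedException if the thread is interrupted
                }
            }
        }
    }

    /** Interrupts a blocked worker and returns the name of the exception it saw. */
    static String runAndInterrupt() {
        AtomicReference<String> outcome = new AtomicReference<>("none");
        NeverCompleted pending = new NeverCompleted();

        Thread worker = new Thread(() -> {
            try {
                pending.join();
            } catch (InterruptedException e) {
                outcome.set(e.getClass().getSimpleName());
            }
        });
        worker.start();
        worker.interrupt(); // assumption: stands in for the real source of the interrupt

        try {
            worker.join(5_000); // wait for the worker to record its outcome
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndInterrupt()); // prints "InterruptedException"
    }
}
```

In the trace, the same kind of InterruptedException passes through KuduException.transformException and surfaces as the NonRecoverableException at the top.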

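I dug around on my own for a while without finding the cause. All I could tell was that it had something to do with this code in the Kudu client: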

  public OperationResponse apply(Operation operation) throws KuduException {
    while (true) {
      try {
        Deferred<OperationResponse> d = session.apply(operation);
        if (getFlushMode() == FlushMode.AUTO_FLUSH_SYNC) {
          return d.join();
        }
        break;
      } catch (PleaseThrottleException ex) {
        try {
          ex.getDeferred().join();
        } catch (Exception e) {
          // This is the error response from the buffer that was flushing,
          // we can't do much with it at this point.
          LOG.error("Previous batch had this exception", e);
        }
      } catch (Exception e) {
        throw KuduException.transformException(e);
      }
    }
    return null;
  }

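The blocking wait inside d.join() was being interrupted, and I could not make sense of why, so I asked a colleague. He had run into something similar, but in his case it was caused by multiple threads operating on the same Kudu row at the same time. That was not the answer I was looking for, because my stream is keyed (keyBy) on primary key and table name. I was still baffled.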
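
The reasoning behind ruling out the colleague's explanation is that keying by table name and primary key sends every change for a given row to the same parallel subtask, so two subtasks should not write the same Kudu row concurrently. Below is a simplified sketch of that idea. It is not Flink's actual assignment algorithm (Flink hashes keys into key groups), the KeyRoutingSketch class and subtaskFor method are hypothetical, and the primary key value is made up.

```java
import java.util.Objects;

public class KeyRoutingSketch {

    /**
     * Simplified stand-in for keyed partitioning: the same (table, primary key)
     * pair always maps to the same subtask index in [0, parallelism).
     */
    static int subtaskFor(String tableName, String primaryKey, int parallelism) {
        return Math.floorMod(Objects.hash(tableName, primaryKey), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 10; // matches the "(2/10)" subtask count in the logs

        // Two changes to the same row (hypothetical primary key "1001").
        int first = subtaskFor("ds_mysql_order_orders", "1001", parallelism);
        int second = subtaskFor("ds_mysql_order_orders", "1001", parallelism);

        System.out.println(first == second); // prints "true"
    }
}
```

Different rows may still land on the same subtask, but a single row never ends up on two of them, which is the property that matters here.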

I kept going through the logs looking for errors:

2020-06-17 16:47:04 ERROR KeyedProcess -> Sink: Unnamed (2/10) kudu.KuduOperation:661 - dbName:kudu_ods tableName:ds_mysql_order_orders err:{}
java.sql.SQLException: [Cloudera][ImpalaJDBCDriver](500540) Error caught in BackgroundFetcher. Foreground thread ID: 81. Background thread ID: 1547. Error caught: null.
        at com.cloudera.impala.hivecommon.dataengine.BackgroundFetcher.getNextBuffer(Unknown Source)
        at com.cloudera.impala.hivecommon.dataengine.HiveJDBCResultSet.moveToNextRow(Unknown Source)
        at com.cloudera.impala.jdbc.common.SForwardResultSet.next(Unknown Source)

An error from the Impala driver, which seemed to have nothing to do with the problem.

I kept looking:

2020-06-17 16:48:10  WARN Thread-89 kryo.KryoSerializer:440 - Falling back to default Kryo serializer because Chill serializer couldn't be found.
java.lang.reflect.InvocationTargetException
        at sun.reflect.GeneratedMethodAccessor39.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryoInstance(KryoSerializer.java:436)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:454)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:337)
        at org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:151)
        at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextElementLastAccessTimestamp(RocksDbTtlCompactFiltersManager.java:202)
        at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextUnexpiredOffset(RocksDbTtlCompactFiltersManager.java:189)
Caused by: java.lang.ClassNotFoundException: scala/Function0
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at com.twitter.chill.KryoBase$$anonfun$1.apply(KryoBase.scala:41)
        at com.twitter.chill.KryoBase$$anonfun$1.apply(KryoBase.scala:41)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.Range.foreach(Range.scala:160)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at com.twitter.chill.KryoBase.<init>(KryoBase.scala:41)
        at org.apache.flink.runtime.types.EmptyFlinkScalaKryoInstantiator.newKryo(FlinkScalaKryoInstantiator.scala:45)
        at org.apache.flink.runtime.types.FlinkScalaKryoInstantiator.newKryo(FlinkScalaKryoInstantiator.scala:82)
        ... 9 more
2020-06-17 16:48:10  INFO KeyedProcess -> Sink: Unnamed (4/10) util.KeyedProcessFunctionImp:169 - get config table infos begin:1592383690742

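This looked like the culprit, and it turned out that it was. The cause was that the ListState stored its elements as Object. With no concrete element type, Flink falls back to Kryo for that state, and the trace above shows Kryo being initialized from the RocksDB TTL compaction filter and failing to load the Chill serializer (ClassNotFoundException: scala/Function0).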
