Fixing a NullPointerException When Writing to StarRocks from Flink in Real Time

2023-05-22 23:38:49

Problem

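Recently a lot of our jobs kept failing after running for a while, and they did not recover on their own. The exception log is below. In short, a Stream Load failed, the connector went to roll back, and then a NullPointerException was thrown.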

2023-00-00 16:02:28,037 ERROR com.starrocks.data.load.stream.DefaultStreamLoadManager      [] - catch exception, wait rollback 
java.lang.NullPointerException: null
	at com.starrocks.data.load.stream.DefaultStreamLoader.send(DefaultStreamLoader.java:257) ~[flink-connector-starrocks-1.2.5_flink-1.13_2.11.jar:?]
	at com.starrocks.data.load.stream.DefaultStreamLoader.lambda$send$2(DefaultStreamLoader.java:113) ~[flink-connector-starrocks-1.2.5_flink-1.13_2.11.jar:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_302]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_302]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_302]
	at java.lang.Thread.run(Thread.java:877) [?:1.8.0_302]
2023-00-00 16:02:28,038 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Sink: Unnamed with job vertex id 3e7208d04a005e77687e0b389f9e3814 (1/1)#10 (346a2afcaa8b762453dd8152cd1915b3) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: java.lang.NullPointerException
	at com.starrocks.data.load.stream.DefaultStreamLoadManager.AssertNotException(DefaultStreamLoadManager.java:337)
	at com.starrocks.data.load.stream.DefaultStreamLoadManager.write(DefaultStreamLoadManager.java:196)
	at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2.invoke(StarRocksDynamicSinkFunctionV2.java:155)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:135)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:106)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:424)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:685)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:640)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:651)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:624)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:799)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:586)
	at java.lang.Thread.run(Thread.java:877)
	Suppressed: java.lang.RuntimeException: java.lang.NullPointerException
		at com.starrocks.data.load.stream.DefaultStreamLoadManager.AssertNotException(DefaultStreamLoadManager.java:337)
		at com.starrocks.data.load.stream.DefaultStreamLoadManager.flush(DefaultStreamLoadManager.java:267)
		at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2.close(StarRocksDynamicSinkFunctionV2.java:179)
		at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
		at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
		at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:865)
		at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:844)
		at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:757)
		at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:663)
		... 4 more
	Caused by: java.lang.NullPointerException
		at com.starrocks.data.load.stream.DefaultStreamLoader.send(DefaultStreamLoader.java:257)
		at com.starrocks.data.load.stream.DefaultStreamLoader.lambda$send$2(DefaultStreamLoader.java:113)
		at java.util.concurrent.FutureTask.run(FutureTask.java:266)
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
		... 1 more
Caused by: [CIRCULAR REFERENCE: java.lang.NullPointerException]

Root cause analysis

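1. In our experience, NPEs like this are usually caused by bad data. However, the log does not print the record being written, so we could not tell whether the data was the problem.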

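2. Note the keyword rollback: the write failed and the connector was rolling back. But our StarRocks version is 2.3.x, and StarRocks only supports transactions from 2.4 onward, so there should be nothing to roll back. Then we remembered two things. We had recently upgraded the connector to 1.2.5, and the sink in the stack trace is StarRocksDynamicSinkFunctionV2.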

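According to the documentation, V2 is the transaction-based sink added in 1.2.4. StarRocks versions without transaction support should default to the non-transactional sink, V1. That does not add up: our StarRocks clearly does not support transactions, so why was V2 being used?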
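For reference, the version rule above can be written as a tiny check. StarRocksVersionCheck is a hypothetical helper, not part of the connector. It assumes a "major.minor.patch" version string and encodes only one rule: transaction load is available from 2.4 onward.

```java
/**
 * Hypothetical helper (not connector code): decide from a StarRocks version
 * string whether transaction Stream Load is available (2.4 and later).
 * Assumes a "major.minor[.patch]" string; a prefix such as "v" is not handled.
 */
class StarRocksVersionCheck {

    static boolean supportsTransactionLoad(String version) {
        String[] parts = version.trim().split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Unrecognized version: " + version);
        }
        int major = Integer.parseInt(parts[0]);
        int minor = Integer.parseInt(parts[1]);
        // Transaction load exists since StarRocks 2.4
        return major > 2 || (major == 2 && minor >= 4);
    }

    public static void main(String[] args) {
        for (String v : new String[]{"2.3.10", "2.4.0", "3.0.2"}) {
            System.out.println(v + " -> " + supportsTransactionLoad(v));
        }
    }
}
```

The connector does not parse versions this way. As the source below shows, it probes the FE over HTTP instead, so it never needs to know the server version on the client side.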

Source code analysis

1. In the 1.2.5 source of the StarRocks connector, the relevant class is SinkFunctionFactory.java. Its core method is createSinkFunction, which returns a StarRocksDynamicSinkFunctionBase.

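2. The first thing this method does is call getSinkVersion, which reads the sink version option and checks whether it is V1, V2 or AUTO. We upgraded from an older connector that had no notion of V1/V2 at all, and we never set this option. So this must have taken the AUTO branch.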

/** Create sink function according to the configuration. */
public class SinkFunctionFactory {

    private static final Logger LOG = LoggerFactory.getLogger(SinkFunctionFactory.class);

    enum SinkVersion {
        // Implement exactly-once using stream load which has a
        // poor performance. All versions of StarRocks are supported
        V1,
        // Implement exactly-once using transaction load since StarRocks 2.4
        V2,
        // Select sink version automatically according to whether StarRocks
        // supports transaction load
        AUTO
    }

    public static boolean isStarRocksSupportTransactionLoad(StarRocksSinkOptions sinkOptions) {
        String host = ConnectionUtils.selectAvailableHttpHost(
                sinkOptions.getLoadUrlList(), sinkOptions.getConnectTimeout());
        if (host == null) {
            throw new RuntimeException("Can't find an available host in " + sinkOptions.getLoadUrlList());
        }

        String beginUrlStr = "http://" + StreamLoadConstants.getBeginUrl(host);
        HttpPost httpPost = new HttpPost(beginUrlStr);
        httpPost.addHeader(HttpHeaders.AUTHORIZATION,
                StreamLoadUtils.getBasicAuthHeader(sinkOptions.getUsername(), sinkOptions.getPassword()));
        httpPost.setConfig(RequestConfig.custom().setExpectContinueEnabled(true).setRedirectsEnabled(true).build());
        LOG.info("Transaction load probe post {}", httpPost);

        HttpClientBuilder clientBuilder = HttpClients.custom()
                .setRedirectStrategy(new DefaultRedirectStrategy() {
                    @Override
                    protected boolean isRedirectable(String method) {
                        return true;
                    }
                });

        try (CloseableHttpClient client = clientBuilder.build()) {
            CloseableHttpResponse response = client.execute(httpPost);
            String responseBody = EntityUtils.toString(response.getEntity());
            LOG.info("Transaction load probe response {}", responseBody);

            JSONObject bodyJson = JSON.parseObject(responseBody);
            String status = bodyJson.getString("status");
            String msg = bodyJson.getString("msg");

            // If StarRocks does not support transaction load, FE's NotFoundAction#executePost
            // will be called where you can know how the response json is constructed
            if ("FAILED".equals(status) && "Not implemented".equals(msg)) {
                return false;
            }
            return true;
        } catch (IOException e) {
            String errMsg = "Failed to probe transaction load for " + host;
            LOG.warn("{}", errMsg, e);
            throw new RuntimeException(errMsg, e);
        }
    }

    public static SinkVersion chooseSinkVersionAutomatically(StarRocksSinkOptions sinkOptions) {
        try {
            if (StarRocksSinkSemantic.AT_LEAST_ONCE.equals(sinkOptions.getSemantic())) {
                LOG.info("Choose sink version V2 for at-least-once.");
                return SinkVersion.V2;
            }

            boolean supportTransactionLoad = isStarRocksSupportTransactionLoad(sinkOptions);
            if (supportTransactionLoad) {
                LOG.info("StarRocks supports transaction load, and choose sink version V2");
                return SinkVersion.V2;
            } else {
                LOG.info("StarRocks does not support transaction load, and choose sink version V1");
                return SinkVersion.V1;
            }
        } catch (Exception e) {
            LOG.warn("Can't decide whether StarRocks supports transaction load, and sink version V2 " +
                    "will be used by default. If your StarRocks does not support transaction load, please " +
                    "configure '{}' to 'V1' manually", StarRocksSinkOptions.SINK_VERSION.key());
            return SinkVersion.V2;
        }
    }

    public static SinkVersion getSinkVersion(StarRocksSinkOptions sinkOptions) {
        String sinkTypeOption = sinkOptions.getSinkVersion().trim().toUpperCase();
        SinkVersion sinkVersion;
        if (SinkVersion.V1.name().equals(sinkTypeOption)) {
            sinkVersion = SinkVersion.V1;
        } else if (SinkVersion.V2.name().equals(sinkTypeOption)) {
            sinkVersion = SinkVersion.V2;
        } else if (SinkVersion.AUTO.name().equals(sinkTypeOption)) {
            sinkVersion = chooseSinkVersionAutomatically(sinkOptions);
        } else {
            throw new UnsupportedOperationException("Unsupported sink type " + sinkTypeOption);
        }
        LOG.info("Choose sink version {}", sinkVersion.name());
        return sinkVersion;
    }

    public static <T> StarRocksDynamicSinkFunctionBase<T> createSinkFunction(
            StarRocksSinkOptions sinkOptions, TableSchema schema, StarRocksIRowTransformer<T> rowTransformer) {
        SinkVersion sinkVersion = getSinkVersion(sinkOptions);
        switch (sinkVersion) {
            case V1:
                return new StarRocksDynamicSinkFunction<>(sinkOptions, schema, rowTransformer);
            case V2:
                return new StarRocksDynamicSinkFunctionV2<>(sinkOptions, schema, rowTransformer);
            default:
                throw new UnsupportedOperationException("Unsupported sink type " + sinkVersion.name());
        }
    }

    public static <T> StarRocksDynamicSinkFunctionBase<T> createSinkFunction(StarRocksSinkOptions sinkOptions) {
        SinkVersion sinkVersion = getSinkVersion(sinkOptions);
        switch (sinkVersion) {
            case V1:
                return new StarRocksDynamicSinkFunction<>(sinkOptions);
            case V2:
                return new StarRocksDynamicSinkFunctionV2<>(sinkOptions);
            default:
                throw new UnsupportedOperationException("Unsupported sink type " + sinkVersion.name());
        }
    }
}

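3. AUTO calls chooseSinkVersionAutomatically. When the semantic is at-least-once, that method returns V2 directly without probing. Otherwise it calls isStarRocksSupportTransactionLoad, which sends an HTTP POST to the FE's transaction begin URL and decides from the response whether transactions are supported.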
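The probe request itself is small. This sketch rebuilds its two variable parts with the JDK only, under two assumptions. First, that the begin endpoint path is /api/transaction/begin; what StreamLoadConstants.getBeginUrl returns is not shown in the source above. Second, that getBasicAuthHeader produces a standard HTTP Basic header. ProbeRequestSketch is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/**
 * Hypothetical sketch of the probe request's URL and auth header.
 * Assumption: the transaction begin endpoint is "/api/transaction/begin".
 */
class ProbeRequestSketch {

    // Assumed path of the transaction "begin" endpoint on the FE
    static final String BEGIN_PATH = "/api/transaction/begin";

    /** host is "fe_host:http_port", as picked from the load URL list. */
    static String beginUrl(String host) {
        return "http://" + host + BEGIN_PATH;
    }

    /** Standard HTTP Basic authentication header value for user:password. */
    static String basicAuthHeader(String user, String password) {
        String token = user + ":" + password;
        return "Basic " + Base64.getEncoder()
                .encodeToString(token.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Hypothetical FE address and credentials
        System.out.println("POST " + beginUrl("127.0.0.1:8030"));
        System.out.println("Authorization: " + basicAuthHeader("root", ""));
    }
}
```

The connector sends this POST with Apache HttpClient. It enables Expect: 100-continue and follows redirects for every method (the DefaultRedirectStrategy override), so a hand-built reproduction should behave the same way.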

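4. Our StarRocks cluster runs in the cloud and is not reachable from our local network, which made debugging awkward. Following the logic in isStarRocksSupportTransactionLoad, I built the same HTTP request by hand and sent it to the FE. The response looked like this: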

{"msg":"Not implemented","status":"FAILED"}

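Compare this with the code. For a normal response, msg and status are enough to conclude that the server does not support transactions, so the connector's detection logic is correct.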

try (CloseableHttpClient client = clientBuilder.build()) {
    CloseableHttpResponse response = client.execute(httpPost);
    String responseBody = EntityUtils.toString(response.getEntity());
    LOG.info("Transaction load probe response {}", responseBody);

    JSONObject bodyJson = JSON.parseObject(responseBody);
    String status = bodyJson.getString("status");
    String msg = bodyJson.getString("msg");

    // If StarRocks does not support transaction load, FE's NotFoundAction#executePost
    // will be called where you can know how the response json is constructed
    if ("FAILED".equals(status) && "Not implemented".equals(msg)) {
        return false;
    }
    return true;
} catch (IOException e) {
    String errMsg = "Failed to probe transaction load for " + host;
    LOG.warn("{}", errMsg, e);
    throw new RuntimeException(errMsg, e);
}
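The decision rule in this excerpt can be restated without a JSON library. This is a simplified sketch. It pulls the status and msg string fields out with a regular expression, which only stands in for the real parser (JSON.parseObject) and only works on flat JSON with string values. ProbeResponseSketch is a hypothetical name.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical restatement of the probe's decision rule.
 * The regex extraction is a simplification; real code should use a JSON parser.
 */
class ProbeResponseSketch {

    /** Returns the string value of a top-level field, or null if absent. */
    private static String field(String json, String name) {
        Matcher m = Pattern
                .compile("\"" + Pattern.quote(name) + "\"\\s*:\\s*\"([^\"]*)\"")
                .matcher(json);
        return m.find() ? m.group(1) : null;
    }

    /** Only FAILED + "Not implemented" means "no transaction load"; anything else counts as support. */
    static boolean supportsTransactionLoad(String responseBody) {
        String status = field(responseBody, "status");
        String msg = field(responseBody, "msg");
        return !("FAILED".equals(status) && "Not implemented".equals(msg));
    }

    public static void main(String[] args) {
        String body = "{\"msg\":\"Not implemented\",\"status\":\"FAILED\"}";
        System.out.println(supportsTransactionLoad(body)); // false
    }
}
```

The rule is asymmetric. Any well-formed response other than that exact pair is treated as "supported". Getting no response at all is a separate path, which the caller handles.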

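5. If the detection logic is fine, why did we still end up on V2? Look again: the probe is wrapped in a try-catch in chooseSinkVersionAutomatically. The request to the FE can fail, for example when no available host is found or the HTTP call throws an IOException. In that case the connector cannot determine whether the server supports transactions, so it falls back to V2 by default and only logs a warning. That is the root cause: we hit this default, so the job ran on the V2 sink and failed during rollback.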
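To make the fallback concrete, here is a condensed model of getSinkVersion and chooseSinkVersionAutomatically from the 1.2.5 source above. It is a sketch, not the connector's code. The HTTP probe is replaced by a Callable so a failing probe can be simulated, and SinkVersionModel is a hypothetical name.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

/** Hypothetical condensed model of the connector's sink version selection. */
class SinkVersionModel {

    enum SinkVersion { V1, V2, AUTO }

    static SinkVersion choose(String option, boolean atLeastOnce, Callable<Boolean> probe) {
        String normalized = option.trim().toUpperCase();
        if (SinkVersion.V1.name().equals(normalized)) {
            return SinkVersion.V1;
        }
        if (SinkVersion.V2.name().equals(normalized)) {
            return SinkVersion.V2;
        }
        if (!SinkVersion.AUTO.name().equals(normalized)) {
            throw new UnsupportedOperationException("Unsupported sink type " + normalized);
        }
        if (atLeastOnce) {
            return SinkVersion.V2; // at-least-once picks V2 without probing
        }
        try {
            return probe.call() ? SinkVersion.V2 : SinkVersion.V1;
        } catch (Exception e) {
            // Probe failed (e.g. FE unreachable): silently fall back to V2
            return SinkVersion.V2;
        }
    }

    public static void main(String[] args) {
        Callable<Boolean> unreachable = () -> { throw new IOException("connect timed out"); };
        System.out.println(choose("auto", false, unreachable)); // V2
        System.out.println(choose("auto", false, () -> false));  // V1
        System.out.println(choose("v1", false, unreachable));    // V1
    }
}
```

With an unreachable FE, AUTO resolves to V2 even on a StarRocks version without transaction load. This is the path described above.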

Solution

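With the cause confirmed, the fix is simple: set the sink version to V1 explicitly in the sink options. After changing the code and redeploying, the problem went away.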
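For a Flink SQL job, the change amounts to one extra entry in the sink table's WITH clause. The sketch below only renders such a clause from a map so the option can be seen in context. 'sink.version' (V1, V2 or AUTO) is, per the connector documentation, the key behind StarRocksSinkOptions.SINK_VERSION. All other values are placeholders, and SinkDdlSketch is a hypothetical helper.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: render a Flink SQL WITH clause that pins the sink version. */
class SinkDdlSketch {

    static String withClause(Map<String, String> options) {
        StringBuilder sb = new StringBuilder("WITH (\n");
        int i = 0;
        for (Map.Entry<String, String> e : options.entrySet()) {
            sb.append("  '").append(e.getKey()).append("' = '").append(e.getValue()).append('\'');
            // Comma after every entry except the last
            sb.append(++i < options.size() ? ",\n" : "\n");
        }
        return sb.append(")").toString();
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "starrocks");
        options.put("jdbc-url", "jdbc:mysql://fe_host:9030"); // placeholder
        options.put("load-url", "fe_host:8030");               // placeholder
        options.put("database-name", "db");                    // placeholder
        options.put("table-name", "tbl");                      // placeholder
        options.put("username", "user");                       // placeholder
        options.put("password", "");                           // placeholder
        // StarRocks 2.3.x has no transaction load, so pin the V1 sink
        options.put("sink.version", "V1");
        System.out.println(withClause(options));
    }
}
```

With the DataStream API, the same key can be set on the sink options builder, for example StarRocksSinkOptions.builder().withProperty("sink.version", "V1").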

Reflections

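If I were a developer of the StarRocks connector, is there a better way to avoid this problem? I think there is. In the current version, the V2-or-V1 decision is made again each time a sink function is created, which is unnecessary. It would be better to probe once when the job starts, keep the result in a shared variable, and have every sink reuse that conclusion. If the HTTP probe already fails at startup, the job should simply fail to start. Failing at startup is better than failing after the job has been running for a while.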
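The idea above can be sketched as a resolver with three behaviors. It probes once, caches the result, and refuses to start when the probe fails instead of guessing V2. This is a hypothetical design (SinkVersionResolver is not connector code), with the probe again modeled as a Callable.

```java
import java.util.concurrent.Callable;

/** Hypothetical resolver: probe once, cache the answer, fail fast on probe errors. */
final class SinkVersionResolver {

    enum SinkVersion { V1, V2 }

    private final Callable<Boolean> probe;
    private SinkVersion cached; // set after the first successful probe

    SinkVersionResolver(Callable<Boolean> probe) {
        this.probe = probe;
    }

    synchronized SinkVersion resolve() {
        if (cached == null) {
            boolean supportsTransactions;
            try {
                supportsTransactions = probe.call();
            } catch (Exception e) {
                // Fail fast: refuse to start rather than silently defaulting to V2
                throw new IllegalStateException(
                        "Cannot determine whether StarRocks supports transaction load; "
                                + "set the sink version explicitly", e);
            }
            cached = supportsTransactions ? SinkVersion.V2 : SinkVersion.V1;
        }
        return cached;
    }

    public static void main(String[] args) {
        SinkVersionResolver resolver = new SinkVersionResolver(() -> false);
        System.out.println(resolver.resolve()); // V1, probe runs here
        System.out.println(resolver.resolve()); // V1, served from the cache
    }
}
```

Throwing from resolve() surfaces the problem at startup with a clear message. The current fallback lets the job run on V2 until the first load fails. A failed probe is not cached, so a retry after connectivity is fixed probes again.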
