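Problem
Recently a lot of our jobs have been failing partway through a run, and they rarely restart. I dug out the exception below. Roughly: a stream load failed, the connector tried to roll back, and then it hit a NullPointerException.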
2023-00-00 16:02:28,037 ERROR com.starrocks.data.load.stream.DefaultStreamLoadManager [] - catch exception, wait rollback
java.lang.NullPointerException: null
at com.starrocks.data.load.stream.DefaultStreamLoader.send(DefaultStreamLoader.java:257) ~[flink-connector-starrocks-1.2.5_flink-1.13_2.11.jar:?]
at com.starrocks.data.load.stream.DefaultStreamLoader.lambda$send$2(DefaultStreamLoader.java:113) ~[flink-connector-starrocks-1.2.5_flink-1.13_2.11.jar:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_302]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_302]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_302]
at java.lang.Thread.run(Thread.java:877) [?:1.8.0_302]
2023-00-00 16:02:28,038 WARN org.apache.flink.runtime.taskmanager.Task [] - Sink: Unnamed with job vertex id 3e7208d04a005e77687e0b389f9e3814 (1/1)#10 (346a2afcaa8b762453dd8152cd1915b3) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: java.lang.NullPointerException
at com.starrocks.data.load.stream.DefaultStreamLoadManager.AssertNotException(DefaultStreamLoadManager.java:337)
at com.starrocks.data.load.stream.DefaultStreamLoadManager.write(DefaultStreamLoadManager.java:196)
at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2.invoke(StarRocksDynamicSinkFunctionV2.java:155)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:135)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:106)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:424)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:685)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:640)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:651)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:624)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:799)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:586)
at java.lang.Thread.run(Thread.java:877)
Suppressed: java.lang.RuntimeException: java.lang.NullPointerException
at com.starrocks.data.load.stream.DefaultStreamLoadManager.AssertNotException(DefaultStreamLoadManager.java:337)
at com.starrocks.data.load.stream.DefaultStreamLoadManager.flush(DefaultStreamLoadManager.java:267)
at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2.close(StarRocksDynamicSinkFunctionV2.java:179)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:865)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:844)
at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:757)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:663)
... 4 more
Caused by: java.lang.NullPointerException
at com.starrocks.data.load.stream.DefaultStreamLoader.send(DefaultStreamLoader.java:257)
at com.starrocks.data.load.stream.DefaultStreamLoader.lambda$send$2(DefaultStreamLoader.java:113)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: [CIRCULAR REFERENCE: java.lang.NullPointerException]
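Cause analysis
1. In my experience an NPE is usually caused by bad data, but no data is printed here, so I can't tell whether the data is the problem.
2. Note the keyword rollback: the write failed and the connector is rolling back. The catch is that our StarRocks version is 2.3.x, and StarRocks only supports transactions from 2.4 on, so there is nothing to roll back; it simply isn't supported. Thinking it over, we had recently upgraded the connector to 1.2.5, and the sink being executed is StarRocksDynamicSinkFunctionV2.
I checked the docs: V2 is the transaction interface added in connector 1.2.4, and StarRocks versions without transaction support are supposed to use the non-transactional interface, i.e. V1, by default. Something is off: our StarRocks clearly doesn't support transactions, so why is V2 being used?
Source code analysis
1. In the 1.2.5 source of the StarRocks connector, open SinkFunctionFactory.java. The core method is createSinkFunction, which returns a StarRocksDynamicSinkFunctionBase.
2. The first step of this method is getSinkVersion, which reads the sink option to decide between V1, V2, and AUTO. We upgraded from an older connector version that had no notion of V1 or V2 at all, so we must be on the AUTO path.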
/** Create sink function according to the configuration. */
public class SinkFunctionFactory {

    private static final Logger LOG = LoggerFactory.getLogger(SinkFunctionFactory.class);

    enum SinkVersion {
        // Implement exactly-once using stream load which has a
        // poor performance. All versions of StarRocks are supported
        V1,
        // Implement exactly-once using transaction load since StarRocks 2.4
        V2,
        // Select sink version automatically according to whether StarRocks
        // supports transaction load
        AUTO
    }

    public static boolean isStarRocksSupportTransactionLoad(StarRocksSinkOptions sinkOptions) {
        String host = ConnectionUtils.selectAvailableHttpHost(
                sinkOptions.getLoadUrlList(), sinkOptions.getConnectTimeout());
        if (host == null) {
            throw new RuntimeException("Can't find an available host in " + sinkOptions.getLoadUrlList());
        }
        String beginUrlStr = "http://" + StreamLoadConstants.getBeginUrl(host);
        HttpPost httpPost = new HttpPost(beginUrlStr);
        httpPost.addHeader(HttpHeaders.AUTHORIZATION,
                StreamLoadUtils.getBasicAuthHeader(sinkOptions.getUsername(), sinkOptions.getPassword()));
        httpPost.setConfig(RequestConfig.custom().setExpectContinueEnabled(true).setRedirectsEnabled(true).build());
        LOG.info("Transaction load probe post {}", httpPost);
        HttpClientBuilder clientBuilder = HttpClients.custom()
                .setRedirectStrategy(new DefaultRedirectStrategy() {
                    @Override
                    protected boolean isRedirectable(String method) {
                        return true;
                    }
                });
        try (CloseableHttpClient client = clientBuilder.build()) {
            CloseableHttpResponse response = client.execute(httpPost);
            String responseBody = EntityUtils.toString(response.getEntity());
            LOG.info("Transaction load probe response {}", responseBody);
            JSONObject bodyJson = JSON.parseObject(responseBody);
            String status = bodyJson.getString("status");
            String msg = bodyJson.getString("msg");
            // If StarRocks does not support transaction load, FE's NotFoundAction#executePost
            // will be called where you can know how the response json is constructed
            if ("FAILED".equals(status) && "Not implemented".equals(msg)) {
                return false;
            }
            return true;
        } catch (IOException e) {
            String errMsg = "Failed to probe transaction load for " + host;
            LOG.warn("{}", errMsg, e);
            throw new RuntimeException(errMsg, e);
        }
    }

    public static SinkVersion chooseSinkVersionAutomatically(StarRocksSinkOptions sinkOptions) {
        try {
            if (StarRocksSinkSemantic.AT_LEAST_ONCE.equals(sinkOptions.getSemantic())) {
                LOG.info("Choose sink version V2 for at-least-once.");
                return SinkVersion.V2;
            }
            boolean supportTransactionLoad = isStarRocksSupportTransactionLoad(sinkOptions);
            if (supportTransactionLoad) {
                LOG.info("StarRocks supports transaction load, and choose sink version V2");
                return SinkVersion.V2;
            } else {
                LOG.info("StarRocks does not support transaction load, and choose sink version V1");
                return SinkVersion.V1;
            }
        } catch (Exception e) {
            LOG.warn("Can't decide whether StarRocks supports transaction load, and sink version V2 " +
                    "will be used by default. If your StarRocks does not support transaction load, please " +
                    "configure '{}' to 'V1' manually", StarRocksSinkOptions.SINK_VERSION.key());
            return SinkVersion.V2;
        }
    }

    public static SinkVersion getSinkVersion(StarRocksSinkOptions sinkOptions) {
        String sinkTypeOption = sinkOptions.getSinkVersion().trim().toUpperCase();
        SinkVersion sinkVersion;
        if (SinkVersion.V1.name().equals(sinkTypeOption)) {
            sinkVersion = SinkVersion.V1;
        } else if (SinkVersion.V2.name().equals(sinkTypeOption)) {
            sinkVersion = SinkVersion.V2;
        } else if (SinkVersion.AUTO.name().equals(sinkTypeOption)) {
            sinkVersion = chooseSinkVersionAutomatically(sinkOptions);
        } else {
            throw new UnsupportedOperationException("Unsupported sink type " + sinkTypeOption);
        }
        LOG.info("Choose sink version {}", sinkVersion.name());
        return sinkVersion;
    }

    public static <T> StarRocksDynamicSinkFunctionBase<T> createSinkFunction(
            StarRocksSinkOptions sinkOptions, TableSchema schema, StarRocksIRowTransformer<T> rowTransformer) {
        SinkVersion sinkVersion = getSinkVersion(sinkOptions);
        switch (sinkVersion) {
            case V1:
                return new StarRocksDynamicSinkFunction<>(sinkOptions, schema, rowTransformer);
            case V2:
                return new StarRocksDynamicSinkFunctionV2<>(sinkOptions, schema, rowTransformer);
            default:
                throw new UnsupportedOperationException("Unsupported sink type " + sinkVersion.name());
        }
    }

    public static <T> StarRocksDynamicSinkFunctionBase<T> createSinkFunction(StarRocksSinkOptions sinkOptions) {
        SinkVersion sinkVersion = getSinkVersion(sinkOptions);
        switch (sinkVersion) {
            case V1:
                return new StarRocksDynamicSinkFunction<>(sinkOptions);
            case V2:
                return new StarRocksDynamicSinkFunctionV2<>(sinkOptions);
            default:
                throw new UnsupportedOperationException("Unsupported sink type " + sinkVersion.name());
        }
    }
}
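3. The AUTO path calls chooseSinkVersionAutomatically, which in turn calls isStarRocksSupportTransactionLoad to check for transaction support. That method sends an HTTP request to the FE and decides based on the response.
4. Our StarRocks runs in the cloud and isn't reachable from the local network, so debugging is awkward. Following the logic in isStarRocksSupportTransactionLoad, I built an HTTP request by hand and sent it to the FE. The response looks like this: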
{"msg":"Not implemented","status":"FAILED"}
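Comparing this with the code: when the request returns normally, msg and status are enough to tell that the server doesn't support transactions, so the connector's detection logic itself is fine.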
try (CloseableHttpClient client = clientBuilder.build()) {
    CloseableHttpResponse response = client.execute(httpPost);
    String responseBody = EntityUtils.toString(response.getEntity());
    LOG.info("Transaction load probe response {}", responseBody);
    JSONObject bodyJson = JSON.parseObject(responseBody);
    String status = bodyJson.getString("status");
    String msg = bodyJson.getString("msg");
    // If StarRocks does not support transaction load, FE's NotFoundAction#executePost
    // will be called where you can know how the response json is constructed
    if ("FAILED".equals(status) && "Not implemented".equals(msg)) {
        return false;
    }
    return true;
} catch (IOException e) {
    String errMsg = "Failed to probe transaction load for " + host;
    LOG.warn("{}", errMsg, e);
    throw new RuntimeException(errMsg, e);
}
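5. If the detection logic is fine, why did we still end up on V2? Look at the outer try-catch around the HTTP probe, in chooseSinkVersionAutomatically: if the FE can't be reached, the probe can't decide anything, and when the connector can't determine whether the server supports transactions it defaults to V2 for the sink. That's the root cause: we took this default, so the rollback failed.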
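To make the fallback easier to see in isolation, here is a minimal sketch of the same control flow. It is a simplified model, not the connector's code: the class name AutoSinkVersionSketch and the probe parameter are made up for illustration, with probe standing in for isStarRocksSupportTransactionLoad, and the at-least-once shortcut is left out.

```java
import java.util.concurrent.Callable;

/** Simplified model of the AUTO selection path; not the connector's actual code. */
final class AutoSinkVersionSketch {

    enum SinkVersion { V1, V2 }

    /**
     * Mirrors the shape of chooseSinkVersionAutomatically on the exactly-once path:
     * the probe result is used when it is available, and any exception thrown by
     * the probe falls back to V2.
     *
     * @param probe hypothetical stand-in for isStarRocksSupportTransactionLoad
     */
    static SinkVersion choose(Callable<Boolean> probe) {
        try {
            return probe.call() ? SinkVersion.V2 : SinkVersion.V1;
        } catch (Exception e) {
            // The connector logs a warning at this point; this sketch only keeps the fallback.
            return SinkVersion.V2;
        }
    }

    public static void main(String[] args) {
        // FE answers "Not implemented": V1 is chosen, which is what StarRocks 2.3.x needs.
        System.out.println(choose(() -> false)); // prints V1
        // FE cannot be reached: the exception is swallowed and V2 is chosen anyway.
        System.out.println(choose(() -> {
            throw new java.io.IOException("connect timed out");
        })); // prints V2
    }
}
```

The second call is our situation: the probe never got an answer, so the job ended up on the transactional sink against a server that cannot handle it.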
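Solution
With the cause pinned down, the fix is simple: set the sink version to V1 in the sink options so that V1 is always used. After changing the code and redeploying, the problem was gone.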
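As a rough illustration of what that change looks like, the sketch below assembles the sink options and renders them as the WITH clause of a Flink SQL table definition. Treat the details as assumptions: I am assuming the key behind StarRocksSinkOptions.SINK_VERSION is sink.version (check it against the documentation for your connector version), and every host, database, table, and credential value is a placeholder. The class SinkVersionConfigSketch is only a helper for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative helper that builds StarRocks sink options with the sink version pinned to V1. */
final class SinkVersionConfigSketch {

    /** Connector options for the sink; every value here is a placeholder. */
    static Map<String, String> sinkOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "starrocks");
        options.put("jdbc-url", "jdbc:mysql://fe-host:9030");
        options.put("load-url", "fe-host:8030");
        options.put("database-name", "demo_db");
        options.put("table-name", "demo_table");
        options.put("username", "demo_user");
        options.put("password", "demo_password");
        // Assumed option key: pin the non-transactional Stream Load path instead of AUTO.
        options.put("sink.version", "V1");
        return options;
    }

    /** Renders the options as the WITH clause of a Flink SQL CREATE TABLE statement. */
    static String toWithClause(Map<String, String> options) {
        StringBuilder sb = new StringBuilder("WITH (\n");
        int remaining = options.size();
        for (Map.Entry<String, String> e : options.entrySet()) {
            sb.append("  '").append(e.getKey()).append("' = '").append(e.getValue()).append("'");
            remaining--;
            sb.append(remaining > 0 ? ",\n" : "\n");
        }
        return sb.append(")").toString();
    }

    public static void main(String[] args) {
        System.out.println(toWithClause(sinkOptions()));
    }
}
```

With the version pinned, getSinkVersion takes the V1 branch directly and the HTTP probe is never consulted, so an unreachable FE can no longer push the job onto V2.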
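Thoughts
If I were a developer of the StarRocks connector, would there be a better way to avoid this problem? I think there is. The current version actually decides between V2 and V1 every time the sink is triggered, which is really unnecessary. Set a global variable, check once at job startup whether transactions are supported to pick V1 or V2, and have every later sink reuse that result. And if the HTTP request already fails at startup, fail the startup outright; that is better than failing after the job has been running for a while.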
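The idea can be sketched roughly as follows. This is my own illustration rather than anything in the connector: SinkVersionResolverSketch and its probe parameter are hypothetical names, and I keep the cached decision in an instance field instead of a true global so that the example stays self-contained.

```java
import java.util.concurrent.Callable;

/** Hypothetical resolver: probes once, caches the result, and fails fast on probe errors. */
final class SinkVersionResolverSketch {

    enum SinkVersion { V1, V2 }

    private SinkVersion cached; // null until the first successful probe

    /**
     * @param probe hypothetical stand-in for the FE transaction-load probe;
     *              returns true when transaction load is supported
     */
    synchronized SinkVersion resolve(Callable<Boolean> probe) {
        if (cached != null) {
            return cached; // later calls reuse the decision made at startup
        }
        boolean supportsTransactionLoad;
        try {
            supportsTransactionLoad = probe.call();
        } catch (Exception e) {
            // Abort startup instead of guessing a version.
            throw new IllegalStateException(
                    "Cannot determine transaction load support; set the sink version explicitly", e);
        }
        cached = supportsTransactionLoad ? SinkVersion.V2 : SinkVersion.V1;
        return cached;
    }

    public static void main(String[] args) {
        SinkVersionResolverSketch resolver = new SinkVersionResolverSketch();
        try {
            resolver.resolve(() -> {
                throw new java.io.IOException("connect timed out");
            });
        } catch (IllegalStateException e) {
            System.out.println("startup aborted: " + e.getMessage());
        }
        System.out.println(resolver.resolve(() -> false)); // prints V1
        System.out.println(resolver.resolve(() -> true));  // still V1: the probe is not run again
    }
}
```

The trade-off is that a temporarily unreachable FE now blocks startup, but the error points straight at the probe instead of surfacing later as an NPE during rollback.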