目前想把kafka json格式的埋点数据写入OSS存储,但是参考官网文档出现很多异常内容,总结如下:
1.参考文档
flink官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/oss/
2.异常内容
2.1 Access key id should not be null or empty
根据官方文档,flink-conf.yaml配置oss相关的内容后,发现EnvironmentVariableCredentialsProvider读取不到对应的值内容,异常详情如下:
代码语言:javascript复制
Caused by: org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.auth.InvalidCredentialsException: Access key id should not be null or empty.
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider.getCredentials(EnvironmentVariableCredentialsProvider.java:44) ~[flink-app-jar.jar:?]
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.createDefaultContext(OSSOperation.java:166) ~[flink-app-jar.jar:?]
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.doOperation(OSSOperation.java:114) ~[flink-app-jar.jar:?]
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSObjectOperation.getObjectMetadata(OSSObjectOperation.java:458) ~[flink-app-jar.jar:?]
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClient.getObjectMetadata(OSSClient.java:579) ~[flink-app-jar.jar:?]
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClient.getObjectMetadata(OSSClient.java:569) ~[flink-app-jar.jar:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore.getObjectMetadata(AliyunOSSFileSystemStore.java:277) ~[flink-app-jar.jar:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.getFileStatus(AliyunOSSFileSystem.java:256) ~[flink-app-jar.jar:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734) ~[flink-app-jar.jar:?]
at org.apache.flink.fs.osshadoop.common.HadoopFileSystem.exists(HadoopFileSystem.java:160) ~[flink-app-jar.jar:?]
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.exists(PluginFileSystemFactory.java:148) ~[flink-app-jar.jar:?]
at org.apache.flink.core.fs.FileSystem.initOutPathDistFS(FileSystem.java:977) ~[flink-app-jar.jar:?]
at org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:286) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:99) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:221) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:291) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:256) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:238) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:108) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:323) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:310) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:96) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:41) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:141) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:80) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:450) ~[flink-app-jar.jar:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_252]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
查看源代码发现,EnvironmentVariableCredentialsProvider使用的是OSS_ACCESS_KEY_ID,通过System.getenv的方式读取。
更改flink-conf-yaml的授权类为SystemPropertiesCredentialsProvider
代码语言:javascript复制fs.oss.credentials.provider: com.aliyun.oss.common.auth.SystemPropertiesCredentialsProvider
发现还是报Access key id should not be null or empty的异常,阅读SystemPropertiesCredentialsProvider源代码发现:
通过System.getProperty的方式读取,主要是JVM的-D参数内容,而在flink-conf.yarm是通过
代码语言:javascript复制 //flink conf
Configuration conf = new Configuration();
conf.setString("","");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
env.getConfig().setGlobalJobParameters(conf);
类似GlobalJobParameter方式处理,对应运行任务的时候参数内容显示:
代码语言:javascript复制2021-06-08 22:39:58,528 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: fs.oss.accessKeyId, Lxxxxxxxxxxxxxxxxxxx
2021-06-08 22:39:58,528 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: fs.oss.accessKeySecret, ******
2021-06-08 22:39:58,528 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: yarn.application.name, event_topic
2021-06-08 22:39:58,529 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: fs.oss.endpoint, https://oss-xxxx.aliyuncs.com
2021-06-08 22:39:58,529 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: fs.oss.credentials.provider, com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
所以尝试第一次 -yD的参数处理方式
代码语言:javascript复制/opt/flink-1.12.0/bin/flink run -m yarn-cluster -ynm event_topic -p 1 -yqu nifi -yjm 1024m -ytm 1024m -yD oss.accessKeyId=Lxxxxxxxxxxxxxxxxxxx -yD oss.accessKeySecret= ****** -c com.am.oss.SdkKafkaToOss /home/ws_cdp_dev_admin/flink-app-jar.jar
结果还是生效到GlobalConfiguration,所以更改配置,通过jvm 参数的方式处理:
代码语言:javascript复制env.java.opts.jobmanager: -Doss.accessKeyId=Lxxxxxxxxxxxxxxxxxxx -Doss.accessKeySecret=******
env.java.opts.taskmanager: -Doss.accessKeyId=Lxxxxxxxxxxxxxxxxxxx -Doss.accessKeySecret=******
该异常问题解决,如此看来官方文档说的不是很准。
2.2 OVERWRITE的问题
代码语言:javascript复制streamSource.writeAsText("oss://xxxx/user_event/dt=${dt}/demo.json", FileSystem.WriteMode.NO_OVERWRITE);
这个API有两个问题,不懂动态的处理,只能在指定的地方写入对应数据,那势必造成流数据写入到该文件后文件过大的问题,另外是不支持NO_OVERWRITE。
2.3 Recoverable writers on Hadoop are only supported for HDFS异常
更改对应写入oss的逻辑代码,类似代码内容如下:
代码语言:javascript复制 String path = "oss://xxx/user_event/day=20210608/sdk=sa_sdk/*";
OutputFileConfig config = OutputFileConfig
.builder()
.withPartPrefix("user_event")
.withPartSuffix(".json")
.build();
StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
new Path(path),
new SimpleStringEncoder<String>("UTF-8")
)
.withBucketAssigner(new DateTimeBucketAssigner<>())
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.SECONDS.toMillis(2))
.withInactivityInterval(TimeUnit.SECONDS.toMillis(1))
.withMaxPartSize(1024 * 1024 * 1024)
.build())
.withOutputFileConfig(config)
.build();
//或者BucketingSink的方式
BucketingSink<String> sink = new BucketingSink<String>(path);
sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd_HH-mm"));
sink.setWriter(new StringWriter<>());
sink.setBatchSize(1024 * 1024 * 256L);
sink.setBatchRolloverInterval(30 * 60 * 1000L);
sink.setInactiveBucketThreshold(3 * 60 * 1000L);
sink.setInactiveBucketCheckInterval(30 * 1000L);
sink.setInProgressSuffix(".in-progress");
sink.setPendingSuffix(".pending");
streamSource.addSink(sink);
结果都报Recoverable writers on Hadoop are only supported for HDFS异常
代码语言:javascript复制2021-06-09 14:57:44,292 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Filter -> Sink: Unnamed (1/1) (2939f69ee024a3dd3faed2c658165ac6) switched from RUNNING to FAILED on container_e131_1618429488036_60239_01_000002 @ ws-hdp06 (dataPort=41092).
java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
at org.apache.flink.fs.osshadoop.common.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:61) ~[flink-app-jar.jar:?]
at org.apache.flink.fs.osshadoop.common.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:210) ~[flink-app-jar.jar:?]
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:134) ~[flink-app-jar.jar:?]
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69) ~[flink-app-jar.jar:?]
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink.java:260) ~[flink-app-jar.jar:?]
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270) ~[flink-app-jar.jar:?]
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:412) ~[flink-app-jar.jar:?]
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185) ~[flink-app-jar.jar:?]
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167) ~[flink-app-jar.jar:?]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-app-jar.jar:?]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107) ~[flink-app-jar.jar:?]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264) ~[flink-app-jar.jar:?]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400) ~[flink-app-jar.jar:?]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-app-jar.jar:?]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-app-jar.jar:?]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) ~[flink-app-jar.jar:?]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) ~[flink-app-jar.jar:?]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
使用StreamingFileSink查看源代码发现:
代码语言:javascript复制this.fsWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();
而使用的oss协议方式,不是能够Recoverable,进行回滚处理的。所以只能通过自定义sink的方式处理,只能说有时候官网的文档也会诱导人,或者功能使用的时候还是欠佳。