Summary of Exceptions When Writing Real-Time Kafka Data to OSS with Flink

2023-09-13 19:37:17

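I wanted to write JSON-format event-tracking data from Kafka to OSS storage, but following the official documentation led to a number of exceptions. They are summarized below.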

1. Reference documentation

Flink official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/oss/

2. Exceptions

2.1 Access key id should not be null or empty

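Following the official documentation, I configured the OSS settings in flink-conf.yaml, but EnvironmentVariableCredentialsProvider could not read the corresponding values. The exception details are: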

Caused by: org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.auth.InvalidCredentialsException: Access key id should not be null or empty.
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider.getCredentials(EnvironmentVariableCredentialsProvider.java:44) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.createDefaultContext(OSSOperation.java:166) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.doOperation(OSSOperation.java:114) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSObjectOperation.getObjectMetadata(OSSObjectOperation.java:458) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClient.getObjectMetadata(OSSClient.java:579) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClient.getObjectMetadata(OSSClient.java:569) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore.getObjectMetadata(AliyunOSSFileSystemStore.java:277) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.getFileStatus(AliyunOSSFileSystem.java:256) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.osshadoop.common.HadoopFileSystem.exists(HadoopFileSystem.java:160) ~[flink-app-jar.jar:?]
	at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.exists(PluginFileSystemFactory.java:148) ~[flink-app-jar.jar:?]
	at org.apache.flink.core.fs.FileSystem.initOutPathDistFS(FileSystem.java:977) ~[flink-app-jar.jar:?]
	at org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:286) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:99) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:221) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:291) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:256) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:238) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:108) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:323) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:310) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:96) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:41) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:141) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:80) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:450) ~[flink-app-jar.jar:?]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_252]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_252]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_252]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]	

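Looking at the source code shows that EnvironmentVariableCredentialsProvider uses OSS_ACCESS_KEY_ID and reads it through System.getenv, so it only sees environment variables of the JVM process, not entries in flink-conf.yaml.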
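
To make the failure mode concrete, here is a minimal sketch of an environment-based credentials lookup. It is not the Aliyun SDK class: EnvCredentialsSketch and resolve are hypothetical names, the variable name OSS_ACCESS_KEY_SECRET is an assumption, and the error for a missing secret is invented for illustration. Only OSS_ACCESS_KEY_ID and the "Access key id should not be null or empty." message come from the text above.

```java
import java.util.Map;

// Hypothetical stand-in for an environment-based credentials lookup.
// This is NOT the Aliyun SDK class, only an illustration of the behaviour.
final class EnvCredentialsSketch {

    // Variable name taken from the text above.
    static final String ACCESS_KEY_ID_ENV = "OSS_ACCESS_KEY_ID";
    // Assumed name for the secret; check the SDK source before relying on it.
    static final String ACCESS_KEY_SECRET_ENV = "OSS_ACCESS_KEY_SECRET";

    /** Returns {id, secret} read from the given environment map, or throws. */
    static String[] resolve(Map<String, String> env) {
        String id = env.get(ACCESS_KEY_ID_ENV);
        String secret = env.get(ACCESS_KEY_SECRET_ENV);
        if (id == null || id.trim().isEmpty()) {
            throw new IllegalStateException("Access key id should not be null or empty.");
        }
        if (secret == null || secret.trim().isEmpty()) {
            // Illustrative message, not copied from the SDK.
            throw new IllegalStateException("Access key secret is missing.");
        }
        return new String[] {id.trim(), secret.trim()};
    }

    public static void main(String[] args) {
        try {
            // System.getenv() only contains variables of this JVM process;
            // values written to a configuration file never show up here.
            String[] creds = resolve(System.getenv());
            System.out.println("Found access key id of length " + creds[0].length());
        } catch (IllegalStateException e) {
            System.out.println("Lookup failed: " + e.getMessage());
        }
    }
}
```

Passing the environment in as a map keeps the sketch easy to exercise: an empty map reproduces the error message seen in the stack trace.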

Next, I changed the credentials provider in flink-conf.yaml to SystemPropertiesCredentialsProvider:

fs.oss.credentials.provider: com.aliyun.oss.common.auth.SystemPropertiesCredentialsProvider

The "Access key id should not be null or empty" exception was still thrown. Reading the SystemPropertiesCredentialsProvider source code shows why:

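It reads values through System.getProperty, that is, mainly from JVM -D arguments. The entries in flink-conf.yaml, by contrast, are handled in a way similar to this: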

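        // Flink configuration object; the empty key/value pair is a placeholder
        Configuration conf = new Configuration();
        conf.setString("", "");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        // Register the configuration as global job parameters
        env.getConfig().setGlobalJobParameters(conf);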

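In other words, they are treated much like global job parameters and never become JVM system properties. When the job runs, the log shows the parameters being loaded: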

2021-06-08 22:39:58,528 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: fs.oss.accessKeyId, Lxxxxxxxxxxxxxxxxxxx
2021-06-08 22:39:58,528 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: fs.oss.accessKeySecret, ******
2021-06-08 22:39:58,528 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: yarn.application.name, event_topic
2021-06-08 22:39:58,529 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: fs.oss.endpoint, https://oss-xxxx.aliyuncs.com
2021-06-08 22:39:58,529 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: fs.oss.credentials.provider, com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider

So the first attempt was to pass the values with -yD:

/opt/flink-1.12.0/bin/flink run -m yarn-cluster -ynm event_topic -p 1 -yqu nifi -yjm 1024m -ytm 1024m -yD oss.accessKeyId=Lxxxxxxxxxxxxxxxxxxx -yD oss.accessKeySecret=****** -c com.am.oss.SdkKafkaToOss /home/ws_cdp_dev_admin/flink-app-jar.jar

The values still ended up in GlobalConfiguration, so I changed the configuration to pass them as JVM options instead:

env.java.opts.jobmanager: -Doss.accessKeyId=Lxxxxxxxxxxxxxxxxxxx -Doss.accessKeySecret=******
env.java.opts.taskmanager: -Doss.accessKeyId=Lxxxxxxxxxxxxxxxxxxx -Doss.accessKeySecret=******

That resolved the exception. It seems the official documentation is not entirely accurate on this point.
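
The gap described in this section can be sketched in a few lines of plain Java. This is only an illustration under stated assumptions: PropertyVsConfigSketch and the flinkConf map are hypothetical stand-ins (the map plays the role of values loaded from flink-conf.yaml), and System.setProperty is used to imitate what a -D option on the JVM command line does.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: a configuration map and JVM system properties are
// two separate stores, and a System.getProperty-based lookup sees only one.
final class PropertyVsConfigSketch {

    /** Looks a key up in JVM system properties only. */
    static String fromSystemProperties(String key) {
        return System.getProperty(key);
    }

    public static void main(String[] args) {
        // Stand-in for values loaded from flink-conf.yaml (hypothetical map).
        Map<String, String> flinkConf = new HashMap<>();
        flinkConf.put("fs.oss.accessKeyId", "Lxxxxxxxxxxxxxxxxxxx");

        // The configuration map holds the value...
        System.out.println("conf lookup:     " + flinkConf.get("fs.oss.accessKeyId"));
        // ...but the system property stays unset unless the JVM got a -D option.
        System.out.println("property lookup: " + fromSystemProperties("oss.accessKeyId"));

        // Imitates starting the JVM with -Doss.accessKeyId=...
        System.setProperty("oss.accessKeyId", "Lxxxxxxxxxxxxxxxxxxx");
        System.out.println("property lookup: " + fromSystemProperties("oss.accessKeyId"));
    }
}
```

This is why setting env.java.opts.jobmanager and env.java.opts.taskmanager worked where the plain configuration keys did not: those options end up on the JVM command line of each process.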

2.2 Problems with OVERWRITE

streamSource.writeAsText("oss://xxxx/user_event/dt=${dt}/demo.json", FileSystem.WriteMode.NO_OVERWRITE);

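This API has two problems. First, it cannot resolve the path dynamically: it only writes to the one fixed location given, so with streaming data the file inevitably grows too large. Second, NO_OVERWRITE is not supported.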
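
What the dt=${dt} placeholder in the path above was meant to achieve is a directory derived from each record's time. A minimal sketch of that computation, outside of any Flink API, could look like the following. DatePartitionSketch and partitionDir are hypothetical names, and the choice of time zone is an assumption the caller has to make.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Illustration only: derive a dt=yyyy-MM-dd directory from an event time.
final class DatePartitionSketch {

    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    /** Builds "<base>/dt=<day>" for the day the event falls on in the given zone. */
    static String partitionDir(String base, Instant eventTime, ZoneId zone) {
        String day = DAY.format(eventTime.atZone(zone));
        // Avoid a double slash when the base already ends with one.
        String trimmed = base.endsWith("/") ? base.substring(0, base.length() - 1) : base;
        return trimmed + "/dt=" + day;
    }

    public static void main(String[] args) {
        Instant eventTime = Instant.parse("2021-06-08T22:39:58Z");
        // The same instant lands in different partitions depending on the zone.
        System.out.println(partitionDir("oss://xxxx/user_event", eventTime, ZoneOffset.UTC));
        System.out.println(partitionDir("oss://xxxx/user_event", eventTime, ZoneOffset.ofHours(8)));
    }
}
```

A sink that supports bucketing performs this kind of mapping per record, which is what a single fixed output path cannot do.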

2.3 Recoverable writers on Hadoop are only supported for HDFS

I then changed the code that writes to OSS. It looked roughly like this:

        String path = "oss://xxx/user_event/day=20210608/sdk=sa_sdk/*";
        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix("user_event")
                .withPartSuffix(".json")
                .build();

        // Option 1: StreamingFileSink in row format
        StreamingFileSink<String> streamingFileSink = StreamingFileSink.forRowFormat(
                        new Path(path),
                        new SimpleStringEncoder<String>("UTF-8"))
                .withBucketAssigner(new DateTimeBucketAssigner<>())
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.SECONDS.toMillis(2))
                                .withInactivityInterval(TimeUnit.SECONDS.toMillis(1))
                                .withMaxPartSize(1024 * 1024 * 1024)
                                .build())
                .withOutputFileConfig(config)
                .build();

        // Option 2: BucketingSink
        BucketingSink<String> bucketingSink = new BucketingSink<String>(path);
        bucketingSink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd_HH-mm"));
        bucketingSink.setWriter(new StringWriter<>());
        bucketingSink.setBatchSize(1024 * 1024 * 256L);
        bucketingSink.setBatchRolloverInterval(30 * 60 * 1000L);
        bucketingSink.setInactiveBucketThreshold(3 * 60 * 1000L);
        bucketingSink.setInactiveBucketCheckInterval(30 * 1000L);
        bucketingSink.setInProgressSuffix(".in-progress");
        bucketingSink.setPendingSuffix(".pending");

        // Attach one of the two sinks (streamingFileSink or bucketingSink)
        streamSource.addSink(streamingFileSink);

Both attempts failed with the "Recoverable writers on Hadoop are only supported for HDFS" exception:

2021-06-09 14:57:44,292 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Custom Source -> Filter -> Sink: Unnamed (1/1) (2939f69ee024a3dd3faed2c658165ac6) switched from RUNNING to FAILED on container_e131_1618429488036_60239_01_000002 @ ws-hdp06 (dataPort=41092).
java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
	at org.apache.flink.fs.osshadoop.common.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:61) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.osshadoop.common.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:210) ~[flink-app-jar.jar:?]
	at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:134) ~[flink-app-jar.jar:?]
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink.java:260) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:412) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) ~[flink-app-jar.jar:?]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]

Looking at the StreamingFileSink source code shows:

this.fsWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();

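With the oss scheme, however, the file system does not provide a recoverable writer, so the sink has no way to roll back. The only remaining option is a custom sink. Official documentation can sometimes be misleading, or the feature is simply not mature enough yet.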
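
As a rough idea of what such a custom sink could do, the sketch below buffers records in memory and uploads each batch as one complete object, so no recoverable writer is needed. It is plain Java with no Flink or OSS dependency: BufferingObjectSinkSketch, ObjectUploader, and the key naming scheme are all hypothetical. In a real job this logic would presumably sit inside a sink function's per-record and close methods, with the upload hook calling an OSS client. The sketch makes no attempt at checkpoint integration, so records still in the buffer would be lost on failure.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustration only: batch records and upload each batch as a whole object.
final class BufferingObjectSinkSketch {

    /** Hypothetical upload hook; a real job would call an OSS client here. */
    interface ObjectUploader {
        void upload(String objectKey, byte[] content);
    }

    private final ObjectUploader uploader;
    private final String keyPrefix;
    private final int maxRecordsPerObject;
    private final List<String> buffer = new ArrayList<>();
    private int objectCounter = 0;

    BufferingObjectSinkSketch(ObjectUploader uploader, String keyPrefix, int maxRecordsPerObject) {
        if (maxRecordsPerObject <= 0) {
            throw new IllegalArgumentException("maxRecordsPerObject must be positive");
        }
        this.uploader = uploader;
        this.keyPrefix = keyPrefix;
        this.maxRecordsPerObject = maxRecordsPerObject;
    }

    /** Adds one record and uploads the batch once it is full. */
    void write(String record) {
        buffer.add(record);
        if (buffer.size() >= maxRecordsPerObject) {
            flush();
        }
    }

    /** Uploads buffered records as one newline-delimited object; call on close too. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        String body = String.join("\n", buffer) + "\n";
        String key = keyPrefix + "-" + (objectCounter++) + ".json";
        uploader.upload(key, body.getBytes(StandardCharsets.UTF_8));
        buffer.clear();
    }

    public static void main(String[] args) {
        // The uploader here only prints; it stands in for a real OSS upload.
        BufferingObjectSinkSketch sink = new BufferingObjectSinkSketch(
                (key, content) -> System.out.println(key + " (" + content.length + " bytes)"),
                "user_event", 2);
        sink.write("{\"event\":\"a\"}");
        sink.write("{\"event\":\"b\"}");
        sink.write("{\"event\":\"c\"}");
        sink.flush(); // upload the remaining partial batch
    }
}
```

Rolling by record count keeps the sketch short; rolling by byte size or elapsed time would follow the same pattern.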
