
Summary of Exceptions When Writing Real-Time Kafka Data from Flink to OSS

The goal was to write Kafka tracking-event data in JSON format to OSS storage, but following the official documentation produced quite a few exceptions, summarized below:

1. Reference Documentation

Flink official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/oss/


2. Exceptions

2.1 Access key id should not be null or empty

After configuring the OSS-related settings in flink-conf.yaml as the documentation describes, EnvironmentVariableCredentialsProvider could not read the configured values. Exception details:

Caused by: org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.auth.InvalidCredentialsException: Access key id should not be null or empty.
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider.getCredentials(EnvironmentVariableCredentialsProvider.java:44) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.createDefaultContext(OSSOperation.java:166) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.doOperation(OSSOperation.java:114) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSObjectOperation.getObjectMetadata(OSSObjectOperation.java:458) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClient.getObjectMetadata(OSSClient.java:579) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClient.getObjectMetadata(OSSClient.java:569) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore.getObjectMetadata(AliyunOSSFileSystemStore.java:277) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.getFileStatus(AliyunOSSFileSystem.java:256) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.osshadoop.common.HadoopFileSystem.exists(HadoopFileSystem.java:160) ~[flink-app-jar.jar:?]
	at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.exists(PluginFileSystemFactory.java:148) ~[flink-app-jar.jar:?]
	at org.apache.flink.core.fs.FileSystem.initOutPathDistFS(FileSystem.java:977) ~[flink-app-jar.jar:?]
	at org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:286) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:99) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:221) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:291) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:256) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:238) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:108) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:323) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:310) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:96) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:41) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:141) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:80) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:450) ~[flink-app-jar.jar:?]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_252]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_252]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_252]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]	           


Looking at the source code, EnvironmentVariableCredentialsProvider reads OSS_ACCESS_KEY_ID via System.getenv.

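A paraphrased sketch of what the provider does (reconstructed from its behavior and the stack trace above, not the exact Aliyun SDK source):

import com.aliyun.oss.common.auth.Credentials;
import com.aliyun.oss.common.auth.DefaultCredentials;
import com.aliyun.oss.common.auth.InvalidCredentialsException;

// Paraphrased sketch: EnvironmentVariableCredentialsProvider only consults
// OS-level environment variables, so keys set in flink-conf.yaml
// (fs.oss.accessKeyId etc.) are invisible to it.
public class EnvCredentialsSketch {
    public static Credentials getCredentials() {
        String accessKeyId = System.getenv("OSS_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("OSS_ACCESS_KEY_SECRET");
        if (accessKeyId == null || accessKeyId.trim().isEmpty()) {
            // The exception seen in the stack trace above:
            throw new InvalidCredentialsException("Access key id should not be null or empty.");
        }
        return new DefaultCredentials(accessKeyId, accessKeySecret);
    }
}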

So I changed the credentials provider in flink-conf.yaml to SystemPropertiesCredentialsProvider:

fs.oss.credentials.provider: com.aliyun.oss.common.auth.SystemPropertiesCredentialsProvider           


The job still threw the same "Access key id should not be null or empty" exception. Reading the SystemPropertiesCredentialsProvider source code explains why.

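A paraphrased sketch of its lookup, same shape as the environment-variable version above but backed by JVM system properties:

// Paraphrased sketch of SystemPropertiesCredentialsProvider.getCredentials():
String accessKeyId = System.getProperty("oss.accessKeyId");
String accessKeySecret = System.getProperty("oss.accessKeySecret");
if (accessKeyId == null || accessKeyId.trim().isEmpty()) {
    throw new InvalidCredentialsException("Access key id should not be null or empty.");
}
// ...then wraps the two values in DefaultCredentials, as above.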

Those keys are read via System.getProperty, i.e. JVM -D options. Entries in flink-conf.yaml, by contrast, are handled along these lines:

// flink-conf.yaml values end up in a Flink Configuration object,
// not in JVM system properties:
Configuration conf = new Configuration();
conf.setString("", ""); // key/value placeholders
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
env.getConfig().setGlobalJobParameters(conf);


That is, they are treated like GlobalJobParameters. When the job runs, the loaded configuration is logged as:

2021-06-08 22:39:58,528 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: fs.oss.accessKeyId, Lxxxxxxxxxxxxxxxxxxx
2021-06-08 22:39:58,528 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: fs.oss.accessKeySecret, ******
2021-06-08 22:39:58,528 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: yarn.application.name, event_topic
2021-06-08 22:39:58,529 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: fs.oss.endpoint, https://oss-xxxx.aliyuncs.com
2021-06-08 22:39:58,529 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: fs.oss.credentials.provider, com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider           


So the first attempt was to pass the values as -yD parameters:

/opt/flink-1.12.0/bin/flink run -m yarn-cluster -ynm event_topic -p 1 -yqu nifi -yjm 1024m -ytm 1024m -yD oss.accessKeyId=Lxxxxxxxxxxxxxxxxxxx -yD oss.accessKeySecret=****** -c com.am.oss.SdkKafkaToOss /home/ws_cdp_dev_admin/flink-app-jar.jar


But those values, too, only take effect in GlobalConfiguration. So the configuration was changed to pass them as JVM options instead:

env.java.opts.jobmanager: -Doss.accessKeyId=Lxxxxxxxxxxxxxxxxxxx -Doss.accessKeySecret=******
env.java.opts.taskmanager: -Doss.accessKeyId=Lxxxxxxxxxxxxxxxxxxx -Doss.accessKeySecret=******           

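With those options in place the provider can finally see the keys. As a quick sanity check (a hypothetical helper, not part of the original job), something like this can be chained into the stream to log whether the property is visible inside the TaskManager JVM:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Hypothetical sanity check: log whether the OSS credential system
// properties actually reached the TaskManager JVM.
public class OssPropsCheck extends RichMapFunction<String, String> {
    @Override
    public void open(Configuration parameters) {
        System.out.println("oss.accessKeyId present on TM: "
                + (System.getProperty("oss.accessKeyId") != null));
    }

    @Override
    public String map(String value) {
        return value; // pass records through unchanged
    }
}

Wiring it in is just streamSource.map(new OssPropsCheck()).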

That resolved the exception. It seems the official documentation is not entirely accurate on this point.

2.2 The OVERWRITE problem

streamSource.writeAsText("oss://ws-datacenter/user_event/dt=${dt}/demo.json", FileSystem.WriteMode.NO_OVERWRITE);           


This API has two problems: it cannot handle dynamic paths, so records can only be written to one fixed location, which inevitably makes the file grow unbounded as the stream keeps writing into it; and NO_OVERWRITE is not supported.
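For example, even computing the date in code does not help: the path expression is evaluated once, while the job graph is built, so the whole unbounded stream keeps landing in that single file (illustrative sketch):

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import org.apache.flink.core.fs.FileSystem;

// Illustrative sketch: dt is fixed at job-graph build time, so every record
// of the unbounded stream is written to this one object for the whole run.
String dt = LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE);
streamSource.writeAsText(
        "oss://ws-datacenter/user_event/dt=" + dt + "/demo.json",
        FileSystem.WriteMode.NO_OVERWRITE); // per the above, not honored on OSS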

2.3 The "Recoverable writers on Hadoop are only supported for HDFS" exception

I then rewrote the OSS-writing logic as follows:

String path = "oss://ws-datacenter/user_event/day=20210608/tid=UA-FilmoraGo-Android/sdk=sa_sdk/user_event.json";
OutputFileConfig config = OutputFileConfig
        .builder()
        .withPartPrefix("user_event")
        .withPartSuffix(".json")
        .build();

StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
        new Path(path),
        new SimpleStringEncoder<String>("UTF-8"))
        .withBucketAssigner(new DateTimeBucketAssigner<>())
        .withRollingPolicy(
                DefaultRollingPolicy.builder()
                        .withRolloverInterval(TimeUnit.SECONDS.toMillis(2))
                        .withInactivityInterval(TimeUnit.SECONDS.toMillis(1))
                        .withMaxPartSize(1024 * 1024 * 1024)
                        .build())
        .withOutputFileConfig(config)
        .build();

// Alternatively, the legacy BucketingSink approach (renamed here so the
// two alternatives can compile side by side; use one or the other):
BucketingSink<String> bucketingSink = new BucketingSink<String>(path);
bucketingSink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd_HH-mm"));
bucketingSink.setWriter(new StringWriter<>());
bucketingSink.setBatchSize(1024 * 1024 * 256L);
bucketingSink.setBatchRolloverInterval(30 * 60 * 1000L);
bucketingSink.setInactiveBucketThreshold(3 * 60 * 1000L);
bucketingSink.setInactiveBucketCheckInterval(30 * 1000L);
bucketingSink.setInProgressSuffix(".in-progress");
bucketingSink.setPendingSuffix(".pending");

streamSource.addSink(sink); // or streamSource.addSink(bucketingSink)


Both approaches fail with the "Recoverable writers on Hadoop are only supported for HDFS" exception:

2021-06-09 14:57:44,292 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Custom Source -> Filter -> Sink: Unnamed (1/1) (2939f69ee024a3dd3faed2c658165ac6) switched from RUNNING to FAILED on container_e131_1618429488036_60239_01_000002 @ ws-hdp06 (dataPort=41092).
java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
	at org.apache.flink.fs.osshadoop.common.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:61) ~[flink-app-jar.jar:?]
	at org.apache.flink.fs.osshadoop.common.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:210) ~[flink-app-jar.jar:?]
	at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:134) ~[flink-app-jar.jar:?]
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink.java:260) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:412) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) ~[flink-app-jar.jar:?]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) ~[flink-app-jar.jar:?]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) ~[flink-app-jar.jar:?]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]           


Looking at the StreamingFileSink source code reveals:

this.fsWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();           

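and the HadoopRecoverableWriter constructor referenced in the stack trace guards on the filesystem scheme, roughly like this (paraphrased, not the exact source):

// Paraphrased sketch of the guard in HadoopRecoverableWriter's constructor:
// any scheme other than hdfs:// is rejected outright.
if (!"hdfs".equalsIgnoreCase(fileSystem.getScheme())) {
    throw new UnsupportedOperationException(
            "Recoverable writers on Hadoop are only supported for HDFS");
}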

The oss:// scheme in use is not recoverable, i.e. it cannot do the rollback/recovery handling. So the only option left was a custom sink. Sometimes the official documentation can mislead you, or the feature just isn't quite there yet.
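One possible shape for such a custom sink, sketched with the Aliyun OSS Java SDK. The batching, object-key scheme, bucket name, and endpoint below are illustrative assumptions, and this simple version is at-least-once at best (a failure between flushes can lose or duplicate records); it trades the recoverable-writer guarantees for something that works on oss://.

import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustrative custom sink: buffer records and flush each batch to OSS
// as a new object via putObject.
public class OssBatchSink extends RichSinkFunction<String> {
    private static final int BATCH_SIZE = 1000; // assumption: tune as needed

    private transient OSS ossClient;
    private transient List<String> buffer;

    @Override
    public void open(Configuration parameters) {
        // Endpoint and credential lookup hard-coded here only for brevity.
        ossClient = new OSSClientBuilder().build(
                "https://oss-xxxx.aliyuncs.com",
                System.getProperty("oss.accessKeyId"),
                System.getProperty("oss.accessKeySecret"));
        buffer = new ArrayList<>();
    }

    @Override
    public void invoke(String value, Context context) {
        buffer.add(value);
        if (buffer.size() >= BATCH_SIZE) {
            flush();
        }
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // One object per batch; the key scheme is an illustrative assumption.
        String key = "user_event/part-" + getRuntimeContext().getIndexOfThisSubtask()
                + "-" + System.currentTimeMillis() + ".json";
        byte[] bytes = String.join("\n", buffer).getBytes(StandardCharsets.UTF_8);
        ossClient.putObject("ws-datacenter", key, new ByteArrayInputStream(bytes));
        buffer.clear();
    }

    @Override
    public void close() {
        flush(); // write out whatever is left
        if (ossClient != null) {
            ossClient.shutdown();
        }
    }
}

Wiring it in is then just streamSource.addSink(new OssBatchSink()).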