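I am trying to write JSON-formatted event-tracking data from Kafka into OSS storage, but following the official documentation produced a number of exceptions. They are summarized below:
1. Reference documentation
Official Flink documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/oss/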
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICMyYTMvw1dvwlMvwlM3VWaWV2Zh1Wa-cmbw5yNlhjYiFmN2IDN2EmMmdzMiljZ4MzNkRjN1AzYzQTNh9CXwADMwATMtUGall3LcVmdhNXLwRHdo9CXt92YucWbpRWdvx2Yx5yazF2Lc9CX6MHc0RHaiojIsJye.png)
2. Exceptions encountered
2.1 Access key id should not be null or empty
Following the official documentation, I added the OSS settings to flink-conf.yaml, but EnvironmentVariableCredentialsProvider could not read the corresponding values. The full exception is:
Caused by: org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.auth.InvalidCredentialsException: Access key id should not be null or empty.
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider.getCredentials(EnvironmentVariableCredentialsProvider.java:44) ~[flink-app-jar.jar:?]
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.createDefaultContext(OSSOperation.java:166) ~[flink-app-jar.jar:?]
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.doOperation(OSSOperation.java:114) ~[flink-app-jar.jar:?]
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSObjectOperation.getObjectMetadata(OSSObjectOperation.java:458) ~[flink-app-jar.jar:?]
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClient.getObjectMetadata(OSSClient.java:579) ~[flink-app-jar.jar:?]
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClient.getObjectMetadata(OSSClient.java:569) ~[flink-app-jar.jar:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore.getObjectMetadata(AliyunOSSFileSystemStore.java:277) ~[flink-app-jar.jar:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.getFileStatus(AliyunOSSFileSystem.java:256) ~[flink-app-jar.jar:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734) ~[flink-app-jar.jar:?]
at org.apache.flink.fs.osshadoop.common.HadoopFileSystem.exists(HadoopFileSystem.java:160) ~[flink-app-jar.jar:?]
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.exists(PluginFileSystemFactory.java:148) ~[flink-app-jar.jar:?]
at org.apache.flink.core.fs.FileSystem.initOutPathDistFS(FileSystem.java:977) ~[flink-app-jar.jar:?]
at org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:286) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:99) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:221) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:291) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:256) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:238) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:108) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:323) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:310) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:96) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:41) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:141) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:80) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:450) ~[flink-app-jar.jar:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_252]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
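The source code shows that EnvironmentVariableCredentialsProvider reads OSS_ACCESS_KEY_ID through System.getenv, that is, from the process environment rather than from flink-conf.yaml.
I therefore changed the credentials provider in flink-conf.yaml to SystemPropertiesCredentialsProvider: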
fs.oss.credentials.provider: com.aliyun.oss.common.auth.SystemPropertiesCredentialsProvider
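The "Access key id should not be null or empty" exception was still thrown. Reading the SystemPropertiesCredentialsProvider source code shows why:
it reads the values through System.getProperty, which mainly means JVM -D arguments, whereas the entries in flink-conf.yaml are handled along these lines: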
//flink conf
Configuration conf = new Configuration();
conf.setString("","");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
env.getConfig().setGlobalJobParameters(conf);
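That is, the settings are handled much like GlobalJobParameters and end up in Flink's configuration rather than in JVM system properties. The startup log of the job shows them being loaded: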
2021-06-08 22:39:58,528 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: fs.oss.accessKeyId, Lxxxxxxxxxxxxxxxxxxx
2021-06-08 22:39:58,528 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: fs.oss.accessKeySecret, ******
2021-06-08 22:39:58,528 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: yarn.application.name, event_topic
2021-06-08 22:39:58,529 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: fs.oss.endpoint, https://oss-xxxx.aliyuncs.com
2021-06-08 22:39:58,529 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: fs.oss.credentials.provider, com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
So my first attempt was to pass the values with -yD:
/opt/flink-1.12.0/bin/flink run -m yarn-cluster -ynm event_topic -p 1 -yqu nifi -yjm 1024m -ytm 1024m -yD oss.accessKeyId=Lxxxxxxxxxxxxxxxxxxx -yD oss.accessKeySecret= ****** -c com.am.oss.SdkKafkaToOss /home/ws_cdp_dev_admin/flink-app-jar.jar
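The values still ended up in GlobalConfiguration rather than in the JVM system properties, so I changed the configuration to pass them as JVM options instead: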
env.java.opts.jobmanager: -Doss.accessKeyId=Lxxxxxxxxxxxxxxxxxxx -Doss.accessKeySecret=******
env.java.opts.taskmanager: -Doss.accessKeyId=Lxxxxxxxxxxxxxxxxxxx -Doss.accessKeySecret=******
This resolved the exception. It seems the official documentation is not entirely accurate on this point.
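To confirm which channel the credentials are actually visible on, a small check like the one below can be run inside the JobManager or TaskManager JVM. It is only a diagnostic sketch: OSS_ACCESS_KEY_ID and the oss.accessKeyId / oss.accessKeySecret properties come from the text above, while OSS_ACCESS_KEY_SECRET is assumed to be the matching environment variable and should be checked against the SDK version in use. The class and method names are made up for illustration.

```java
import java.util.Map;
import java.util.Properties;

/** Reports which credential channels are populated, without printing any secret. */
final class OssCredentialCheck {

    // Environment variable read by EnvironmentVariableCredentialsProvider (see above).
    static final String ENV_KEY_ID = "OSS_ACCESS_KEY_ID";
    // Assumption: the matching variable for the secret; verify against your SDK.
    static final String ENV_KEY_SECRET = "OSS_ACCESS_KEY_SECRET";
    // JVM system properties, as set with -Doss.accessKeyId / -Doss.accessKeySecret.
    static final String PROP_KEY_ID = "oss.accessKeyId";
    static final String PROP_KEY_SECRET = "oss.accessKeySecret";

    static boolean present(String value) {
        return value != null && !value.trim().isEmpty();
    }

    /** Returns "env", "sysprop", "both" or "none". */
    static String describe(Map<String, String> env, Properties props) {
        boolean viaEnv = present(env.get(ENV_KEY_ID)) && present(env.get(ENV_KEY_SECRET));
        boolean viaProp = present(props.getProperty(PROP_KEY_ID))
                && present(props.getProperty(PROP_KEY_SECRET));
        if (viaEnv && viaProp) {
            return "both";
        }
        if (viaEnv) {
            return "env";
        }
        if (viaProp) {
            return "sysprop";
        }
        return "none";
    }

    public static void main(String[] args) {
        // Inspect the real process environment and JVM system properties.
        System.out.println("OSS credentials visible via: "
                + describe(System.getenv(), System.getProperties()));
    }
}
```

A result of "none" while the keys are present in flink-conf.yaml would be consistent with the behaviour described above: the file is loaded into Flink's configuration, not into the environment or the system properties.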
2.2 The OVERWRITE problem
streamSource.writeAsText("oss://ws-datacenter/user_event/dt=${dt}/demo.json", FileSystem.WriteMode.NO_OVERWRITE);
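This API has two problems. First, it cannot resolve the path dynamically: the path is a fixed string (the ${dt} placeholder is not expanded), so all records go to one location and a stream written to that file will inevitably make it grow too large. Second, NO_OVERWRITE is not supported.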
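What "dynamic" handling would need is a path derived from each record's timestamp rather than a literal string. The sketch below shows only that derivation in plain Java. The dt=yyyyMMdd layout mirrors the path used above, while the class name, the method name, and the choice of time zone are assumptions made for illustration.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Builds a date-partitioned directory path from an event timestamp. */
final class PartitionPaths {

    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");

    /** Returns base + "/dt=<day>", where the day is computed in the given zone. */
    static String forEvent(String base, long epochMillis, ZoneId zone) {
        String day = DAY.format(Instant.ofEpochMilli(epochMillis).atZone(zone));
        // Avoid a double slash if the base path already ends with one.
        String trimmed = base.endsWith("/") ? base.substring(0, base.length() - 1) : base;
        return trimmed + "/dt=" + day;
    }

    public static void main(String[] args) {
        String base = "oss://ws-datacenter/user_event";
        System.out.println(forEvent(base, System.currentTimeMillis(), ZoneOffset.UTC));
    }
}
```

A bucketing sink does essentially this per record, which is why the next attempt moves away from writeAsText.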
2.3 The "Recoverable writers on Hadoop are only supported for HDFS" exception
I changed the code that writes to OSS as follows:
String path = "oss://ws-datacenter/user_event/day=20210608/tid=UA-FilmoraGo-Android/sdk=sa_sdk/user_event.json";
OutputFileConfig config = OutputFileConfig
        .builder()
        .withPartPrefix("user_event")
        .withPartSuffix(".json")
        .build();
StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
                new Path(path),
                new SimpleStringEncoder<String>("UTF-8")
        )
        .withBucketAssigner(new DateTimeBucketAssigner<>())
        .withRollingPolicy(
                DefaultRollingPolicy.builder()
                        .withRolloverInterval(TimeUnit.SECONDS.toMillis(2))
                        .withInactivityInterval(TimeUnit.SECONDS.toMillis(1))
                        .withMaxPartSize(1024 * 1024 * 1024)
                        .build())
        .withOutputFileConfig(config)
        .build();
// Or, using BucketingSink instead
BucketingSink<String> sink = new BucketingSink<String>(path);
sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd_HH-mm"));
sink.setWriter(new StringWriter<>());
sink.setBatchSize(1024 * 1024 * 256L);
sink.setBatchRolloverInterval(30 * 60 * 1000L);
sink.setInactiveBucketThreshold(3 * 60 * 1000L);
sink.setInactiveBucketCheckInterval(30 * 1000L);
sink.setInProgressSuffix(".in-progress");
sink.setPendingSuffix(".pending");
streamSource.addSink(sink);
Both approaches fail with the "Recoverable writers on Hadoop are only supported for HDFS" exception:
2021-06-09 14:57:44,292 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Filter -> Sink: Unnamed (1/1) (2939f69ee024a3dd3faed2c658165ac6) switched from RUNNING to FAILED on container_e131_1618429488036_60239_01_000002 @ ws-hdp06 (dataPort=41092).
java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
at org.apache.flink.fs.osshadoop.common.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:61) ~[flink-app-jar.jar:?]
at org.apache.flink.fs.osshadoop.common.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:210) ~[flink-app-jar.jar:?]
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:134) ~[flink-app-jar.jar:?]
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69) ~[flink-app-jar.jar:?]
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink.java:260) ~[flink-app-jar.jar:?]
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270) ~[flink-app-jar.jar:?]
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:412) ~[flink-app-jar.jar:?]
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185) ~[flink-app-jar.jar:?]
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167) ~[flink-app-jar.jar:?]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-app-jar.jar:?]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107) ~[flink-app-jar.jar:?]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264) ~[flink-app-jar.jar:?]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400) ~[flink-app-jar.jar:?]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-app-jar.jar:?]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-app-jar.jar:?]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) ~[flink-app-jar.jar:?]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) ~[flink-app-jar.jar:?]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) ~[flink-app-jar.jar:?]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
Looking at the StreamingFileSink source code shows:
this.fsWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();
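In the version used here, the file system behind the oss:// scheme does not provide a recoverable writer, so it cannot resume or roll back in-progress files. The only option left is to write a custom sink. The official documentation can sometimes be misleading, or the feature is simply not yet mature.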
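The core of such a custom sink could look roughly like the buffer below. This is a sketch under stated assumptions, not the code actually used: it is plain Java with no Flink dependency, the upload is hidden behind a BiConsumer that a real sink would back with an OSS client call, and RollingObjectBuffer with its methods is a hypothetical name. In a Flink job, add() would be called from a RichSinkFunction's invoke() and flush() from close(). Records still sitting in the buffer are lost on failure, so this gives no exactly-once guarantee.

```java
import java.util.function.BiConsumer;

/** Buffers text records and hands them to an uploader as numbered part objects. */
final class RollingObjectBuffer {

    private final String prefix;
    private final int maxRecords;
    // Receives (objectName, content); a real sink would upload to OSS here.
    private final BiConsumer<String, String> uploader;
    private final StringBuilder buffer = new StringBuilder();
    private int buffered = 0;
    private int partIndex = 0;

    RollingObjectBuffer(String prefix, int maxRecords, BiConsumer<String, String> uploader) {
        if (maxRecords <= 0) {
            throw new IllegalArgumentException("maxRecords must be positive");
        }
        this.prefix = prefix;
        this.maxRecords = maxRecords;
        this.uploader = uploader;
    }

    /** Appends one record as a line and rolls the part once the limit is reached. */
    void add(String record) {
        buffer.append(record).append('\n');
        buffered++;
        if (buffered >= maxRecords) {
            flush();
        }
    }

    /** Uploads the buffered records as one object; does nothing if the buffer is empty. */
    void flush() {
        if (buffered == 0) {
            return;
        }
        uploader.accept(prefix + "-" + partIndex + ".json", buffer.toString());
        partIndex++;
        buffer.setLength(0);
        buffered = 0;
    }

    public static void main(String[] args) {
        // Demo uploader that only prints the object name and its size.
        RollingObjectBuffer sink = new RollingObjectBuffer("user_event", 2,
                (name, content) -> System.out.println(name + ": " + content.length() + " chars"));
        sink.add("{\"id\":1}");
        sink.add("{\"id\":2}");
        sink.add("{\"id\":3}");
        sink.flush();
    }
}
```

Because every part is written as a complete object in one call, nothing has to be resumed or truncated afterwards, which sidesteps the recoverable-writer requirement at the cost of weaker delivery guarantees.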