记录前段时间写的一个实时解析日志应用,本次采用flume对接日志,进入kafka,spark-streaming实时消费kafka里面的数据,然后结果持久化保存在hdfs里面。
1、数据采集
数据采集使用的是flume,在这个地方flume采集到日志之后,将会在hdfs保存一份原始日志,同时将日志实时写入kafka的指定topic(例如stream_topic_1)。
2、数据解析
数据解析分为两个部分,将会启动两个流:
第一个流用来消费kafka里面的原始日志(stream_topic_1),通过正则解析,将结果写入kafka(dataset_stream_topic_1)。
消费kafka里面的原始日志,应该从最新的日志开始消费,也就是"auto.offset.reset" -> "latest",以免总是在消费历史数据,没法达到实时性。
第二个流消费解析之后的日志(dataset_stream_topic_1)然后写入snappy文件。生成的snapp的文件名,要带有相应日志的时间字段。例如:20180508100000_8d6245f0be124fdbaa5ae4e6014aa6ec.snappy。注意:在写完snappy文件的close之后,应该将文件路径修改一下,避免后面合并文件的时候造成读写混乱。
最后在启动一个定时任务,将第二个流生成的snappy文件合并成parquet文件。每次启动任务的时候,将所有未合并的snappy文件(此时所有读到的snappy文件都已经关闭了),按照时间进行group,将同一个时间段的日志一次性写进同一个parquet文件。然后关闭parquet文件。
如果系统里面还要加上其他的数据导出功能,可以直接再起一个流,消费解析之后的数据kafka(dataset_stream_topic_1)