优化flume:
用flume接受tomcat的日志文件catalina.out,将接受的日志文件发送到kafka主题。问题是flume经常挂,临时解决方法是写脚本自动拉起。
flume主进程不容易挂,容易挂的是子进程,也就是读取tomcat文件的命令所再进程容易挂。
flume配置文件和拉起脚本如下:
flume配置文件:
# Name the components on this agent
a1.sources = x1
a1.channels = c
a1.sinks = r
# Describe/configure the source
a1.sources.x1.type = exec
a1.sources.x1.command = tail -F /app/tomcat9991/logs/catalina.out
a1.sources.x1.channels = c
# Describe the sink
a1.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.r.kafka.topic = xcxInfoLog
a1.sinks.r.kafka.bootstrap.servers = 10.251.27.123:9092,10.251.27.124:9092,10.251.27.125:9092
a1.sinks.r.channel = c
# Use a channel which buffers events in memory
a1.channels.c.type = memory
a1.channels.c.capacity = 5000
a1.channels.c.transactionCapacity = 5000
a1.sources.x1.interceptors = i1
a1.sources.x1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.x1.interceptors.i1.headerName = key
a1.sources.x1.interceptors.i1.preserveExisting = false
拉起flume进程的脚本:
#小程序Tomcat9991日志,flume中Source端监控并重启
source /home/logv/.bash_profile
num=1
result01=''
result02=`ps -ef|awk '{print $8 $9 $10}'|grep tail-F/app/tomcat9991/logs/catalina.out|grep -v grep|wc -l`
if [ $result02 -lt $num ]; then
result01=`ps -ef|grep xcxInfoLogTomcat9991.properties|grep -v grep|awk '{print $2}'`
kill -9 $result01
nohup /home/logv/bigdata/flume1.8/bin/flume-ng agent -c /home/logv/bigdata/flume1.8/conf -f /home/logv/flumeProperties/xcxLog/xcxInfoLogTomcat9991.properties -n a1 -Dflume.root.logger=ERROR,console > /home/logv/flumeProperties/logs/xcxInfoLogTomcat9991.log 2>&1 &
fi
解决方案:
提升flume中channel的内存:修改flume-env.sh文件,调整flume-env.sh文件中agent默认初始化的内存:默认是20M,现改成2G
export JAVA_OPTS="-Xms1024m -Xmx2048m -Xss256k -Xmn1024m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit -Xloggc:/work/app/flume/logs/server-gc.log.$(date +%F) -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=1 -XX:GCLogFileSize=200M"
其他的配置在配置文件中稍加改动,优化后的配置文件:
# Name the components on this agent
a2.sources = s1
a2.channels = c
a2.sinks = s2
# Describe/configure the source
a2.sources.s1.type = exec
a2.sources.s1.command = tail -F /app/tomcat9991/logs/catalina.out
#增加了该配置,每次doput的容量,也就是source读取多少event时,将数据发送到channel
a2.sources.s1.batchSize=1000
#增加了该配置,source每隔多久将数据发送到channel
a2.sources.s1.batchTimeout = 3000
a2.sources.s1.channels = c
# Describe the sink
a2.sinks.s2.type = org.apache.flume.sink.kafka.KafkaSink
a2.sinks.s2.kafka.topic = test1
a2.sinks.s2.kafka.bootstrap.servers = 10.251.27.123:9092,10.251.27.124:9092,10.251.27.125:9092,10.251.27.102:9092,10.251.27.121:9092
#更改了生产者确认机制,该配置项默认是-1.配置成1是一种折中的优化方式,这种方式需要leader成功将数据写入本地log,但是所有的follower
#是否成功写入没有经过确认,这种情况下如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。该方法是性能和安全性的一种折中
a2.sinks.s2.kafka.producer.acks= 1
#增加了该配置,增大了sink批处理的大小,提升处理速度
a2.sinks.s2.flumeBatchSize = 2000
a2.sinks.s2.kafka.producer.max.request.size=20971520
a2.sinks.s2.channel = c
a2.channels.c.type = memory
#增大该配置,增大的是channel的容量
a2.channels.c.capacity = 1000000
#增大该配置,putlist和takelist的大小,该值需小于capacity
a2.channels.c.transactionCapacity = 100000
#增加该配置项,该配置项指event header占用jvm的比例,默认是百分之20,现降低至百分之10
a2.channels.c.byteCapacityBufferPercentage=10
a2.sources.s1.interceptors = i1
a2.sources.s1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a2.sources.s1.interceptors.i1.headerName = key
a2.sources.s1.interceptors.i1.preserveExisting = false
启动命令:
nohup /home/logv/bigData/flume1.8/bin/flume-ng agent -n a2 -c /home/logv/bigData/flume1.8/conf -f /home/logv/flumeProperties/xcxLog/testKafka.properties &
启动后flume子进程不挂,部分数据也正常写入kafka,但是报错:
2018-12-26 23:41:12,598 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:264)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
报错原因提示是请求包含的消息大于服务器将接受的最大消息大小。这不是flume的错误,而是kafka的问题,kafka中,能接受的单条消息的大小是有限制的,默认是1M,由于现有日志中包含图片信息,远大于1M,所以提升kafka能接受的单条消息的大小程度。有两种方式,一种是修改某一个topic,一种是修改kafka的配置文件。
修改某一个topic:/app/bigData/kafka/bin/kafka-topics.sh --zookeeper 10.251.27.123:2181 --alter --topic test1 --config max.message.bytes=209715200 提升至200M
修改配置文件:在kafka的server.properties配置上添加两个配置:
#broker能接收消息的最大字节数
message.max.bytes=209715200
#broker可复制的消息的最大字节数,该配置项必须不小message.max.bytes,因为该配置项是消费者从partition中获取消息放入内存中所用的内存大小,
#如果小于message.max.bytes,可能会导致给消费者分配的内存放不下一个message
replica.fetch.max.bytes=209715200