天天看点

flume子进程容易死亡的问题解决以及kafka对单条消息的限制

优化flume:

用flume接受tomcat的日志文件catalina.out,将接受的日志文件发送到kafka主题。问题是flume经常挂,临时解决方法是写脚本自动拉起。

flume主进程不容易挂,容易挂的是子进程,也就是读取tomcat文件的命令所再进程容易挂。

flume配置文件和拉起脚本如下:

flume配置文件:

# Name the components on this agent
		a1.sources = x1
		a1.channels = c
		a1.sinks = r

		# Describe/configure the source
		a1.sources.x1.type = exec
		a1.sources.x1.command = tail -F /app/tomcat9991/logs/catalina.out
		a1.sources.x1.channels = c

		# Describe the sink
		a1.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink
		a1.sinks.r.kafka.topic = xcxInfoLog
		a1.sinks.r.kafka.bootstrap.servers = 10.251.27.123:9092,10.251.27.124:9092,10.251.27.125:9092
		a1.sinks.r.channel = c

		# Use a channel which buffers events in memory
		a1.channels.c.type = memory
		a1.channels.c.capacity = 5000
		a1.channels.c.transactionCapacity = 5000

		a1.sources.x1.interceptors = i1
		a1.sources.x1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
		a1.sources.x1.interceptors.i1.headerName = key
		a1.sources.x1.interceptors.i1.preserveExisting = false
           

拉起flume进程的脚本:

#小程序Tomcat9991日志,flume中Source端监控并重启
		source /home/logv/.bash_profile
		num=1
		result01=''
		result02=`ps -ef|awk '{print $8 $9 $10}'|grep tail-F/app/tomcat9991/logs/catalina.out|grep -v grep|wc -l`
		if [ $result02 -lt $num ]; then
		result01=`ps -ef|grep xcxInfoLogTomcat9991.properties|grep -v grep|awk '{print $2}'`
		kill -9 $result01
		nohup /home/logv/bigdata/flume1.8/bin/flume-ng agent -c /home/logv/bigdata/flume1.8/conf -f /home/logv/flumeProperties/xcxLog/xcxInfoLogTomcat9991.properties -n a1 -Dflume.root.logger=ERROR,console > /home/logv/flumeProperties/logs/xcxInfoLogTomcat9991.log 2>&1 &
		fi
           

解决方案:

提升flume中channel的内存:修改flume-env.sh文件,调整flume-env.sh文件中agent默认初始化的内存:默认是20M,现改成2G

export JAVA_OPTS="-Xms1024m -Xmx2048m -Xss256k -Xmn1024m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit -Xloggc:/work/app/flume/logs/server-gc.log.$(date +%F) -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=1 -XX:GCLogFileSize=200M"

其他的配置在配置文件中稍加改动,优化后的配置文件:

# Name the components on this agent
a2.sources = s1 
a2.channels = c
a2.sinks = s2

# Describe/configure the source
a2.sources.s1.type = exec
a2.sources.s1.command =	tail -F /app/tomcat9991/logs/catalina.out
#增加了该配置,每次doput的容量,也就是source读取多少event时,将数据发送到channel
a2.sources.s1.batchSize=1000
#增加了该配置,source每隔多久将数据发送到channel
a2.sources.s1.batchTimeout = 3000
a2.sources.s1.channels = c

# Describe the sink
a2.sinks.s2.type = org.apache.flume.sink.kafka.KafkaSink
a2.sinks.s2.kafka.topic = test1
a2.sinks.s2.kafka.bootstrap.servers = 10.251.27.123:9092,10.251.27.124:9092,10.251.27.125:9092,10.251.27.102:9092,10.251.27.121:9092
#更改了生产者确认机制,该配置项默认是-1.配置成1是一种折中的优化方式,这种方式需要leader成功将数据写入本地log,但是所有的follower
#是否成功写入没有经过确认,这种情况下如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。该方法是性能和安全性的一种折中
a2.sinks.s2.kafka.producer.acks= 1
#增加了该配置,增大了sink批处理的大小,提升处理速度
a2.sinks.s2.flumeBatchSize = 2000
a2.sinks.s2.kafka.producer.max.request.size=20971520
a2.sinks.s2.channel = c

a2.channels.c.type = memory
#增大该配置,增大的是channel的容量
a2.channels.c.capacity = 1000000
#增大该配置,putlist和takelist的大小,该值需小于capacity
a2.channels.c.transactionCapacity = 100000
#增加该配置项,该配置项指event header占用jvm的比例,默认是百分之20,现降低至百分之10
a2.channels.c.byteCapacityBufferPercentage=10
				
a2.sources.s1.interceptors = i1
a2.sources.s1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a2.sources.s1.interceptors.i1.headerName = key
a2.sources.s1.interceptors.i1.preserveExisting = false
           

启动命令:

nohup /home/logv/bigData/flume1.8/bin/flume-ng agent -n a2 -c /home/logv/bigData/flume1.8/conf -f /home/logv/flumeProperties/xcxLog/testKafka.properties &

启动后flume子进程不挂,部分数据也正常写入kafka,但是报错:

2018-12-26 23:41:12,598 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows.
		org.apache.flume.EventDeliveryException: Failed to publish events
			at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:264)
			at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
			at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
			at java.lang.Thread.run(Thread.java:748)
		Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
           

报错原因提示是请求包含的消息大于服务器将接受的最大消息大小。这不是flume的错误,而是kafka的问题,kafka中,能接受的单条消息的大小是有限制的,默认是1M,由于现有日志中包含图片信息,远大于1M,所以提升kafka能接受的单条消息的大小程度。有两种方式,一种是修改某一个topic,一种是修改kafka的配置文件。

修改某一个topic:/app/bigData/kafka/bin/kafka-topics.sh --zookeeper 10.251.27.123:2181 --alter --topic test1 --config max.message.bytes=209715200 提升至200M

修改配置文件:在kafka的server.properties配置上添加两个配置:

#broker能接收消息的最大字节数

message.max.bytes=209715200

#broker可复制的消息的最大字节数,该配置项必须不小message.max.bytes,因为该配置项是消费者从partition中获取消息放入内存中所用的内存大小,

#如果小于message.max.bytes,可能会导致给消费者分配的内存放不下一个message

replica.fetch.max.bytes=209715200

继续阅读