
A Flume BucketPath bug

I ran into the following problem today while setting up Flume with Kerberos to write to HDFS.

The test configuration file:


<code>agent-server1.sources = testtail</code>

<code>agent-server1.sinks = hdfs-sink</code>

<code>agent-server1.channels = hdfs-channel</code>

<code>agent-server1.sources.testtail.type = netcat</code>

<code>agent-server1.sources.testtail.bind = localhost</code>

<code>agent-server1.sources.testtail.port = 9999</code>

<code>agent-server1.sinks.hdfs-sink.hdfs.kerberosPrincipal = hdfs/_HOST@KERBEROS_HADOOP</code>

<code>agent-server1.sinks.hdfs-sink.hdfs.kerberosKeytab = /home/vipshop/conf/hdfs.keytab</code>

<code>agent-server1.channels.hdfs-channel.type = memory</code>

<code>agent-server1.channels.hdfs-channel.capacity = 200000000</code>

<code>agent-server1.channels.hdfs-channel.transactionCapacity = 10000</code>

<code>agent-server1.sinks.hdfs-sink.type = hdfs</code>

<code>agent-server1.sinks.hdfs-sink.hdfs.path = hdfs://bipcluster/tmp/flume/%Y%m%d</code>

<code>agent-server1.sinks.hdfs-sink.hdfs.rollInterval = 60</code>

<code>agent-server1.sinks.hdfs-sink.hdfs.rollSize = 0</code>

<code>agent-server1.sinks.hdfs-sink.hdfs.rollCount = 0</code>

<code>agent-server1.sinks.hdfs-sink.hdfs.threadsPoolSize = 10</code>

<code>agent-server1.sinks.hdfs-sink.hdfs.round = false</code>

<code>agent-server1.sinks.hdfs-sink.hdfs.roundValue = 30</code>

<code>agent-server1.sinks.hdfs-sink.hdfs.roundUnit = minute</code>

<code>agent-server1.sinks.hdfs-sink.hdfs.batchSize = 100</code>

<code>agent-server1.sinks.hdfs-sink.hdfs.fileType = DataStream</code>

<code>agent-server1.sinks.hdfs-sink.hdfs.writeFormat = Text</code>

<code>agent-server1.sinks.hdfs-sink.hdfs.callTimeout = 60000</code>

<code>agent-server1.sinks.hdfs-sink.hdfs.idleTimeout = 100</code>

<code>agent-server1.sinks.hdfs-sink.hdfs.filePrefix = ip</code>

<code>agent-server1.sinks.hdfs-sink.channel = hdfs-channel</code>

<code>agent-server1.sources.testtail.channels = hdfs-channel</code>

After starting the agent, I tested it with telnet and got the following error:


<code>14/03/24 18:03:07 ERROR hdfs.HDFSEventSink: process failed</code>

<code>java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing.</code>

<code> Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).</code>

<code>        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)</code>

<code>        at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)</code>

<code>        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)</code>

<code>        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)</code>

<code>        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)</code>

<code>        at java.lang.Thread.run(Thread.java:662)</code>

<code>Caused by: java.lang.NumberFormatException: null</code>

<code>        at java.lang.Long.parseLong(Long.java:375)</code>

<code>        at java.lang.Long.valueOf(Long.java:525)</code>

<code>        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)</code>

<code>        ... 5 more</code>

<code>14/03/24 18:03:07 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.</code>

<code>org.apache.flume.EventDeliveryException: java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to</code>

<code>resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).</code>

<code>        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:461)</code>

<code>Caused by: java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).</code>

<code>        ... 3 more</code>

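The stack trace shows that the error occurs in the replaceShorthand method of the org.apache.flume.formatter.output.BucketPath class.

In the org.apache.flume.sink.hdfs.HDFSEventSink class, the process method builds the HDFS URL. It does this mainly by calling the escapeString method of BucketPath to expand the escape sequences in the path, which in turn calls replaceShorthand.

The relevant part of replaceShorthand is: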

<code>  public static String replaceShorthand(char c, Map&lt;String, String&gt; headers,</code>

<code>      TimeZone timeZone, boolean needRounding, int unit, int roundDown) {</code>

<code>    String timestampHeader = headers.get("timestamp");</code>

<code>    long ts;</code>

<code>    try {</code>

<code>      ts = Long.valueOf(timestampHeader);</code>

<code>    } catch (NumberFormatException e) {</code>

<code>      throw new RuntimeException("Flume wasn't able to parse timestamp header"</code>

<code>        + " in the event to resolve time based bucketing. Please check that"</code>

<code>        + " you're correctly populating timestamp header (for example using"</code>

<code>        + " TimestampInterceptor source interceptor).", e);</code>

<code>    }</code>

<code>    if(needRounding){</code>

<code>      ts = roundDown(roundDown, unit, ts);</code>

<code>........</code>

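The code shows that if the event has no timestamp header, timestampHeader is null, and Long.valueOf throws a NumberFormatException when the value is assigned to ts.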
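The failure can be reproduced outside Flume. The following is a minimal sketch, not Flume code: the class name `MissingTimestampDemo` and the helper `canParse` are made up for illustration, and a plain `HashMap` stands in for the event headers. It assumes, as the stack trace above suggests, that the event carries no `timestamp` header.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone demo; only the headers.get/Long.valueOf pair mirrors replaceShorthand.
class MissingTimestampDemo {

    // Returns true if the "timestamp" header can be parsed as a long, false otherwise.
    static boolean canParse(Map<String, String> headers) {
        String timestampHeader = headers.get("timestamp"); // null when the header is absent
        try {
            Long.valueOf(timestampHeader); // throws NumberFormatException for null input
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>(); // stand-in for an event without headers
        System.out.println("without header: " + canParse(headers));

        headers.put("timestamp", "1395655387000"); // epoch milliseconds as a string
        System.out.println("with header: " + canParse(headers));
    }
}
```

Note that the missing header does not produce a NullPointerException: `Long.valueOf` reports a null string as a NumberFormatException, which is why the catch block in replaceShorthand is triggered.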

This is actually a Flume bug, tracked as:

<a href="https://issues.apache.org/jira/browse/FLUME-1419" target="_blank">https://issues.apache.org/jira/browse/FLUME-1419</a>

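There are three ways to fix it:

1. Change the configuration so that the HDFS path no longer contains time escape sequences:

<code>agent-server1.sinks.hdfs-sink.hdfs.path = hdfs://bipcluster/tmp/flume</code>

However, the logs can then no longer be stored in per-day directories.

2. Modify the relevant code

(patch: https://issues.apache.org/jira/secure/attachment/12538891/FLUME-1419.patch)

If no timestamp value can be found in the headers, use the current time instead.

The relevant code: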

<code>    String timestampHeader = headers.get("timestamp");</code>

<code>    long ts;</code>

<code>    try {</code>

<code>      if (timestampHeader == null) {</code>

<code>        ts = System.currentTimeMillis();</code>

<code>      } else {</code>

<code>        ts = Long.valueOf(timestampHeader);</code>

<code>      }</code>

<code>    } catch (NumberFormatException e) {</code>

<code>      throw new RuntimeException("Flume wasn't able to parse timestamp header"</code>

<code>        + " in the event to resolve time based bucketing. Please check that"</code>

<code>        + " you're correctly populating timestamp header (for example using"</code>

<code>        + " TimestampInterceptor source interceptor).", e);</code>

<code>    }</code>
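To see what the fallback buys, here is a small stand-alone sketch of the same idea together with a day bucket like the `%Y%m%d` in the path. It is an illustration only: `FallbackBucketDemo`, `resolveTimestamp` and `dayBucket` are hypothetical names, the current time is passed in as a parameter so the behaviour is easy to check, and `SimpleDateFormat` with the pattern `yyyyMMdd` is assumed to be an adequate stand-in for the day-level expansion.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;

// Hypothetical sketch of the patched logic; not the actual BucketPath implementation.
class FallbackBucketDemo {

    // Uses the "timestamp" header when present, otherwise falls back to nowMillis.
    static long resolveTimestamp(Map<String, String> headers, long nowMillis) {
        String timestampHeader = headers.get("timestamp");
        return timestampHeader == null ? nowMillis : Long.parseLong(timestampHeader);
    }

    // Formats epoch milliseconds as a yyyyMMdd day bucket in the given time zone.
    static String dayBucket(long ts, TimeZone timeZone) {
        SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd");
        format.setTimeZone(timeZone);
        return format.format(new Date(ts));
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>(); // no timestamp header set
        long ts = resolveTimestamp(headers, System.currentTimeMillis());
        // Prints a path ending in today's date instead of failing.
        System.out.println("/tmp/flume/" + dayBucket(ts, TimeZone.getDefault()));
    }
}
```

As in the patch, a header that is present but not numeric still raises a NumberFormatException; only the missing-header case is handled.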

3. Define a timestamp interceptor for the source

Just add two lines to the configuration:

<code>agent-server1.sources.testtail.interceptors = i1</code>

<code>agent-server1.sources.testtail.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder</code>
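Conceptually, this fix works because the interceptor stamps every event with a `timestamp` header before it reaches the sink. The sketch below models that behaviour on a plain map. It is not the Flume class: `TimestampStampingSketch` and `stamp` are hypothetical names, and the `preserveExisting` flag is modeled on the interceptor option of the same name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a timestamp-stamping step; the real interceptor works on Flume Event objects.
class TimestampStampingSketch {

    static final String TIMESTAMP = "timestamp";

    // Writes nowMillis into the "timestamp" header unless an existing value should be kept.
    static void stamp(Map<String, String> headers, long nowMillis, boolean preserveExisting) {
        if (preserveExisting && headers.containsKey(TIMESTAMP)) {
            return; // keep the value set upstream
        }
        headers.put(TIMESTAMP, Long.toString(nowMillis));
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>(); // event headers before interception
        stamp(headers, System.currentTimeMillis(), false);
        // The header is now a parseable long, so time-based bucketing can resolve it.
        System.out.println(Long.parseLong(headers.get(TIMESTAMP)) > 0);
    }
}
```

Compared with patching BucketPath, this keeps the sink code untouched and records the time at which the source received the event, not the time at which the sink wrote it.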

A tip:

When debugging Flume problems, you can add a startup option that sends debug logs to the console:

<code>-Dflume.root.logger=DEBUG,console,LOGFILE</code>


This article is reposted from 菜菜光's 51CTO blog. Original link: http://blog.51cto.com/caiguangguang/1384187. To repost it, please contact the original author.
