
Flume-ng in Production (2): events lose part of their body data during flume-ng testing

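Testing showed that when the body of a single event at the source exceeds 16 bytes, only the first 16 bytes appear in the output at the destination. Tracing through the source code shows that the body is truncated there. In LoggerSink.java:

    if (event != null) {
      if (logger.isInfoEnabled()) {
        logger.info("Event: " + EventHelper.dumpEvent(event));
      }
    }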

Now look at the dumpEvent method in EventHelper.java:

    private static final int DEFAULT_MAX_BYTES = 16;

    public static String dumpEvent(Event event) {
      return dumpEvent(event, DEFAULT_MAX_BYTES);
    }

    public static String dumpEvent(Event event, int maxBytes) {
      StringBuilder buffer = new StringBuilder();
      if (event == null || event.getBody() == null) {
        buffer.append("null");
      } else if (event.getBody().length == 0) {
        // do nothing... in this case, HexDump.dump() will throw an exception
      } else {
        byte[] body = event.getBody();
        byte[] data = Arrays.copyOf(body, Math.min(body.length, maxBytes));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
          HexDump.dump(data, 0, out, 0);
          String hexDump = new String(out.toByteArray());
          // remove offset since it's not relevant for such a small dataset
          if(hexDump.startsWith(HEXDUMP_OFFSET)) {
            hexDump = hexDump.substring(HEXDUMP_OFFSET.length());
          }
          buffer.append(hexDump);
        } catch (Exception e) {
          if(LOGGER.isInfoEnabled()) {
            LOGGER.info("Exception while dumping event", e);
          }
          buffer.append("...Exception while dumping: ").append(e.getMessage());
        }
        String result = buffer.toString();
        if(result.endsWith(EOL) && buffer.length() > EOL.length()) {
          buffer.delete(buffer.length() - EOL.length(), buffer.length()).toString();
        }
      }
      return "{ headers:" + event.getHeaders() + " body:" + buffer + " }";
    }

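It is easy to see where the data is truncated while the event is processed: Arrays.copyOf(body, Math.min(body.length, maxBytes)) keeps at most maxBytes bytes of the body, and the single-argument dumpEvent called by LoggerSink passes DEFAULT_MAX_BYTES, which is 16. The copy is used only to build the logged string; the body array held by the event is not modified.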
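The truncation step can be reproduced outside Flume with nothing but the JDK. The following is a minimal sketch, not Flume code: the class name TruncationDemo and the helper truncate are hypothetical names introduced for illustration, and only the Arrays.copyOf / Math.min expression and the default of 16 are taken from the excerpt above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical demo class; it only mirrors the copy step seen in dumpEvent.
public class TruncationDemo {
    // Same value as DEFAULT_MAX_BYTES in the EventHelper excerpt.
    static final int DEFAULT_MAX_BYTES = 16;

    // Returns a copy holding at most maxBytes bytes of body; body is left untouched.
    static byte[] truncate(byte[] body, int maxBytes) {
        return Arrays.copyOf(body, Math.min(body.length, maxBytes));
    }

    public static void main(String[] args) {
        // A 26-byte body, longer than the 16-byte default.
        byte[] body = "0123456789abcdefghijklmnop".getBytes(StandardCharsets.US_ASCII);
        byte[] shown = truncate(body, DEFAULT_MAX_BYTES);

        System.out.println("original length: " + body.length);   // 26
        System.out.println("dumped length:   " + shown.length);  // 16
        System.out.println("dumped bytes:    "
                + new String(shown, StandardCharsets.US_ASCII)); // 0123456789abcdef
    }
}
```

A body of 16 bytes or fewer passes through truncate unchanged, which matches the observation that only events larger than 16 bytes appear cut off.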