天天看點

Elasticsearch 與 Kafka 整合剖析1.概述2.内容3.實作4.排程5.總結6.結束語

1.概述

  目前,随着大資料的浪潮,Kafka 被越來越多的企業所認可,如今的Kafka已發展到0.10.x,其優秀的特性也帶給我們解決實際業務的方案。對于資料分流來說,既可以分流到離線存儲平台(HDFS),離線計算平台(Hive倉庫),也可以分流實時流水計算(Storm,Spark)等,同樣也可以分流到海量資料查詢(HBase),或是及時查詢(ElasticSearch)。而今天筆者給大家分享的就是Kafka 分流資料到 ElasticSearch。

2.内容

  我們知道,ElasticSearch是有其自己的套件的,簡稱ELK,即ElasticSearch,Logstash以及Kibana。ElasticSearch負責存儲,Logstash負責收集資料來源,Kibana負責可視化資料,分工明确。想要分流Kafka中的消息資料,可以使用Logstash的插件直接消費,但是需要我們編寫複雜的過濾條件,和特殊的映射處理,比如系統保留的`_uid`字段等需要我們額外的轉化。今天我們使用另外一種方式來處理資料,使用Kafka的消費API和ES的存儲API來處理分流資料。通過編寫Kafka消費者,消費對應的業務資料,将消費的資料通過ES存儲API,通過建立對應的索引的,存儲到ES中。其流程如下圖所示:

  上圖可知,消費收集的資料,通過ES提供的存儲接口進行存儲。存儲的資料,這裡我們可以規劃,做定時排程。最後,我們可以通過Kibana來可視化ES中的資料,對外提供業務調用接口,進行資料共享。

3.實作

  下面,我們開始進行實作細節處理,這裡給大家提供實作的核心代碼部分,實作代碼如下所示:

3.1 定義ES格式

  我們以插件的形式進行消費,從Kafka到ES的資料流向,隻需要定義插件格式,如下所示:

{         "job": {             "content": {                 "reader": {                     "name": "kafka",                     "parameter": {                         "topic": "kafka_es_client_error",                         "groupid": "es2",                         "bootstrapServers": "k1:9094,k2:9094,k3:9094"                     },                     "threads": 6                 },                 "writer": {                     "name": "es",                     "parameter": {                         "host": [                             "es1:9300,es2:9300,es3:9300"                         ],                         "index": "client_error_%s",                         "type": "client_error"                     }                 }             }         }     }      

  這裡處理消費存儲的方式,将讀和寫的源分開,配置各自屬性即可。

3.2 資料存儲

  這裡,我們通過每天建立索引進行存儲,便于業務查詢,實作細節如下所示:

public class EsProducer {         private final static Logger LOG = LoggerFactory.getLogger(EsProducer.class);         private final KafkaConsumer<String, String> consumer;         private ExecutorService executorService;         private Configuration conf = null;         private static int counter = 0;         public EsProducer() {             String root = System.getProperty("user.dir") + "/conf/";             String path = SystemConfigUtils.getProperty("kafka.x.plugins.exec.path");             conf = Configuration.from(new File(root + path));             Properties props = new Properties();             props.put("bootstrap.servers", conf.getString("job.content.reader.parameter.bootstrapServers"));             props.put("group.id", conf.getString("job.content.reader.parameter.groupid"));             props.put("enable.auto.commit", "true");             props.put("auto.commit.interval.ms", "1000");             props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");             props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");             consumer = new KafkaConsumer<String, String>(props);             consumer.subscribe(Arrays.asList(conf.getString("job.content.reader.parameter.topic")));         }         public void execute() {             executorService = Executors.newFixedThreadPool(conf.getInt("job.content.reader.threads"));             while (true) {                 ConsumerRecords<String, String> records = consumer.poll(100);                 if (null != records) {                     executorService.submit(new KafkaConsumerThread(records, consumer));                 }             }         }         public void shutdown() {             try {                 if (consumer != null) {                     consumer.close();                 }                 if (executorService != null) {                     executorService.shutdown();                 }                 if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {                     LOG.error("Shutdown kafka consumer thread timeout.");                 }             } catch (InterruptedException ignored) {                 Thread.currentThread().interrupt();             }         }         class KafkaConsumerThread implements Runnable {             private ConsumerRecords<String, String> records;             public KafkaConsumerThread(ConsumerRecords<String, String> records, KafkaConsumer<String, String> consumer) {                 this.records = records;             }             @Override             public void run() {                 String index = conf.getString("job.content.writer.parameter.index");                 String type = conf.getString("job.content.writer.parameter.type");                 for (TopicPartition partition : records.partitions()) {                     List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);                     for (ConsumerRecord<String, String> record : partitionRecords) {                         JSONObject json = JSON.parseObject(record.value());                         List<Map<String, Object>> list = new ArrayList<>();                         Map<String, Object> map = new HashMap<>();                         index = String.format(index, CalendarUtils.timeSpan2EsDay(json.getLongValue("_tm") * 1000L));                         if (counter < 10) {                             LOG.info("Index : " + index);                             counter++;                         }                         for (String key : json.keySet()) {                             if ("_uid".equals(key)) {                                 map.put("uid", json.get(key));                             } else {                                 map.put(key, json.get(key));                             }                             list.add(map);                         }                         EsUtils.write2Es(index, type, list);                     }                 }             }         }     }      

  這裡消費的資料源就處理好了,接下來,開始ES的存儲,實作代碼如下所示:

public class EsUtils {     	private static TransportClient client = null;     	static {     		if (client == null) {     			client = new PreBuiltTransportClient(Settings.EMPTY);     		}     		String root = System.getProperty("user.dir") + "/conf/";     		String path = SystemConfigUtils.getProperty("kafka.x.plugins.exec.path");     		Configuration conf = Configuration.from(new File(root + path));     		List<Object> hosts = conf.getList("job.content.writer.parameter.host");     		for (Object object : hosts) {     			try {     				client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(object.toString().split(":")[0]), Integer.parseInt(object.toString().split(":")[1])));     			} catch (Exception e) {     				e.printStackTrace();     			}     		}     	}     	public static void write2Es(String index, String type, List<Map<String, Object>> dataSets) {     		BulkRequestBuilder bulkRequest = client.prepareBulk();     		for (Map<String, Object> dataSet : dataSets) {     			bulkRequest.add(client.prepareIndex(index, type).setSource(dataSet));     		}     		bulkRequest.execute().actionGet();     		// if (client != null) {     		// client.close();     		// }     	}     	public static void close() {     		if (client != null) {     			client.close();     		}     	}	     }      

  這裡,我們利用BulkRequestBuilder進行批量寫入,減少頻繁寫入率。

4.排程

  存儲在ES中的資料,如果不需要長期存儲,比如:我們隻需要存儲及時查詢資料一個月,對于一個月以前的資料需要清除掉。這裡,我們可以編寫腳本直接使用Crontab來進行簡單調用即可,腳本如下所示:

#!/bin/sh     # <Usage>: ./delete_es_by_day.sh kafka_error_client logsdate 30 </Usage>     echo "<Usage>: ./delete_es_by_day.sh kafka_error_client logsdate 30 </Usage>"      
index_name=$1     daycolumn=$2     savedays=$3     format_day=$4     if [ ! -n "$savedays" ]; then       echo "Oops. The args is not right,please input again...."       exit 1     fi     if [ ! -n "$format_day" ]; then        format_day='%Y%m%d'     fi     sevendayago=`date -d "-${savedays} day " +${format_day}`     curl -XDELETE "es1:9200/${index_name}/_query?pretty" -d "     {             "query": {                     "filtered": {                             "filter": {                                     "bool": {                                             "must": {                                                     "range": {                                                             "${daycolumn}": {                                                                     "from": null,                                                                     "to": ${sevendayago},                                                                     "include_lower": true,                                                                     "include_upper": true                                                             }                                                     }                                             }                                     }                             }                     }             }     }"     echo "Finished."      

然後,在Crontab中進行定時排程即可。

5.總結

  這裡,我們在進行資料寫入ES的時候,需要注意,有些字段是ES保留字段,比如`_uid`,這裡我們需要轉化,不然寫到ES的時候,會引發沖突導緻異常,最終寫入失敗。

6.結束語

  這篇部落格就和大家分享到這裡,如果大家在研究學習的過程當中有什麼問題,可以加群進行讨論或發送郵件給我,我會盡我所能為您解答,與君共勉

聯系方式:

郵箱:[email protected]

Twitter:

https://twitter.com/smartloli

QQ群(Hadoop - 交流社群1):

424769183

溫馨提示:請大家加群的時候寫上加群理由(姓名+公司/學校),友善管理者稽核,謝謝!

熱愛生活,享受程式設計,與君共勉!

作者:哥不是小蘿莉 [ 關于我 ][ 犒賞

出處: http://www.cnblogs.com/smartloli/

轉載請注明出處,謝謝合作!