00
序言
彙總作業 (rollup jobs)是一項定期任務,它将來自索引模式指定的索引中的資料進行彙總,然後将其彙總到新的索引中。彙總索引是緊湊存儲數月或數年曆史資料以供可視化和報告使用的好方法。
用到 rollup 的情況是我們有很多的曆史資料,而且通常會比較大。通過使用 rollup 功能,我們可以把很多針對大量資料的統計變為針對經過 rollup 後的索引操作,進而使得資料的統計更加有效。
在實際的應用中:
- 在很多的情況下,保留曆史資料不是一種最佳的選擇。這是因為時序資料随着時間的推移,它們的價值沒有那麼高,需要删除這些資料以節省成本;
- 但是我們還是希望能夠保留它們的統計資料用于分析。
Elasticsearch 自 6.3 版本推出 rollup 功能,它可以幫我們:
- 保留舊資料,并以一種緊湊和聚合的方式來存儲
- 僅僅保留我們感興趣的資料
在下面,我們來用一個具體的例子來展示如何使用 rollup 的。
01
任務建立
準備資料
我們首先需要找到一個資料比較大一點的索引。我們可以參考我之前的文章 “Logstash 入門教程 (二)” 。在那篇文章中,我們直接跳到文章的最後一節把整個檔案都導入到 Elasticticsearch 中。
我們可以看到我們的 cars 索引有達到 201M 的大小,而且它的總文檔數達到 30 萬。在接下來我們将使用 rollup 的功能對這個索引進行處理。在資料導入後,我們必須建立一個 index pattern,并在 Discover 中進行顯示。從下面的圖中,我們可以看出來,整個索引的資料是從 2019.5.9 到 2019.6.07 進行采集的。
建立 rollup job
我們打開 Kibana 界面,點選 Create rollup job 按鈕:
我們可以看出來這個 rollup job 針對一個時間系列的索引,把螢幕向下滾動。
我們可以看出來:rollup 是一個在背景不斷運作的一個任務。它會周期性地定時做這項工作,以使得最新已有的資料得到處理。
我選擇在每個時鐘過15分鐘時進行一次rollup,比如在1:15分做一次,2:15分做一次,3:15分做一次,依次類推。這依賴于你自己索引的大小及項目的性質決定的。
點選 Next 按鈕:
這個是針對 Date histogram 的配置,點選 Next 按鈕:
這個是針對 terms 的 選擇。點選 Add terms fields 按鈕:
我們選擇 geoip.country_code2:
按照同樣的方法,我們添加 agent.hostname:
點選 Next 按鈕:
這個頁面是可選項,如果我們感興趣的話,那麼我們點選 Add histogram fields:
我們選擇 Bytes 字段:
點選 Next 按鈕:
這個是對 Metrics 的配置。我們點選 Add metrics fields 按鈕:
我們選擇 bytes,由于 Metrics 隻是針對數值類型的字段,在上面我們可以看到所有的字段都是數值類型的。
我們接着勾上我們喜歡的 metrics 項。再點選 Next 按鈕:
在這個頁面我們可以看到這個 rollup 的概覽。如果你覺得不太滿意,你可以點選 Back 按鈕然後再進行重新配置。如果滿意的話,我們點選 Save 按鈕。如果你還想馬上就開始這個 job 的話,勾上 Start job now。我們點選 Save,并勾上 Start job now:
上面的狀态顯示這個job已經開始工作了。我們可以點選 Manage 按鈕來對這個 job 進行管理:
它可以讓我們停止這個 job 或者 克隆這個job。目前,我們既不想停止,也不想克隆。我們在這個界面還可以點選上面的幾個tab來檢視這個job的詳細資訊。
特别有意思的是我們甚至可以看到這個 job 的JSON 表達方式。退出這個Dialog:
從上面,我們可以看出來我們的 Job 正在運作中。
02
在 Kibana 中進行統計
我在 Kibana中,通過如下的指令來檢視所有的索引:
GET _cat/indices
結果顯示:
我們可以看到一個新的索引:apache_rollup。它的檔案顯示的非常之小,隻有21.8M,是不是覺得不可思議啊。它相比之前的那個 apache_elastic_example 來說,小了非常多。
我們通過對這個索引來對我們所關心的資料進行統計分析。當我們對這個 rollup 的索引進行分析時,參照連結https://www.elastic.co/guide/en/elasticsearch/reference/master/rollup-search.html,我們可以對它進行如下的方式的搜尋:
我們經過 rollup 的處理後,那麼對于我們的資料的統計來說有沒有什麼影響呢?我們先來做一些檢測。
找出最大值
GET apache_rollup/_rollup_search{ "size": 0, "aggs": { "my_max": { "max": { "field": "bytes" } } }}
傳回結果:
{ "took" : 9, "timed_out" : false, "terminated_early" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 0, "relation" : "eq" }, "max_score" : 0.0, "hits" : [ ] }, "aggregations" : { "my_max" : { "value" : 8.2090432E7 } }}
上面是根據 rollup 的索引得到的結果。我們來使用最原始的索引:
GET apache_elastic_example/_search{ "size": 0, "aggs": { "my_max": { "max": { "field": "bytes" } } }}{ "took" : 0, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 10000, "relation" : "gte" }, "max_score" : null, "hits" : [ ] }, "aggregations" : { "my_max" : { "value" : 8.209043E7 } }}
顯然傳回的結果是一樣的。
找出最小值
GET apache_rollup/_rollup_search{ "size": 0, "aggs": { "my_min": { "min": { "field": "bytes" } } }}
傳回的結果:
{ "took" : 7, "timed_out" : false, "terminated_early" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 0, "relation" : "eq" }, "max_score" : 0.0, "hits" : [ ] }, "aggregations" : { "my_min" : { "value" : 1.0 } }}
使用原始的資料:
GET apache_elastic_example/_search{ "size": 0, "aggs": { "my_min": { "min": { "field": "bytes" } } }}{ "took" : 0, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 10000, "relation" : "gte" }, "max_score" : null, "hits" : [ ] }, "aggregations" : { "my_min" : { "value" : 1.0 } }}
傳回的結果是一樣的。
找出平均值
GET apache_rollup/_rollup_search{ "size": 0, "aggs": { "my_avg": { "avg": { "field": "bytes" } } }}{ "took" : 13, "timed_out" : false, "terminated_early" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 0, "relation" : "eq" }, "max_score" : 0.0, "hits" : [ ] }, "aggregations" : { "my_avg" : { "value" : 2372996.9378802367 } }}
使用原始的資料:
GET apache_elastic_example/_search{ "size": 0, "aggs": { "my_avg": { "avg": { "field": "bytes" } } }}
傳回的結果是:
{ "took" : 0, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 10000, "relation" : "gte" }, "max_score" : null, "hits" : [ ] }, "aggregations" : { "my_avg" : { "value" : 2372996.957136324 } }}
結果的差異是非常之小的。
找出國家的TOP5
GET apache_rollup/_rollup_search{ "size":0, "aggs" : { "countries": { "terms": { "field": "geoip.country_code2.keyword", "size": 5 } } }} "aggregations" : { "countries" : { "doc_count_error_upper_bound" : 0, "sum_other_doc_count" : 66600, "buckets" : [ { "key" : "US", "doc_count" : 122800 }, { "key" : "FR", "doc_count" : 19226 }, { "key" : "DE", "doc_count" : 17415 }, { "key" : "NL", "doc_count" : 14720 }, { "key" : "CN", "doc_count" : 14581 } ] } }
使用最原始的資料:
GET apache_elastic_example/_search{ "size":0, "aggs" : { "countries": { "terms": { "field": "geoip.country_code2.keyword", "size": 5 } } }} "aggregations" : { "countries" : { "doc_count_error_upper_bound" : 0, "sum_other_doc_count" : 109662, "buckets" : [ { "key" : "US", "doc_count" : 122800 }, { "key" : "FR", "doc_count" : 19226 }, { "key" : "DE", "doc_count" : 17415 }, { "key" : "NL", "doc_count" : 14720 }, { "key" : "CN", "doc_count" : 14581 } ] } }
這兩個的統計結果是一樣的。
嘗試更為複雜的聚合
我們可以嘗試一下的統計:
GET apache_rollup/_rollup_search{ "size": 0, "query": { "term": { "agent.hostname.keyword" : { "value": "liuxg" } } }, "aggs" :{ "daily" : { "date_histogram" :{ "field": "@timestamp", "calendar_interval": "1d", "time_zone": "UTC" }, "aggs": { "avg_byete" : { "avg": { "field": "bytes" } } } } }}
上面是對 rollup 索引進行的統計:
"aggregations" : { "daily" : { "meta" : { }, "buckets" : [ { "key_as_string" : "2019-05-08T00:00:00.000Z", "key" : 1557273600000, "doc_count" : 688, "avg_byete" : { "value" : 126608.41791044777 } }, { "key_as_string" : "2019-05-09T00:00:00.000Z", "key" : 1557360000000, "doc_count" : 7721, "avg_byete" : { "value" : 317026.4027212793 } }, { "key_as_string" : "2019-05-10T00:00:00.000Z", "key" : 1557446400000, "doc_count" : 9044, "avg_byete" : { "value" : 551951.1379390019 } }, { "key_as_string" : "2019-05-11T00:00:00.000Z", "key" : 1557532800000, "doc_count" : 9412, "avg_byete" : { "value" : 261766.84161836826 } }, ...}
我們對最原始的索引來進行統計:
GET apache_elastic_example/_search{ "size": 0, "query": { "term": { "agent.hostname.keyword": { "value": "liuxg" } } }, "aggs": { "daily": { "date_histogram": { "field": "@timestamp", "calendar_interval": "1d", "time_zone": "UTC" }, "aggs": { "avg_bytes": { "avg": { "field": "bytes" } } } } }}
顯示結果:
"aggregations" : { "daily" : { "buckets" : [ { "key_as_string" : "2019-05-08T00:00:00.000Z", "key" : 1557273600000, "doc_count" : 688, "avg_bytes" : { "value" : 126608.41940298508 } }, { "key_as_string" : "2019-05-09T00:00:00.000Z", "key" : 1557360000000, "doc_count" : 7721, "avg_bytes" : { "value" : 317026.4042642727 } }, { "key_as_string" : "2019-05-10T00:00:00.000Z", "key" : 1557446400000, "doc_count" : 9044, "avg_bytes" : { "value" : 551951.1417513863 } }, { "key_as_string" : "2019-05-11T00:00:00.000Z", "key" : 1557532800000, "doc_count" : 9412, "avg_bytes" : { "value" : 261766.84351315204 } }, ... }
從上面的結果顯示,通過這兩種方式進行的統計的結果非常一緻。
從某種程度上講,我們甚至可以通過 Index cycle management 的方法隻保留一段時間的資料(比如最近的6個月的資料),而通過 rollup 方法繼續可以對之前的資料進行預設的統計。
03
通過 API 的方法實作
上面的方法通過界面非常直覺,但是 Elastic 也提供 API 的方法來設定。比如:
PUT _rollup/job/apache_rollup_job2{ "index_pattern": "apache_elastic_example*", "rollup_index": "apache_rollup2", "cron": "*/10 * * * * ?", "page_size": 1000, "groups": { "date_histogram": { "field": "@timestamp", "fixed_interval": "1h", "delay": "1d", "time_zone": "UTC" }, "terms": { "fields": [ "geoip.country_code2.keyword", "agent.hostname.keyword" ] } }, "metrics": [ { "field": "bytes", "metrics": [ "min", "max", "sum", "avg" ] } ]}
在上面,我們設定了幾乎和界面一樣的實作,隻不過在這裡,我們每隔10分鐘來進行一次 rollup。在這裡,我們把資料存于到 apache_rollup2 這個索引中。
這個apache_rollup2 的大小更小。但是它和apache_rollup 是一樣的效果。我們可以對這個索引做同樣的搜尋:
GET apache_rollup2/_rollup_search{ "size": 0, "aggs": { "my_avg": { "avg": { "field": "bytes" } } }} "took" : 2, "timed_out" : false, "terminated_early" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 0, "relation" : "eq" }, "max_score" : 0.0, "hits" : [ ] }, "aggregations" : { "my_avg" : { "value" : 2372996.9558440386 } }}
使用最原始的索引:
GET apache_elastic_example/_search{ "size": 0, "aggs": { "my_avg": { "avg": { "field": "bytes" } } }}{ "took" : 0, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 10000, "relation" : "gte" }, "max_score" : null, "hits" : [ ] }, "aggregations" : { "my_avg" : { "value" : 2372996.957136324 } }}
關于其它的比較,我這裡就不詳述了。你們可以自己來實踐。
如果我們想删除一個 rollup 的任務,我們必須先停止,然後,删除它的 rollup 索引,再進行删除的動作。這些有的可以在之前的那個UI裡通過 Manage 按鈕來實作。通過 API的方式來删除一個 rollup,請參考如下的步驟:
POST _rollup/job/apache_rollup_job/_stop?wait_for_completion=true&timeout=10sDELETE apache_rollupDELETE _rollup/job/apache_rollup_job
正文完
作者:劉曉國
https://blog.csdn.net/UbuntuTouch
本文編輯:筷子
活動預告
随着 k8s 的快速發展,在很多業務場景下得到了廣泛的應用。那麼在資料層,Elasticsearch 依賴其得天獨厚的一些設計,使得在 k8s 上運維管理變得可行。今天我們為大家準備了一個 Github 開源項目,帶着大家一步步把整個流程跑一遍。其間會對一些應用場景和最佳實踐方案進行解釋,目的是為了讓大家能夠簡便的使用這些腳本和配置檔案對自己的叢集進行管理。
參與方式:活動直播将采用 Zoom 會議方式進行
時間:2020年5月23日 15:00 - 16:00(中原標準時間)
會議 ID:590-508-385
會議密碼:elastic
請于活動開始前 5 分鐘撥入并等待活動開始。
嗨,互動起來吧!
喜歡這篇文章麼?
歡迎留下你想說的,留言 100% 精選哦!
Elastic 社群公衆号長期征稿,如果您有 Elastic 技術的相關文章,也歡迎投稿至本公衆号,一起進步! 投稿請添加微信:medcl123
招聘資訊
Job board
民生銀行總行科技部社招
職位定位:
為行内使用的開源技術産品提供二、三線支援,以及基于開源技術産品進行二次開發。
職位要求:
1. 全日制計算機相關專業大學大學及以上學曆;年齡35周歲(含)以下,特别優秀應聘者可适當放寬;
2. 至少熟練掌握一門程式設計語言,擁有紮實的開發能力;
3.對Hadoop、Spark、Kafka、Elasticsearch、Redis、Kubernetes等技術的一個或多個有較深的了解。
聯系方式:
微信:tinawenqiao
郵箱:[email protected]
地點:北京順義
關
注
我
們
Elastic中文社群公衆号 (elastic-cn)
為您彙集 Elastic 社群的最新動态、精選幹貨文章、精華讨論、文檔資料、翻譯與版本釋出等。
喜歡本篇内容就請給我們點個[在看]吧