聚合分析運算是資料庫中重要的特性,對于資料分析場景尤為重要。類似于關系型資料庫中的 SUM,AVG, GROUP BY 等,Elasticsearch 也提供了豐富的聚合運算方式,可以滿足大部分分析和查詢場景。
Doc Values 和 Field Data
在學習聚合分析之前,我們先了解一下 Doc Values 和 Field Data 資料結構,我們知道反向索引的優勢在于查找包含某個項的文檔,反過來确定哪些項是否在某個文檔中并不高效,ES 為了滿足排序、聚合以及執行腳本的需求,是以就出現了 Doc Values 和 Field Data 兩種資料結構,一般對應的資料結構如下:
Doc Terms
-----------------------------------------------------------------
Doc_1 | brown, dog, fox, jumped, lazy, over, quick, the
Doc_2 | brown, dogs, foxes, in, lazy, leap, over, quick, summer
Doc_3 | dog, dogs, fox, jumped, over, quick, the
Doc Values
- Doc Values 在索引生成時建立,通過序列持久化資料結構存儲到磁盤,可以以作業系統的檔案緩存來代替 JVM heap
- Doc Values 不支援分詞的字元串,如果想要分詞的字元串進行聚合功能,可以看下面的 Field Data 資料結構
- 設定字段屬性 doc_values=false 可以關閉 Doc Values 功能,意味你不可以對該字段進行聚合、排序等,同時也可以節省磁盤空間
Field Data
- Field Data 建構和管理 100% 在記憶體中,常駐于 JVM 記憶體堆,會消耗大量的記憶體,開啟時需要謹慎考慮
- 早起版本所有類型字段的預設設定都是 Field Data,後面大部分類型字段都遷移到了 Doc Values,隻留下分詞字元串還使用 Field Data
- text 類型預設是 Field Data 功能的,如果需要可以通過設定屬性 fielddata=true 開啟該功能
- Field Data 是延遲加載的,也就是隻有你第一次對一個分詞字元串進行聚合、排序操作時才會加載,是以第一次加載時查詢會較慢
- indices.fielddata.cache.size:可以通過設定該選項來限制 Field Data 占用堆空間大小,預設是沒有上限的,例如可以設定為 50% 或者 12 G,如果超過該限制,就會使用 LRU 算法進行記憶體回收
- fielddata_frequency_filter:為了限制 Field Data 使用大量的記憶體,我們可以設定一些篩選條件隻有滿足該條件時才加載 Field Data
PUT my_index
{
"mappings": {
"properties": {
"tag": {
"type": "text",
"fielddata": true,
"fielddata_frequency_filter": {
"min": 0.001, //隻有那些至少在本段文檔中出現的詞頻在0.1% 和 10% 之間的文檔到記憶體中
"max": 0.1,
"min_segment_size": 500 //忽略任何文檔個數小于 500 的 segment
}
}
}
}
}
基本概念
聚合分析分類
- Metric Aggregation: 名額分析聚合,比如計算某些名額的平均值、最大值,求和
- Bucket Aggregation: 分桶聚合,類似于關系型資料庫中的 Group By 文法,根據一定規則按照次元進行劃分成不同的桶
- Pipeline: 管道分析類型,可以基于已有的聚合結果進行二次聚合運算
- Matrix: 矩陣分析類型
聚合分析格式
下面使用一個例子來說明聚合分析查詢格式:
//查詢 employees 工資的最小值
POST employees/_search
{
"size": 0, //我們一般情況下隻關心聚合分析的結果,所有原資料項的查詢 size 設定為 0
"aggs": { //聚合分析關鍵詞,也可以寫成 aggregations
"min_salary": { //自定義的聚合分析名稱,一般起有意義的名稱,用于在傳回結果中找到分析結果
"min": { // 聚合分析類型,
"field":"salary" //分析的主體,表示根據哪些字段資訊進行聚合
}
}
}
}
Metric Aggregation
Metric Aggregation 主要分為兩類:單值分析(輸出單個結果)和多值分析(輸出多個結果)。
單值分析
- 單值分析主要包括 min、max、avg、sum、cardinality,weight avg,value count
- weight avg 在計算平均數時會使用另外一個字段作為每個文檔的權重,比如 score = 99 學生有 3 個,score = 85 的學生有 5 個,求平均分數,人數就是這裡的 weight
- cardinality 類似于關系資料庫中的 distinct count
- value count 統計某字段所有有值的文檔數
- 可以同時使用多個單值分析關鍵詞傳回多個結果
//同時傳回員工中的最低薪水和最高薪水
POST employees/_search
{
"size": 0,
"aggs": {
"min_salary": {
"min": {
"field":"salary"
}
},
"max_salary": {
"max": {
"field":"salary"
}
}
}
}
多值分析
- stats:一次性傳回 min、max、avg、sum、cardinality,weight avg,value count 的所有單值結果
- extended_stats:對 stats 進行擴充,包含更多,如:方差,标準差,标準差等
- percentile:百分位數統計,比如用于統計 95% 的員工工資都小于某個值或者大于某個值
//查詢 latency 索引中 95%, 99%, 99.9% 的文檔的 load_time 都分别大于哪些值
GET latency/_search
{
"size": 0,
"aggs" : {
"load_time_outlier" : {
"percentiles" : {
"field" : "load_time" //根據 load_time 字段計算百分比
},
"percents" : [95, 99, 99.9] //設定百分比的點,預設是 [ 1, 5, 25, 50, 75, 95, 99 ]
}
}
}
- percentile rank:和 percentile 統計方向相反,比如用于統計工資小于 2 萬的員工落在哪個百分比上
//用于統計 load_time 小于 500,600 的文檔分别落在哪個百分比上
GET latency/_search
{
"size": 0,
"aggs" : {
"load_time_ranks" : {
"percentile_ranks" : {
"field" : "load_time",
"values" : [500, 600]
}
}
}
}
- top hits:一般用于分桶之後,擷取每個桶内最比對的前幾個文檔的清單,即詳情資料,使用時一般需要帶上排序資訊
//用于查詢 sales 索引中按照 type 字段進行聚合分桶,然後傳回每個分桶中按照 date 字段降序後的 top 1 的所有文檔
POST /sales/_search?size=0
{
"aggs": {
"top_tags": {
"terms": { //terms 分桶,後面會有講解
"field": "type",
"size": 3
},
"aggs": {
"top_sales_hits": {
"top_hits": {
"sort": [ //對每個桶中的文檔按照 date 字段降序,預設情況下按照查詢分數進行排序
{
"date": {
"order": "desc"
}
}
],
"_source": { //傳回每個文檔的 date 和 price 字段
"includes": [ "date", "price" ]
},
"size" : 1 //隻傳回 top 1
}
}
}
}
}
}
Bucket Aggregation
Bucket Aggregation 類似于 Group By 的概念,按照一定的規則将文檔配置設定到不同的桶中,主要分為下面幾類:
Terms
- 直接按 Terms 進行分桶,也就是按照每個詞項進行分桶
GET /_search
{
"aggs" : {
"genres" : {
"terms" : { "field" : "genre" },
"size": 5, //預設情況下傳回前 10 個聚合後的結果,根據排序字段定義的順序傳回,不支援分頁,隻支援傳回 top
"order" : { "_count" : "asc" }, //預設排序是 doc_count 降序
"shard_size": 20 //去每個分片擷取的文檔數量,請參考下文的精确度分析介紹
"min_doc_count": 2, //隻有在所有分片合并後的 doc_count 大于 min_doc_count 的分組才會被傳回,
"shared_min_doc_count": 1 // 隻有每個分片上的 doc_count 大于 shared_min_doc_count,該分片才會被傳回,一般小于 min_doc_count
}
}
}
輸出結果 =>
{
...
"aggregations" : {
"genres" : {
"doc_count_error_upper_bound": 0, //被遺漏的 term 分桶包含的文檔的最大可能值,看下文聚合分析精确度分析
"sum_other_doc_count": 0, //除了傳回 bucket 的 terms 以外,其它 terms 的文檔總數
"buckets" : [
{
"key" : "electronic", //每個聚合詞項
"doc_count" : 6 //該詞項下面對應的文檔個數
},
{
"key" : "rock",
"doc_count" : 3
},
{
"key" : "jazz",
"doc_count" : 2
}
]
}
}
}
- text 類型字段預設不支援分桶,隻能通過 ${field}.keyword 去分桶,這個時候會把對整個字元串進行比對
# 對 job 字段按照整個字元串進行聚合
POST employees/_search
{
"size": 0,
"aggs": {
"jobs": {
"terms": {
"field":"job.keyword"
}
}
}
}
- Terms 分桶的本質是根據詞項進行分桶,也就是字段必須具有 Field Data 或者 Doc Values 屬性。如果想讓 text 類型字段支援 Terms 分桶,需要設定 fielddata = true,此時會按照 text 類型分詞的結果去分桶
# 對 Text 字段打開 fielddata,支援 terms aggregation
PUT employees/_mapping
{
"properties" : {
"job":{
"type": "text",
"fielddata": true
}
}
}
POST employees/_search
{
"size": 0,
"aggs": {
"jobs": {
"terms": {
"field":"job"
}
}
}
}
- order:排序是為了對聚合後的桶進行排序,可以通過 order 參數指定
- 三種排序方式:_key, _count, sub-aggregation
- 在多分片的情況下,排序有可能不準确(參考後面聚合精确度分析)
- 排序預設是按照每個分桶的 doc_count 降序
- 可以按照桶名進行排序:
GET /_search
{
"aggs" : {
"genres" : {
"terms" : {
"field" : "genre",
"order" : { "_key" : "asc" }
}
}
}
}
- 可以按照 sub-aggregation 進行排序,支援多層聚合嵌套排序,通過”>“指明path
GET /_search
{
"aggs" : {
"countries" : {
"terms" : {
"field" : "artist.country",
// ">" 表示路徑指向,"." 有多值聚合結果時,擷取其中一個值
"order" : [ { "rock>playback_stats.avg" : "desc" }, { "_count" : "desc" } ]
},
"aggs" : {
"rock" : {
"filter" : { "term" : { "genre" : "rock" }},
"aggs" : {
"playback_stats" : { "stats" : { "field" : "play_count" }}
}
}
}
}
}
}
- Script: 通過腳函數本進行聚合計算
GET /_search
{
"aggs" : {
"genres" : {
"terms" : {
"script" : {
"source": "doc['genre'].value",
"lang": "painless"
}
}
}
}
}
- Filtering Values:可以通過屬性 include,exclude 對聚合的分桶名稱進行過濾
GET /_search
{
"aggs" : {
"tags" : {
"terms" : {
"field" : "tags",
"include" : ".*sport.*", //也可以精确數組比對 ["rover", "jensen"]
"exclude" : "water_.*"
}
}
}
}
- Missing value:在分組名稱是 NULL 的情況下,我們可以使用 missing 字段進行聚合處理
//預設情況下如果某個文檔對應的 tags 為null,是不會被分組的,
//加上 missing 字段後,所有 tags 為 null 文檔的被分成一個組,組名為 "N/A"
GET /_search
{
"aggs" : {
"tags" : {
"terms" : {
"field" : "tags",
"missing": "N/A"
}
}
}
}
- Filtering Values with partitions: 某些情況下如果在一個請求中傳回太多的分組可能會影響性能,我們可以使用 Filtering Values with partitions 拆分成多個 partitions,然後一個一個傳回,具體邏輯可以看官方文檔
- Collect mode: Elasticsearch 提供了兩種計算結果集的周遊方式,breadth_first 和 depth_first,通過參數 collect_mode 指定
- breadth_first 模式是優先進行廣度周遊計算,計算完上層的聚合結果後,再進行每個桶的聚合結果計算
- depth_first 模式是優先進行深度周遊計算,每個分支進行一次深度周遊計算,然後再進行剪切
- 如果某個字段的 cardinality 大小比請求的 size 大或者這個字段的 cardinality 是未知的,那麼預設是 breadth_first,其它預設是 depth_first
- 可以通過參數 collect_mode = breadth_first 設定可以将子聚合計算延遲到上層父級被剪切之後再計算
- 如果 order 字段中使用到了 sub aggregation,那麼被使用到的 sub aggregation 會優先被計算不管是在那種模式下
- 聚合樹的所有分支都在一次深度周遊的過程中進行計算,然後再進行剪切,某些情況下會浪費記憶體和 CPU
GET /_search
{
"aggs" : {
"actors" : {
"terms" : {
"field" : "actors",
"size" : 10,
"collect_mode" : "breadth_first"
},
"aggs" : {
"costars" : {
"terms" : {
"field" : "actors",
"size" : 5
}
}
}
}
}
}
- Execution hint:提供了兩種聚合計算的方式,map 和 global_ordinals
- global_ordinals 模式,對于海量的資料聚合計算,ES 使用一種 global ordinals 的資料結構來進行 bucket 配置設定,通過有序的數值來映射每一個 term 字元串實作記憶體消耗的優化
- map 模式:直接将查詢結果拿到記憶體裡通過 map 來計算,在查詢資料集很小的情況下使用 map,會加快計算的速度
- 預設情況下隻有使用腳本計算聚合的時候才使用 map 模式來計算
- 即使你設定了 map,ES 也不一定能保證一定使用 map 去做計算,一般情況下不需要關心 Execution hint 設定,ES 會根據場景選擇最佳的計算方式
GET /_search
{
"aggs" : {
"tags" : {
"terms" : {
"field" : "tags",
"execution_hint": "map"
}
}
}
}
Range /Date Range
通過指定數字類型進行分桶:
# Salary Ranges 分桶,可以自己定義 key
POST employees/_search
{
"size": 0,
"aggs": {
"salary_range": {
"range": {
"field":"salary",
"ranges":[
{ "to":10000},
{"from":10000, "to":20000},
{
"key":">20000", # 不指定 key,會自動生成
"from":20000
}
]
}
}
}
}
Date Range
通過指定日期類型的範圍進行分桶
POST /sales/_search?size=0
{
"aggs": {
"range": {
"date_range": {
"field": "date",
"format": "MM-yyyy",
"ranges": [
{ "to": "now-10M/M" },
{ "from": "now-10M/M" }
]
}
}
}
}
Histogram
直方圖,按固定數值間隔政策進行資料分桶
# Salary Histogram 工資0到10萬,以 5000一個區間進行分桶
POST employees/_search
{
"size": 0,
"aggs": {
"salary_histrogram": {
"histogram": {
"field":"salary",
"interval":5000,
"extended_bounds":{
"min":0,
"max":100000
}
}
}
}
}
Date Histogram
Date Histogram: 日期直方圖,按固定時間間隔進行資料分割
# Salary Histogram 工資0到10萬,以 5000一個區間進行分桶
POST /sales/_search?size=0
{
"aggs" : {
"sales_over_time" : {
"date_histogram" : {
"field" : "date",
"calendar_interval" : "month"
}
}
}
}
嵌套聚合分析
- 聚合查詢支援嵌套,可以在每個桶裡再次進行聚合
- 子聚合可以是 Bucket 也可以是 Metric
# 先按照工種進行聚合,然後再求出每個工種中年紀最大的3個員工的具體資訊
POST employees/_search
{
"size": 0,
"aggs": {
"jobs": {
"terms": {
"field":"job.keyword"
},
"aggs":{
"old_employee":{
"top_hits":{
"size":3,
"sort":[
{
"age":{
"order":"desc"
}
}
]
}
}
}
}
}
}
Pipeline Aggregation
- Pipeline Aggregation 是對聚合分析的結果進行再次分析聚合,主要分為 Parent 和 Sibling 兩類
- Pipeline Aggregation 是通過使用 buckets_path 參數引用所需度量的路徑來進行計算
- Parent Pipeline Aggregation 是将聚合結果内嵌到現有的分析結果中,主要包括:Derivate、Moving Average、Cumulative Sum
POST /_search
{
"aggs": {
"my_date_histo":{
"date_histogram":{
"field":"timestamp",
"calendar_interval":"day"
},
"aggs":{
"the_sum":{
"sum":{ "field": "lemmings" }
},
"the_movavg":{
//the_sum 的移動平均值計算結果内嵌到每一個 my_date_histo 的桶中
"moving_avg":{ "buckets_path": "the_sum" }
}
}
}
}
}
- Sibling Pipeline Aggregation 是聚合結果與現有的聚合分析結果同級,主要包括 Max/Min/Sum/Avg Bucket、Stats/Extended Stats Bucket、Percentiles Bucket
POST /_search
{
"aggs" : {
"sales_per_month" : {
"date_histogram" : {
"field" : "date",
"calendar_interval" : "month"
},
"aggs": {
"sales": {
"sum": {
"field": "price"
}
}
}
},
"max_monthly_sales": {
//找出 sales_per_month 分桶中找到 sales 最大的分桶
"max_bucket": {
"buckets_path": "sales_per_month>sales"
}
}
}
}
- Pipeline Aggregation 下面不能再使用 sub-aggregations,但是可以在 buckets_path 中引用另外一個 Pipeline Aggregation,進而形成鍊式計算
- Pipeline Aggregation 不會改變原先聚合的結果,隻是在已有的聚合輸出中新增新的聚合結果,是以最後輸出結果會包括鍊上所有的 Pipeline Aggregation 結果
聚合的作用範圍
ES 聚合分析的預設作用範圍是 query 的查詢結果集,同時 ES 還可以支援以下方式改變聚合的作用範圍
- filter Aggregation:不改變整體 query 語句的情況下,隻修改部分需要聚合的查詢範圍
POST employees/_search
{
"size": 0,
"aggs": {
"older_person": {
//隻修改 older_person 的聚合範圍,而不會影響到 all_jobs 的聚合範圍
"filter":{
"range":{
"age":{ "from":35}
}
},
"aggs":{
"jobs":{
"terms": {"field":"job.keyword"}
}
}
},
"all_jobs": {
"terms": {"field":"job.keyword"}
}
}
}
- Post Filter:用于文檔過濾,在聚合分析計算好之後進行過濾結果
POST employees/_search
{
"aggs": {
"jobs": {
"terms": {
"field": "job.keyword"
}
}
},
"post_filter": {
"match": {
"job.keyword": "Dev Manager"
}
}
}
- Global Filter:忽略 query 條件,基于所有文檔進行分析
#global
POST employees/_search
{
"size": 0,
"query": {
"range": {
"age": {
"gte": 40
}
}
},
"aggs": {
"jobs": {
"terms": {
"field":"job.keyword"
}
},
"all":{
"global":{}, //會忽略上面query的限制,全局資料的聚合
"aggs":{
"salary_avg":{
"avg":{
"field":"salary"
}
}
}
}
}
}
聚合分析計算結果的精确度問題
讨論聚合分析計算的精确度問題前,我們先了解下 ES 是如何進行聚合分析計算的,我們前面的文章 Elasticsearch 分布式原理以及相關讀寫邏輯 中,我們知道 ES 是分布式存儲的,每個索引中的文檔會存儲在不同的分片上,是以在進行聚合計算時,因為資料量和記憶體的限制,ES 不會把所有文檔資料都拿到記憶體裡然後進行聚合,而是 會去每個分片上擷取聚合計算的結果,然後再在 coordinate Node 上進行彙總聚合,這樣必然會引起結果不準确性,比如每個分片上”求和銷售額“ 的前10個最大值都可能不一樣,最好導緻彙總時結果的不精确性。那麼我們看下關于結果的不精确性,ES 都提供哪些配置和說明:
doc_count_error_upper_bound
該值是傳回聚合 bucket 中被遺漏的 term 可能的最大值,因為計算的不精确性,有些 term 不是我們想要的。
sum_other_doc_count
除了傳回結果的 bucket 的 term 以外,其它沒有被傳回的 term 的文檔總數
show_term_doc_count_error
- 設定可以每次從每個分片(shard)上擷取 bucket 數量
- 我們可以利用 shard_size 從每個分片上多擷取一些資料進而提高計算的精确度
- shard_size 的預設值是 size * 1.5 + 10
- shard_size 不能小于 size,如果設定小于 size, ES 會自動重置成 size 大小
GET my_flights/_search
{
"size": 0,
"aggs": {
"weather": {
"terms": {
"field":"OriginWeather",
"size":1,
"shard_size":1, 預設值是 size * 1.5 + 10
"show_term_doc_count_error":true
}
}
}
}
參考文獻
- ES系列八、正排索Doc Values和Field Data
- 【ElasticStack】ElasticSearch聚合分析與資料模組化
- elasticsearch系列六:聚合分析(聚合分析簡介、名額聚合、桶聚合)
- Elasticsearch聚合優化 | 聚合速度提升5倍