天天看點

Elasticsearch 聚合分析的細節

聚合分析運算是資料庫中重要的特性,對于資料分析場景尤為重要。類似于關系型資料庫中的 SUM,AVG, GROUP BY 等,Elasticsearch 也提供了豐富的聚合運算方式,可以滿足大部分分析和查詢場景。

Doc Values 和 Field Data

在學習聚合分析之前,我們先了解一下 Doc Values 和 Field Data 資料結構,我們知道反向索引的優勢在于查找包含某個項的文檔,反過來确定哪些項是否在某個文檔中并不高效,ES 為了滿足排序、聚合以及執行腳本的需求,是以就出現了 Doc Values 和 Field Data 兩種資料結構,一般對應的資料結構如下:

Doc      Terms
-----------------------------------------------------------------
Doc_1 | brown, dog, fox, jumped, lazy, over, quick, the
Doc_2 | brown, dogs, foxes, in, lazy, leap, over, quick, summer
Doc_3 | dog, dogs, fox, jumped, over, quick, the      

Doc Values

  • Doc Values 在索引生成時建立,通過序列持久化資料結構存儲到磁盤,可以以作業系統的檔案緩存來代替 JVM heap
  • Doc Values 不支援分詞的字元串,如果想要分詞的字元串進行聚合功能,可以看下面的 Field Data 資料結構
  • 設定字段屬性 doc_values=false 可以關閉 Doc Values 功能,意味你不可以對該字段進行聚合、排序等,同時也可以節省磁盤空間

Field Data

  • Field Data 建構和管理 100% 在記憶體中,常駐于 JVM 記憶體堆,會消耗大量的記憶體,開啟時需要謹慎考慮
  • 早起版本所有類型字段的預設設定都是 Field Data,後面大部分類型字段都遷移到了 Doc Values,隻留下分詞字元串還使用 Field Data
  • text 類型預設是 Field Data 功能的,如果需要可以通過設定屬性 fielddata=true 開啟該功能
  • Field Data 是延遲加載的,也就是隻有你第一次對一個分詞字元串進行聚合、排序操作時才會加載,是以第一次加載時查詢會較慢
  • indices.fielddata.cache.size:可以通過設定該選項來限制 Field Data 占用堆空間大小,預設是沒有上限的,例如可以設定為 50% 或者 12 G,如果超過該限制,就會使用 LRU 算法進行記憶體回收
  • fielddata_frequency_filter:為了限制 Field Data 使用大量的記憶體,我們可以設定一些篩選條件隻有滿足該條件時才加載 Field Data
PUT my_index
{
  "mappings": {
    "properties": {
      "tag": {
        "type": "text",
        "fielddata": true,
        "fielddata_frequency_filter": {
          "min": 0.001,   //隻有那些至少在本段文檔中出現的詞頻在0.1% 和 10% 之間的文檔到記憶體中
          "max": 0.1,
          "min_segment_size": 500  //忽略任何文檔個數小于 500 的 segment
        }
      }
    }
  }
}      

基本概念

聚合分析分類

  • Metric Aggregation: 名額分析聚合,比如計算某些名額的平均值、最大值,求和
  • Bucket Aggregation: 分桶聚合,類似于關系型資料庫中的 Group By 文法,根據一定規則按照次元進行劃分成不同的桶
  • Pipeline: 管道分析類型,可以基于已有的聚合結果進行二次聚合運算
  • Matrix: 矩陣分析類型

聚合分析格式

下面使用一個例子來說明聚合分析查詢格式:

//查詢 employees 工資的最小值
POST employees/_search
{
  "size": 0,  //我們一般情況下隻關心聚合分析的結果,所有原資料項的查詢 size 設定為 0
  "aggs": {   //聚合分析關鍵詞,也可以寫成 aggregations
    "min_salary": { //自定義的聚合分析名稱,一般起有意義的名稱,用于在傳回結果中找到分析結果 
      "min": {      // 聚合分析類型,
        "field":"salary"   //分析的主體,表示根據哪些字段資訊進行聚合
      }
    }
  }
}      

Metric Aggregation

Metric Aggregation 主要分為兩類:單值分析(輸出單個結果)和多值分析(輸出多個結果)。

單值分析

  • 單值分析主要包括 min、max、avg、sum、cardinality,weight avg,value count
  • weight avg 在計算平均數時會使用另外一個字段作為每個文檔的權重,比如 score = 99 學生有 3 個,score = 85 的學生有 5 個,求平均分數,人數就是這裡的 weight
  • cardinality 類似于關系資料庫中的 distinct count
  • value count 統計某字段所有有值的文檔數
  • 可以同時使用多個單值分析關鍵詞傳回多個結果
//同時傳回員工中的最低薪水和最高薪水
POST employees/_search
{
  "size": 0,  
  "aggs": {  
    "min_salary": { 
      "min": {     
        "field":"salary"  
      }
    },
    "max_salary": {
      "max": {    
        "field":"salary"  
      }
    }
  }
}      

多值分析

  • stats:一次性傳回 min、max、avg、sum、cardinality,weight avg,value count 的所有單值結果
  • extended_stats:對 stats 進行擴充,包含更多,如:方差,标準差,标準差等
  • percentile:百分位數統計,比如用于統計 95% 的員工工資都小于某個值或者大于某個值
//查詢 latency 索引中 95%, 99%, 99.9% 的文檔的 load_time 都分别大于哪些值
GET latency/_search
{
    "size": 0,
    "aggs" : {
        "load_time_outlier" : {
            "percentiles" : {
                "field" : "load_time"  //根據 load_time 字段計算百分比
            },
            "percents" : [95, 99, 99.9] //設定百分比的點,預設是 [ 1, 5, 25, 50, 75, 95, 99 ]
        }
    }
}      
  • percentile rank:和 percentile 統計方向相反,比如用于統計工資小于 2 萬的員工落在哪個百分比上
//用于統計 load_time 小于 500,600 的文檔分别落在哪個百分比上
GET latency/_search
{
    "size": 0,
    "aggs" : {
        "load_time_ranks" : {
            "percentile_ranks" : {
                "field" : "load_time", 
                "values" : [500, 600]
            }
        }
    }
}      
  • top hits:一般用于分桶之後,擷取每個桶内最比對的前幾個文檔的清單,即詳情資料,使用時一般需要帶上排序資訊
//用于查詢 sales 索引中按照 type 字段進行聚合分桶,然後傳回每個分桶中按照 date 字段降序後的 top 1 的所有文檔
POST /sales/_search?size=0
{
    "aggs": {
        "top_tags": {
            "terms": {  //terms 分桶,後面會有講解
                "field": "type",
                "size": 3
            },
            "aggs": {
                "top_sales_hits": {
                    "top_hits": {
                        "sort": [ //對每個桶中的文檔按照 date 字段降序,預設情況下按照查詢分數進行排序
                            {
                                "date": {
                                    "order": "desc"
                                }
                            }
                        ],
                        "_source": { //傳回每個文檔的 date 和 price 字段
                            "includes": [ "date", "price" ]
                        },
                        "size" : 1 //隻傳回 top 1
                    }
                }
            }
        }
    }
}      

Bucket Aggregation

Bucket Aggregation 類似于 Group By 的概念,按照一定的規則将文檔配置設定到不同的桶中,主要分為下面幾類:

Terms

  • 直接按 Terms 進行分桶,也就是按照每個詞項進行分桶
GET /_search
{
    "aggs" : {
        "genres" : {
            "terms" : { "field" : "genre" },
            "size": 5, //預設情況下傳回前 10 個聚合後的結果,根據排序字段定義的順序傳回,不支援分頁,隻支援傳回 top
            "order" : { "_count" : "asc" }, //預設排序是 doc_count 降序
            "shard_size": 20            //去每個分片擷取的文檔數量,請參考下文的精确度分析介紹
            "min_doc_count": 2,         //隻有在所有分片合并後的 doc_count 大于 min_doc_count 的分組才會被傳回,
            "shared_min_doc_count": 1  // 隻有每個分片上的 doc_count 大于 shared_min_doc_count,該分片才會被傳回,一般小于 min_doc_count
        }
    }
}

輸出結果 =>
{
    ...
    "aggregations" : {
        "genres" : {
            "doc_count_error_upper_bound": 0,   //被遺漏的 term 分桶包含的文檔的最大可能值,看下文聚合分析精确度分析
            "sum_other_doc_count": 0, //除了傳回 bucket 的 terms 以外,其它 terms 的文檔總數
            "buckets" : [ 
                {
                    "key" : "electronic", //每個聚合詞項
                    "doc_count" : 6       //該詞項下面對應的文檔個數
                },
                {
                    "key" : "rock",
                    "doc_count" : 3
                },
                {
                    "key" : "jazz",
                    "doc_count" : 2
                }
            ]
        }
    }
}      
  • text 類型字段預設不支援分桶,隻能通過 ${field}.keyword 去分桶,這個時候會把對整個字元串進行比對
# 對 job 字段按照整個字元串進行聚合
POST employees/_search
{
  "size": 0,
  "aggs": {
    "jobs": {
      "terms": {
        "field":"job.keyword"
      }
    }
  }
}      
  • Terms 分桶的本質是根據詞項進行分桶,也就是字段必須具有 Field Data 或者 Doc Values 屬性。如果想讓 text 類型字段支援 Terms 分桶,需要設定 fielddata = true,此時會按照 text 類型分詞的結果去分桶
# 對 Text 字段打開 fielddata,支援 terms aggregation
PUT employees/_mapping
{
  "properties" : {
    "job":{
       "type":     "text",
       "fielddata": true
    }
  }
}

POST employees/_search
{
  "size": 0,
  "aggs": {
    "jobs": {
      "terms": {
        "field":"job"
      }
    }
  }
}      
  • order:排序是為了對聚合後的桶進行排序,可以通過 order 參數指定
- 三種排序方式:_key, _count, sub-aggregation
- 在多分片的情況下,排序有可能不準确(參考後面聚合精确度分析)
- 排序預設是按照每個分桶的 doc_count 降序
- 可以按照桶名進行排序:
GET /_search
{
    "aggs" : {
        "genres" : {
            "terms" : {
                "field" : "genre",
                "order" : { "_key" : "asc" }
            }
        }
    }
}
- 可以按照 sub-aggregation 進行排序,支援多層聚合嵌套排序,通過”>“指明path
GET /_search
{
    "aggs" : {
        "countries" : {
            "terms" : {
                "field" : "artist.country",
                // ">" 表示路徑指向,"." 有多值聚合結果時,擷取其中一個值
                "order" : [ { "rock>playback_stats.avg" : "desc" }, { "_count" : "desc" } ]
            },
            "aggs" : {
                "rock" : {
                    "filter" : { "term" : { "genre" : "rock" }},
                    "aggs" : {
                        "playback_stats" : { "stats" : { "field" : "play_count" }}
                    }
                }
            }
        }
    }
}      
  • Script: 通過腳函數本進行聚合計算
GET /_search
{
    "aggs" : {
        "genres" : {
            "terms" : {
                "script" : {
                    "source": "doc['genre'].value",
                    "lang": "painless"
                }
            }
        }
    }
}      
  • Filtering Values:可以通過屬性 include,exclude 對聚合的分桶名稱進行過濾
GET /_search
{
    "aggs" : {
        "tags" : {
            "terms" : {
                "field" : "tags",
                "include" : ".*sport.*", //也可以精确數組比對 ["rover", "jensen"]
                "exclude" : "water_.*"
            }
        }
    }
}      
  • Missing value:在分組名稱是 NULL 的情況下,我們可以使用 missing 字段進行聚合處理
//預設情況下如果某個文檔對應的 tags 為null,是不會被分組的,
//加上 missing 字段後,所有 tags 為 null 文檔的被分成一個組,組名為 "N/A"
GET /_search
{
    "aggs" : {
        "tags" : {
             "terms" : {
                 "field" : "tags",
                 "missing": "N/A" 
             }
         }
    }
}      
  • Filtering Values with partitions: 某些情況下如果在一個請求中傳回太多的分組可能會影響性能,我們可以使用 Filtering Values with partitions 拆分成多個 partitions,然後一個一個傳回,具體邏輯可以看​​官方文檔​​
  • Collect mode: Elasticsearch 提供了兩種計算結果集的周遊方式,breadth_first 和 depth_first,通過參數 collect_mode 指定
- breadth_first 模式是優先進行廣度周遊計算,計算完上層的聚合結果後,再進行每個桶的聚合結果計算
- depth_first 模式是優先進行深度周遊計算,每個分支進行一次深度周遊計算,然後再進行剪切
- 如果某個字段的 cardinality 大小比請求的 size 大或者這個字段的 cardinality 是未知的,那麼預設是 breadth_first,其它預設是 depth_first
- 可以通過參數 collect_mode = breadth_first 設定可以将子聚合計算延遲到上層父級被剪切之後再計算
- 如果 order 字段中使用到了 sub aggregation,那麼被使用到的 sub aggregation 會優先被計算不管是在那種模式下
- 聚合樹的所有分支都在一次深度周遊的過程中進行計算,然後再進行剪切,某些情況下會浪費記憶體和 CPU
GET /_search
{
    "aggs" : {
        "actors" : {
             "terms" : {
                 "field" : "actors",
                 "size" : 10,
                 "collect_mode" : "breadth_first" 
             },
            "aggs" : {
                "costars" : {
                     "terms" : {
                         "field" : "actors",
                         "size" : 5
                     }
                 }
            }
         }
    }
}      
  • Execution hint:提供了兩種聚合計算的方式,map 和 global_ordinals
- global_ordinals 模式,對于海量的資料聚合計算,ES 使用一種 global ordinals 的資料結構來進行 bucket 配置設定,通過有序的數值來映射每一個 term 字元串實作記憶體消耗的優化
- map 模式:直接将查詢結果拿到記憶體裡通過 map 來計算,在查詢資料集很小的情況下使用 map,會加快計算的速度
- 預設情況下隻有使用腳本計算聚合的時候才使用 map 模式來計算
- 即使你設定了 map,ES 也不一定能保證一定使用 map 去做計算,一般情況下不需要關心 Execution hint 設定,ES 會根據場景選擇最佳的計算方式
GET /_search
{
    "aggs" : {
        "tags" : {
             "terms" : {
                 "field" : "tags",
                 "execution_hint": "map" 
             }
         }
    }
}      

Range /Date Range

通過指定數字類型進行分桶:

# Salary Ranges 分桶,可以自己定義 key
POST employees/_search
{
  "size": 0,
  "aggs": {
    "salary_range": {
      "range": {
        "field":"salary",
        "ranges":[
          {  "to":10000},
          {"from":10000, "to":20000},
          {
            "key":">20000",  # 不指定 key,會自動生成
            "from":20000
          }
        ]
      }
    }
  }
}      

Date Range

通過指定日期類型的範圍進行分桶

POST /sales/_search?size=0
{
    "aggs": {
        "range": {
            "date_range": {
                "field": "date",
                "format": "MM-yyyy",
                "ranges": [
                    { "to": "now-10M/M" }, 
                    { "from": "now-10M/M" } 
                ]
            }
        }
    }
}      

Histogram

直方圖,按固定數值間隔政策進行資料分桶

# Salary Histogram 工資0到10萬,以 5000一個區間進行分桶
POST employees/_search
{
  "size": 0,
  "aggs": {
    "salary_histrogram": {
      "histogram": {
        "field":"salary",
        "interval":5000,
        "extended_bounds":{
          "min":0,
          "max":100000
        }
      }
    }
  }
}      

Date Histogram

Date Histogram: 日期直方圖,按固定時間間隔進行資料分割

# Salary Histogram 工資0到10萬,以 5000一個區間進行分桶
POST /sales/_search?size=0
{
    "aggs" : {
        "sales_over_time" : {
            "date_histogram" : {
                "field" : "date",
                "calendar_interval" : "month"
            }
        }
    }
}      

嵌套聚合分析

  • 聚合查詢支援嵌套,可以在每個桶裡再次進行聚合
  • 子聚合可以是 Bucket 也可以是 Metric
# 先按照工種進行聚合,然後再求出每個工種中年紀最大的3個員工的具體資訊
POST employees/_search
{
  "size": 0,
  "aggs": {
    "jobs": {
      "terms": {
        "field":"job.keyword"
      },
      "aggs":{
        "old_employee":{
          "top_hits":{
            "size":3,
            "sort":[
              {
                "age":{
                  "order":"desc"
                }
              }
            ]
          }
        }
      }
    }
  }
}      

Pipeline Aggregation

  • Pipeline Aggregation 是對聚合分析的結果進行再次分析聚合,主要分為 Parent 和 Sibling 兩類
  • Pipeline Aggregation 是通過使用 buckets_path 參數引用所需度量的路徑來進行計算
  • Parent Pipeline Aggregation 是将聚合結果内嵌到現有的分析結果中,主要包括:Derivate、Moving Average、Cumulative Sum
POST /_search
{
    "aggs": {
        "my_date_histo":{
            "date_histogram":{
                "field":"timestamp",
                "calendar_interval":"day"
            },
            "aggs":{
                "the_sum":{
                    "sum":{ "field": "lemmings" } 
                },
                "the_movavg":{
                    //the_sum 的移動平均值計算結果内嵌到每一個 my_date_histo 的桶中
                    "moving_avg":{ "buckets_path": "the_sum" } 
                }
            }
        }
    }
}      
  • Sibling Pipeline Aggregation 是聚合結果與現有的聚合分析結果同級,主要包括 Max/Min/Sum/Avg Bucket、Stats/Extended Stats Bucket、Percentiles Bucket
POST /_search
{
    "aggs" : {
        "sales_per_month" : {
            "date_histogram" : {
                "field" : "date",
                "calendar_interval" : "month"
            },
            "aggs": {
                "sales": {
                    "sum": {
                        "field": "price"
                    }
                }
            }
        },
        "max_monthly_sales": {
            //找出 sales_per_month 分桶中找到 sales 最大的分桶
            "max_bucket": {
                "buckets_path": "sales_per_month>sales" 
            }
        }
    }
}      
  • Pipeline Aggregation 下面不能再使用 sub-aggregations,但是可以在 buckets_path 中引用另外一個 Pipeline Aggregation,進而形成鍊式計算
  • Pipeline Aggregation 不會改變原先聚合的結果,隻是在已有的聚合輸出中新增新的聚合結果,是以最後輸出結果會包括鍊上所有的 Pipeline Aggregation 結果

聚合的作用範圍

ES 聚合分析的預設作用範圍是 query 的查詢結果集,同時 ES 還可以支援以下方式改變聚合的作用範圍

  • filter Aggregation:不改變整體 query 語句的情況下,隻修改部分需要聚合的查詢範圍
POST employees/_search
{
  "size": 0,
  "aggs": {
    "older_person": {
      //隻修改 older_person 的聚合範圍,而不會影響到 all_jobs 的聚合範圍
      "filter":{
        "range":{
          "age":{ "from":35}
        }
      },
      "aggs":{
         "jobs":{
           "terms": {"field":"job.keyword"}
         }
      }
    },
    "all_jobs": {
      "terms": {"field":"job.keyword"}
    }
  }
}      
  • Post Filter:用于文檔過濾,在聚合分析計算好之後進行過濾結果
POST employees/_search
{
  "aggs": {
    "jobs": {
      "terms": {
        "field": "job.keyword"
      }
    }
  },
  "post_filter": {
    "match": {
      "job.keyword": "Dev Manager"
    }
  }
}      
  • Global Filter:忽略 query 條件,基于所有文檔進行分析
#global
POST employees/_search
{
  "size": 0,
  "query": {
    "range": {
      "age": {
        "gte": 40
      }
    }
  },
  "aggs": {
    "jobs": {
      "terms": {
        "field":"job.keyword"

      }
    },
    "all":{ 
      "global":{}, //會忽略上面query的限制,全局資料的聚合
      "aggs":{
        "salary_avg":{
          "avg":{
            "field":"salary"
          }
        }
      }
    }
  }
}      

聚合分析計算結果的精确度問題

讨論聚合分析計算的精确度問題前,我們先了解下 ES 是如何進行聚合分析計算的,我們前面的文章 ​​Elasticsearch 分布式原理以及相關讀寫邏輯​​ 中,我們知道 ES 是分布式存儲的,每個索引中的文檔會存儲在不同的分片上,是以在進行聚合計算時,因為資料量和記憶體的限制,ES 不會把所有文檔資料都拿到記憶體裡然後進行聚合,而是 會去每個分片上擷取聚合計算的結果,然後再在 coordinate Node 上進行彙總聚合,這樣必然會引起結果不準确性,比如每個分片上”求和銷售額“ 的前10個最大值都可能不一樣,最好導緻彙總時結果的不精确性。那麼我們看下關于結果的不精确性,ES 都提供哪些配置和說明:

doc_count_error_upper_bound

該值是傳回聚合 bucket 中被遺漏的 term 可能的最大值,因為計算的不精确性,有些 term 不是我們想要的。

sum_other_doc_count

除了傳回結果的 bucket 的 term 以外,其它沒有被傳回的 term 的文檔總數

show_term_doc_count_error

  • 設定可以每次從每個分片(shard)上擷取 bucket 數量
  • 我們可以利用 shard_size 從每個分片上多擷取一些資料進而提高計算的精确度
  • shard_size 的預設值是 size * 1.5 + 10
  • shard_size 不能小于 size,如果設定小于 size, ES 會自動重置成 size 大小
GET my_flights/_search
{
  "size": 0,
  "aggs": {
    "weather": {
      "terms": {
        "field":"OriginWeather",
        "size":1,
        "shard_size":1,  預設值是 size * 1.5 + 10
        "show_term_doc_count_error":true
      }
    }
  }
}      

參考文獻

  • ES系列八、正排索Doc Values和Field Data
  • ​​【ElasticStack】ElasticSearch聚合分析與資料模組化​​
  • elasticsearch系列六:聚合分析(聚合分析簡介、名額聚合、桶聚合)
  • ​​Elasticsearch聚合優化 | 聚合速度提升5倍​​

繼續閱讀