天天看点

Elasticsearch 聚合分析的细节

聚合分析运算是数据库中重要的特性,对于数据分析场景尤为重要。类似于关系型数据库中的 SUM,AVG, GROUP BY 等,Elasticsearch 也提供了丰富的聚合运算方式,可以满足大部分分析和查询场景。

Doc Values 和 Field Data

在学习聚合分析之前,我们先了解一下 Doc Values 和 Field Data 数据结构,我们知道倒排索引的优势在于查找包含某个项的文档,反过来确定哪些项是否在某个文档中并不高效,ES 为了满足排序、聚合以及执行脚本的需求,因此就出现了 Doc Values 和 Field Data 两种数据结构,一般对应的数据结构如下:

Doc      Terms
-----------------------------------------------------------------
Doc_1 | brown, dog, fox, jumped, lazy, over, quick, the
Doc_2 | brown, dogs, foxes, in, lazy, leap, over, quick, summer
Doc_3 | dog, dogs, fox, jumped, over, quick, the      

Doc Values

  • Doc Values 在索引生成时创建,通过序列持久化数据结构存储到磁盘,可以以操作系统的文件缓存来代替 JVM heap
  • Doc Values 不支持分词的字符串,如果想要分词的字符串进行聚合功能,可以看下面的 Field Data 数据结构
  • 设置字段属性 doc_values=false 可以关闭 Doc Values 功能,意味你不可以对该字段进行聚合、排序等,同时也可以节省磁盘空间

Field Data

  • Field Data 构建和管理 100% 在内存中,常驻于 JVM 内存堆,会消耗大量的内存,开启时需要谨慎考虑
  • 早起版本所有类型字段的默认设置都是 Field Data,后面大部分类型字段都迁移到了 Doc Values,只留下分词字符串还使用 Field Data
  • text 类型默认是 Field Data 功能的,如果需要可以通过设置属性 fielddata=true 开启该功能
  • Field Data 是延迟加载的,也就是只有你第一次对一个分词字符串进行聚合、排序操作时才会加载,所以第一次加载时查询会较慢
  • indices.fielddata.cache.size:可以通过设置该选项来限制 Field Data 占用堆空间大小,默认是没有上限的,例如可以设置为 50% 或者 12 G,如果超过该限制,就会使用 LRU 算法进行内存回收
  • fielddata_frequency_filter:为了限制 Field Data 使用大量的内存,我们可以设置一些筛选条件只有满足该条件时才加载 Field Data
PUT my_index
{
  "mappings": {
    "properties": {
      "tag": {
        "type": "text",
        "fielddata": true,
        "fielddata_frequency_filter": {
          "min": 0.001,   //只有那些至少在本段文档中出现的词频在0.1% 和 10% 之间的文档到内存中
          "max": 0.1,
          "min_segment_size": 500  //忽略任何文档个数小于 500 的 segment
        }
      }
    }
  }
}      

基本概念

聚合分析分类

  • Metric Aggregation: 指标分析聚合,比如计算某些指标的平均值、最大值,求和
  • Bucket Aggregation: 分桶聚合,类似于关系型数据库中的 Group By 语法,根据一定规则按照维度进行划分成不同的桶
  • Pipeline: 管道分析类型,可以基于已有的聚合结果进行二次聚合运算
  • Matrix: 矩阵分析类型

聚合分析格式

下面使用一个例子来说明聚合分析查询格式:

//查询 employees 工资的最小值
POST employees/_search
{
  "size": 0,  //我们一般情况下只关心聚合分析的结果,所有原数据项的查询 size 设置为 0
  "aggs": {   //聚合分析关键词,也可以写成 aggregations
    "min_salary": { //自定义的聚合分析名称,一般起有意义的名称,用于在返回结果中找到分析结果 
      "min": {      // 聚合分析类型,
        "field":"salary"   //分析的主体,表示根据哪些字段信息进行聚合
      }
    }
  }
}      

Metric Aggregation

Metric Aggregation 主要分为两类:单值分析(输出单个结果)和多值分析(输出多个结果)。

单值分析

  • 单值分析主要包括 min、max、avg、sum、cardinality,weight avg,value count
  • weight avg 在计算平均数时会使用另外一个字段作为每个文档的权重,比如 score = 99 学生有 3 个,score = 85 的学生有 5 个,求平均分数,人数就是这里的 weight
  • cardinality 类似于关系数据库中的 distinct count
  • value count 统计某字段所有有值的文档数
  • 可以同时使用多个单值分析关键词返回多个结果
//同时返回员工中的最低薪水和最高薪水
POST employees/_search
{
  "size": 0,  
  "aggs": {  
    "min_salary": { 
      "min": {     
        "field":"salary"  
      }
    },
    "max_salary": {
      "max": {    
        "field":"salary"  
      }
    }
  }
}      

多值分析

  • stats:一次性返回 min、max、avg、sum、cardinality,weight avg,value count 的所有单值结果
  • extended_stats:对 stats 进行扩展,包含更多,如:方差,标准差,标准差等
  • percentile:百分位数统计,比如用于统计 95% 的员工工资都小于某个值或者大于某个值
//查询 latency 索引中 95%, 99%, 99.9% 的文档的 load_time 都分别大于哪些值
GET latency/_search
{
    "size": 0,
    "aggs" : {
        "load_time_outlier" : {
            "percentiles" : {
                "field" : "load_time"  //根据 load_time 字段计算百分比
            },
            "percents" : [95, 99, 99.9] //设置百分比的点,默认是 [ 1, 5, 25, 50, 75, 95, 99 ]
        }
    }
}      
  • percentile rank:和 percentile 统计方向相反,比如用于统计工资小于 2 万的员工落在哪个百分比上
//用于统计 load_time 小于 500,600 的文档分别落在哪个百分比上
GET latency/_search
{
    "size": 0,
    "aggs" : {
        "load_time_ranks" : {
            "percentile_ranks" : {
                "field" : "load_time", 
                "values" : [500, 600]
            }
        }
    }
}      
  • top hits:一般用于分桶之后,获取每个桶内最匹配的前几个文档的列表,即详情数据,使用时一般需要带上排序信息
//用于查询 sales 索引中按照 type 字段进行聚合分桶,然后返回每个分桶中按照 date 字段降序后的 top 1 的所有文档
POST /sales/_search?size=0
{
    "aggs": {
        "top_tags": {
            "terms": {  //terms 分桶,后面会有讲解
                "field": "type",
                "size": 3
            },
            "aggs": {
                "top_sales_hits": {
                    "top_hits": {
                        "sort": [ //对每个桶中的文档按照 date 字段降序,默认情况下按照查询分数进行排序
                            {
                                "date": {
                                    "order": "desc"
                                }
                            }
                        ],
                        "_source": { //返回每个文档的 date 和 price 字段
                            "includes": [ "date", "price" ]
                        },
                        "size" : 1 //只返回 top 1
                    }
                }
            }
        }
    }
}      

Bucket Aggregation

Bucket Aggregation 类似于 Group By 的概念,按照一定的规则将文档分配到不同的桶中,主要分为下面几类:

Terms

  • 直接按 Terms 进行分桶,也就是按照每个词项进行分桶
GET /_search
{
    "aggs" : {
        "genres" : {
            "terms" : { "field" : "genre" },
            "size": 5, //默认情况下返回前 10 个聚合后的结果,根据排序字段定义的顺序返回,不支持分页,只支持返回 top
            "order" : { "_count" : "asc" }, //默认排序是 doc_count 降序
            "shard_size": 20            //去每个分片获取的文档数量,请参考下文的精确度分析介绍
            "min_doc_count": 2,         //只有在所有分片合并后的 doc_count 大于 min_doc_count 的分组才会被返回,
            "shared_min_doc_count": 1  // 只有每个分片上的 doc_count 大于 shared_min_doc_count,该分片才会被返回,一般小于 min_doc_count
        }
    }
}

输出结果 =>
{
    ...
    "aggregations" : {
        "genres" : {
            "doc_count_error_upper_bound": 0,   //被遗漏的 term 分桶包含的文档的最大可能值,看下文聚合分析精确度分析
            "sum_other_doc_count": 0, //除了返回 bucket 的 terms 以外,其它 terms 的文档总数
            "buckets" : [ 
                {
                    "key" : "electronic", //每个聚合词项
                    "doc_count" : 6       //该词项下面对应的文档个数
                },
                {
                    "key" : "rock",
                    "doc_count" : 3
                },
                {
                    "key" : "jazz",
                    "doc_count" : 2
                }
            ]
        }
    }
}      
  • text 类型字段默认不支持分桶,只能通过 ${field}.keyword 去分桶,这个时候会把对整个字符串进行匹配
# 对 job 字段按照整个字符串进行聚合
POST employees/_search
{
  "size": 0,
  "aggs": {
    "jobs": {
      "terms": {
        "field":"job.keyword"
      }
    }
  }
}      
  • Terms 分桶的本质是根据词项进行分桶,也就是字段必须具有 Field Data 或者 Doc Values 属性。如果想让 text 类型字段支持 Terms 分桶,需要设置 fielddata = true,此时会按照 text 类型分词的结果去分桶
# 对 Text 字段打开 fielddata,支持 terms aggregation
PUT employees/_mapping
{
  "properties" : {
    "job":{
       "type":     "text",
       "fielddata": true
    }
  }
}

POST employees/_search
{
  "size": 0,
  "aggs": {
    "jobs": {
      "terms": {
        "field":"job"
      }
    }
  }
}      
  • order:排序是为了对聚合后的桶进行排序,可以通过 order 参数指定
- 三种排序方式:_key, _count, sub-aggregation
- 在多分片的情况下,排序有可能不准确(参考后面聚合精确度分析)
- 排序默认是按照每个分桶的 doc_count 降序
- 可以按照桶名进行排序:
GET /_search
{
    "aggs" : {
        "genres" : {
            "terms" : {
                "field" : "genre",
                "order" : { "_key" : "asc" }
            }
        }
    }
}
- 可以按照 sub-aggregation 进行排序,支持多层聚合嵌套排序,通过”>“指明path
GET /_search
{
    "aggs" : {
        "countries" : {
            "terms" : {
                "field" : "artist.country",
                // ">" 表示路径指向,"." 有多值聚合结果时,获取其中一个值
                "order" : [ { "rock>playback_stats.avg" : "desc" }, { "_count" : "desc" } ]
            },
            "aggs" : {
                "rock" : {
                    "filter" : { "term" : { "genre" : "rock" }},
                    "aggs" : {
                        "playback_stats" : { "stats" : { "field" : "play_count" }}
                    }
                }
            }
        }
    }
}      
  • Script: 通过脚函数本进行聚合计算
GET /_search
{
    "aggs" : {
        "genres" : {
            "terms" : {
                "script" : {
                    "source": "doc['genre'].value",
                    "lang": "painless"
                }
            }
        }
    }
}      
  • Filtering Values:可以通过属性 include,exclude 对聚合的分桶名称进行过滤
GET /_search
{
    "aggs" : {
        "tags" : {
            "terms" : {
                "field" : "tags",
                "include" : ".*sport.*", //也可以精确数组匹配 ["rover", "jensen"]
                "exclude" : "water_.*"
            }
        }
    }
}      
  • Missing value:在分组名称是 NULL 的情况下,我们可以使用 missing 字段进行聚合处理
//默认情况下如果某个文档对应的 tags 为null,是不会被分组的,
//加上 missing 字段后,所有 tags 为 null 文档的被分成一个组,组名为 "N/A"
GET /_search
{
    "aggs" : {
        "tags" : {
             "terms" : {
                 "field" : "tags",
                 "missing": "N/A" 
             }
         }
    }
}      
  • Filtering Values with partitions: 某些情况下如果在一个请求中返回太多的分组可能会影响性能,我们可以使用 Filtering Values with partitions 拆分成多个 partitions,然后一个一个返回,具体逻辑可以看​​官方文档​​
  • Collect mode: Elasticsearch 提供了两种计算结果集的遍历方式,breadth_first 和 depth_first,通过参数 collect_mode 指定
- breadth_first 模式是优先进行广度遍历计算,计算完上层的聚合结果后,再进行每个桶的聚合结果计算
- depth_first 模式是优先进行深度遍历计算,每个分支进行一次深度遍历计算,然后再进行剪切
- 如果某个字段的 cardinality 大小比请求的 size 大或者这个字段的 cardinality 是未知的,那么默认是 breadth_first,其它默认是 depth_first
- 可以通过参数 collect_mode = breadth_first 设置可以将子聚合计算延迟到上层父级被剪切之后再计算
- 如果 order 字段中使用到了 sub aggregation,那么被使用到的 sub aggregation 会优先被计算不管是在那种模式下
- 聚合树的所有分支都在一次深度遍历的过程中进行计算,然后再进行剪切,某些情况下会浪费内存和 CPU
GET /_search
{
    "aggs" : {
        "actors" : {
             "terms" : {
                 "field" : "actors",
                 "size" : 10,
                 "collect_mode" : "breadth_first" 
             },
            "aggs" : {
                "costars" : {
                     "terms" : {
                         "field" : "actors",
                         "size" : 5
                     }
                 }
            }
         }
    }
}      
  • Execution hint:提供了两种聚合计算的方式,map 和 global_ordinals
- global_ordinals 模式,对于海量的数据聚合计算,ES 使用一种 global ordinals 的数据结构来进行 bucket 分配,通过有序的数值来映射每一个 term 字符串实现内存消耗的优化
- map 模式:直接将查询结果拿到内存里通过 map 来计算,在查询数据集很小的情况下使用 map,会加快计算的速度
- 默认情况下只有使用脚本计算聚合的时候才使用 map 模式来计算
- 即使你设置了 map,ES 也不一定能保证一定使用 map 去做计算,一般情况下不需要关心 Execution hint 设置,ES 会根据场景选择最佳的计算方式
GET /_search
{
    "aggs" : {
        "tags" : {
             "terms" : {
                 "field" : "tags",
                 "execution_hint": "map" 
             }
         }
    }
}      

Range /Date Range

通过指定数字类型进行分桶:

# Salary Ranges 分桶,可以自己定义 key
POST employees/_search
{
  "size": 0,
  "aggs": {
    "salary_range": {
      "range": {
        "field":"salary",
        "ranges":[
          {  "to":10000},
          {"from":10000, "to":20000},
          {
            "key":">20000",  # 不指定 key,会自动生成
            "from":20000
          }
        ]
      }
    }
  }
}      

Date Range

通过指定日期类型的范围进行分桶

POST /sales/_search?size=0
{
    "aggs": {
        "range": {
            "date_range": {
                "field": "date",
                "format": "MM-yyyy",
                "ranges": [
                    { "to": "now-10M/M" }, 
                    { "from": "now-10M/M" } 
                ]
            }
        }
    }
}      

Histogram

直方图,按固定数值间隔策略进行数据分桶

# Salary Histogram 工资0到10万,以 5000一个区间进行分桶
POST employees/_search
{
  "size": 0,
  "aggs": {
    "salary_histrogram": {
      "histogram": {
        "field":"salary",
        "interval":5000,
        "extended_bounds":{
          "min":0,
          "max":100000
        }
      }
    }
  }
}      

Date Histogram

Date Histogram: 日期直方图,按固定时间间隔进行数据分割

# Salary Histogram 工资0到10万,以 5000一个区间进行分桶
POST /sales/_search?size=0
{
    "aggs" : {
        "sales_over_time" : {
            "date_histogram" : {
                "field" : "date",
                "calendar_interval" : "month"
            }
        }
    }
}      

嵌套聚合分析

  • 聚合查询支持嵌套,可以在每个桶里再次进行聚合
  • 子聚合可以是 Bucket 也可以是 Metric
# 先按照工种进行聚合,然后再求出每个工种中年纪最大的3个员工的具体信息
POST employees/_search
{
  "size": 0,
  "aggs": {
    "jobs": {
      "terms": {
        "field":"job.keyword"
      },
      "aggs":{
        "old_employee":{
          "top_hits":{
            "size":3,
            "sort":[
              {
                "age":{
                  "order":"desc"
                }
              }
            ]
          }
        }
      }
    }
  }
}      

Pipeline Aggregation

  • Pipeline Aggregation 是对聚合分析的结果进行再次分析聚合,主要分为 Parent 和 Sibling 两类
  • Pipeline Aggregation 是通过使用 buckets_path 参数引用所需度量的路径来进行计算
  • Parent Pipeline Aggregation 是将聚合结果内嵌到现有的分析结果中,主要包括:Derivate、Moving Average、Cumulative Sum
POST /_search
{
    "aggs": {
        "my_date_histo":{
            "date_histogram":{
                "field":"timestamp",
                "calendar_interval":"day"
            },
            "aggs":{
                "the_sum":{
                    "sum":{ "field": "lemmings" } 
                },
                "the_movavg":{
                    //the_sum 的移动平均值计算结果内嵌到每一个 my_date_histo 的桶中
                    "moving_avg":{ "buckets_path": "the_sum" } 
                }
            }
        }
    }
}      
  • Sibling Pipeline Aggregation 是聚合结果与现有的聚合分析结果同级,主要包括 Max/Min/Sum/Avg Bucket、Stats/Extended Stats Bucket、Percentiles Bucket
POST /_search
{
    "aggs" : {
        "sales_per_month" : {
            "date_histogram" : {
                "field" : "date",
                "calendar_interval" : "month"
            },
            "aggs": {
                "sales": {
                    "sum": {
                        "field": "price"
                    }
                }
            }
        },
        "max_monthly_sales": {
            //找出 sales_per_month 分桶中找到 sales 最大的分桶
            "max_bucket": {
                "buckets_path": "sales_per_month>sales" 
            }
        }
    }
}      
  • Pipeline Aggregation 下面不能再使用 sub-aggregations,但是可以在 buckets_path 中引用另外一个 Pipeline Aggregation,从而形成链式计算
  • Pipeline Aggregation 不会改变原先聚合的结果,只是在已有的聚合输出中新增新的聚合结果,所以最后输出结果会包括链上所有的 Pipeline Aggregation 结果

聚合的作用范围

ES 聚合分析的默认作用范围是 query 的查询结果集,同时 ES 还可以支持以下方式改变聚合的作用范围

  • filter Aggregation:不改变整体 query 语句的情况下,只修改部分需要聚合的查询范围
POST employees/_search
{
  "size": 0,
  "aggs": {
    "older_person": {
      //只修改 older_person 的聚合范围,而不会影响到 all_jobs 的聚合范围
      "filter":{
        "range":{
          "age":{ "from":35}
        }
      },
      "aggs":{
         "jobs":{
           "terms": {"field":"job.keyword"}
         }
      }
    },
    "all_jobs": {
      "terms": {"field":"job.keyword"}
    }
  }
}      
  • Post Filter:用于文档过滤,在聚合分析计算好之后进行过滤结果
POST employees/_search
{
  "aggs": {
    "jobs": {
      "terms": {
        "field": "job.keyword"
      }
    }
  },
  "post_filter": {
    "match": {
      "job.keyword": "Dev Manager"
    }
  }
}      
  • Global Filter:忽略 query 条件,基于所有文档进行分析
#global
POST employees/_search
{
  "size": 0,
  "query": {
    "range": {
      "age": {
        "gte": 40
      }
    }
  },
  "aggs": {
    "jobs": {
      "terms": {
        "field":"job.keyword"

      }
    },
    "all":{ 
      "global":{}, //会忽略上面query的限制,全局数据的聚合
      "aggs":{
        "salary_avg":{
          "avg":{
            "field":"salary"
          }
        }
      }
    }
  }
}      

聚合分析计算结果的精确度问题

讨论聚合分析计算的精确度问题前,我们先了解下 ES 是如何进行聚合分析计算的,我们前面的文章 ​​Elasticsearch 分布式原理以及相关读写逻辑​​ 中,我们知道 ES 是分布式存储的,每个索引中的文档会存储在不同的分片上,所以在进行聚合计算时,因为数据量和内存的限制,ES 不会把所有文档数据都拿到内存里然后进行聚合,而是 会去每个分片上获取聚合计算的结果,然后再在 coordinate Node 上进行汇总聚合,这样必然会引起结果不准确性,比如每个分片上”求和销售额“ 的前10个最大值都可能不一样,最好导致汇总时结果的不精确性。那么我们看下关于结果的不精确性,ES 都提供哪些配置和说明:

doc_count_error_upper_bound

该值是返回聚合 bucket 中被遗漏的 term 可能的最大值,因为计算的不精确性,有些 term 不是我们想要的。

sum_other_doc_count

除了返回结果的 bucket 的 term 以外,其它没有被返回的 term 的文档总数

show_term_doc_count_error

  • 设置可以每次从每个分片(shard)上获取 bucket 数量
  • 我们可以利用 shard_size 从每个分片上多获取一些数据从而提高计算的精确度
  • shard_size 的默认值是 size * 1.5 + 10
  • shard_size 不能小于 size,如果设置小于 size, ES 会自动重置成 size 大小
GET my_flights/_search
{
  "size": 0,
  "aggs": {
    "weather": {
      "terms": {
        "field":"OriginWeather",
        "size":1,
        "shard_size":1,  默认值是 size * 1.5 + 10
        "show_term_doc_count_error":true
      }
    }
  }
}      

参考文献

  • ES系列八、正排索Doc Values和Field Data
  • ​​【ElasticStack】ElasticSearch聚合分析与数据建模​​
  • elasticsearch系列六:聚合分析(聚合分析简介、指标聚合、桶聚合)
  • ​​Elasticsearch聚合优化 | 聚合速度提升5倍​​

继续阅读