天天看点

ElasticSearch Aggregations GroupBy 实现源码分析

为了方便调试,我对索引做了如下配置

这样只有一个分片,方便ide的跟踪,也算是个看源码的技巧

数据

假定的查询如下:

其语义类似这个sql 语句: 

select avg(num) from twitter group by newtype

也就是按newtype 字段进行group by,然后对num求平均值。在我们实际的业务系统中,这种统计需求也是最多的。

在查询过程中,es是将整个查询分成几个阶段的,大体如下:

queryphase

rescorephase

suggestphase

aggregationphase

fetchphase

对于全文检索,可能还有dfsphase。

顺带提一点,spark sql + es 的组合,最影响响应时间的地方其实是fetch original source 。

而对于这些phase,并不是一个链路的模式,而是在某个phase调用另外一个phase。这个在源码中也很明显,我们看如下一段代码:

要了解具体是如何实现聚合功能的,则需要了解es 的aggregator相关的概念。大体有五个:

aggregatorfactory (典型的工厂模式)负责创建aggregator实例

aggregator (负责提供collector,并且提供具体聚合逻辑的类)

aggregations (聚合结果)

pipelineaggregator (对聚合结果进一步处理)

aggregator 的嵌套,比如 示例中的avgaggregator 就是根据globalordinalsstringtermsaggregator 的以bucket为维度,对相关数据进行操作.这种嵌套结构也是

bucket 其实就是被groupby 字段的数字表示形式。用数字表示,可以节省对应字段列式存储的空间,并且提高性能。

我们知道,无论检索亦或是聚合查询,本质上都需要转化到lucene里的collector,以上面的案例为例,其实由两个collector 完成最后的计算:

totalhitcountcollecotr

globalordinalsstringtermsaggregator(里面还有个aggregator)

因为我们没有定义过滤条件,所以最后的query 是个matchallquery,之后基于这个基础上,这两个collector 完成对应的计算。通常,这两个collector 会被wrap成一个新的multicollector ,最终传入indexsearcher的collector 就是multicollector。

根据上面的分析,我们知道示例中的聚合计算完全由globalordinalsstringtermsaggregator负责。

基于docvalues实现groupby概览

对于每一个segment,我们都会为每个列单独存储成一个文件,为了压缩,我们可能会将里面具体的值转换成数字,然后再形成一个字典和数字对应关系的文件。我们进行所谓的groupby操作,以最后进行avg为例子,其实就是维护了两个大数组,

counts是newtype(我们例子中被groupby的字段)次数统计,对应的数组下标是newtype(我们已经将newtype转化为数字表示了)。我们遍历文档的时候(matchallquery),可以获取doc,然后根据doc到列存文件获取对应的newtype,然后给counts 对应的newtype +1。 这样我们就知道每个newtype 出现的次数了。

这里我们也可以看到,消耗内存的地方取决于newtype的数量(distinct后),我们称之为基数。基数过高的话,是比较消耗内存的。

sums 也是一样的,下标是newtype的值,而对应的值则是不断累加num(我们例子中需要被avg的字段)。

之后就可以遍历两个数组得到结果了,代码大体如下:

globalordinalsstringtermsaggregator/avgaggregator组合实现

globalordinalsstringtermsaggregator 首先要提供一个collector 给主流程,所以其提供了一个newcollector方法:

接着判定是不是只有一个列文件(docvalues):

final sorteddocvalues singlevalues = docvalues.unwrapsingleton(words);

//如果singlevalues!=null 则是一个,否则有多个列文件

如果是一个的话:

通过doc 拿到ord(newtype),然后交给avg的collector 接着处理,进入avgaggregator 里的collector的collect逻辑:

这个和我上面的概述中描述是一致的。

如果是多个docvalues(此时索引还没有对那些segment做合并),这个时候会走下面的流程:

这里的ords 包括了多个docvalues文件,然后做了全局映射,因为要把文件的下标做映射。为啥要有下标映射呢?因为多个列文件(docvalues)的全集才具有完整的newtype,但是每个列文件都是从0开始递增的。现在要扩张到一个global的空间上。 ords.cardinality()拿到了列文件(docvalues)的数目,然后对每个文件都处理一遍,通过ords.ordat(i) 拿到newtype的全局下标,这个时候就可以继续交给avg完成了。

到这个阶段,我们其实已经算好了每个newtype 出现的次数,以及num的累计值,也就是我们前面提到的两个数组。

最终我们是要把这个数据输出输出的,不论是输出给别的es节点,还是直接输出给调用方。所以有个buildaggregation的过程,可以根据名字进行直观的了解。

考虑到内存问题,es允许你设置一些threshhold,然后通过bucketpriorityqueue(优先队列)来完成实际的数据收集以及排序(默认按文档出现次数排序)。 里面的元素是ordbucket,ordbucket包含了几个值:

接着取出 topn 的对象,放到internalterms.bucket[] 数组中。然后遍历该数组,调用子aggregator的buildaggregation方法,这里的子aggregator是avgaggregator ,每个bucket(newtype)就获取到一个avg aggregations了,该aggregations通过internalaggregations 包裹,internalaggregations 包含了一个reduce 方法,该方法会调用具体internalaggregation的doreduce 方法,比如avgaggregator就有自己的reduce方法。说这个主要给下一小结做铺垫。

最后会被包装成stringterms ,然后就可以序列化成json格式,基本就是你在接口上看到的样子了。

前面我们讨论的,都是基于一个分片,但是最终是要把结果数据进行merge的。 这个功能是由searchphasecontroller 对象来完成,大体如下:

sortedshardlist = searchphasecontroller.sortdocs(usescroll, firstresults);

其中merge 动作是按分类进行merge的,比如:

counter(计数,譬如total_hits)

hits

aggregations

suggest

profile (性能相关的数据)

这里我们只关注aggregations的merge

代码有点长,核心是

里面实际的逻辑也是比较简单直观的。会调用internalterms的reduce方法做merge,但是不同的类型的aggregator产生aggregations 合并逻辑是不一样的,所以会委托给对应实现。比如globalordinalsstringtermsaggregator则会委托给internalterms的doreduce方法,而如avgaggregator会委托给internalavg的doreduce。 这里就不展开。未来会单独出一片文章讲解。

这里我们再额外讲讲valuesource (es 对fielddata/docvalues的抽象)。

前文我们提到,大部分aggregator 都是依赖于fielddata/docvalues 来实现的,而valuesource 则是他们在es里的表示。所以了解他们是很有必要的。valuessource 全类名是:

该类就是es 为了管理 docvalues 而封装的。它是一个抽象类,内部还有很多实现类,bytes,withordinals,fielddata,numeric,longvalues 等等。这些都是对特定类型docvalues类型的 es 表示。

按上面我们的查询示例来看,newtype 字段对应的是

对象。这个对象是es对lucene string 类型的docvalues的一个表示。你会发现在valuesource类里,有不同的fielddata。不同的fielddata 可能继承自不同基类从而表示不同类型的数据。在现在这个fielddata 里面有一个对象:

该对象在newtype(我们示例中的字段)是string类型的时候,对应的是实现类是

该对象的大体作用是,构建出docvalue的es的wraper。

具体代码如下:

以第一种情况为例,上面的代码new 了一个新的

我们看到,通过reader获取到最后的列就是在该类里的getordinalsvalues 方法里实现的。

该方法最后返回的randomaccessords 就是lucene的docvalues实现了。

分析了这么多,所有的逻辑就浓缩在getleafcollector的第一行代码上。globalords  的类型是randomaccessords,并且是直接和lucene对应上了。 

我们知道,在lucene里,大部分文件都是不可更新的。一个段一旦生成后就是不可变的,新的数据或者删除数据都需要生成新的段。docvalues的存储文件也是类似的。所以docvalues.unwrapsingleton其实就是做这个判定的,是不是有多个文件 。无论是否则不是都直接创建了一个匿名的collector。

当个文件的很好理解,包含了索引中newtype字段所有的值,其下标获取也很自然。

根据文档号获取值对应的位置,如果ord >=0 则代表有值,否则代表没有值。

如果有多个文件,则会返回如下的collecor:

上面的代码可以保证多个文件最终合起来保持一个文件的序号。什么意思呢?比如a文件有一个文档,b文件有一个,那么最终获取的globalord 就是0,1 而不会都是0。此时的 ords 实现类 不是singletonsortedsetdocvalues 而是

对象了。

计数的方式两个都大体类似。

这里的bucketord 其实就是前面的ord/globalord。所以整个计算就是填充doccounts

es的 aggregation机制还是挺复杂的。本文试图通过一个简单的group by 的例子来完成对其机制的解释。其中valuesource 那层我目前也没没完全吃透,如有表述不合适的地方,欢迎大家指出。