天天看點

Elasticsearch JAVA api輕松搞定groupBy聚合

本文給出如何使用Elasticsearch的Java API做類似SQL的group by聚合。

為了簡單起見,隻給出一級groupby即group by field1(而不涉及到多級,例如group by field1, field2, ...);如果你需要多級的groupby,在實作上可能需要拆分的更加細緻。

即将給出的方法,适用于如下的場景:

場景1:找出分組中的所有桶,例如,select group_name from index_name group by group_name;

場景2:靈活添加一個或者多個聚合函數,例如,select group_name, max(count), avg(count) group by group_name;

1、用法

GroupBy類是我們的實作。

1)測試用例

public static void main(String[] args) {
        /*
        *   初始化es用戶端
        * */
        ESClient esClient = new ESClient(
                "dqa-cluster",
                "10.93.21.21:9300,10.93.18.34:9300,10.93.18.35:9300,100.90.62.33:9300,100.90.61.14:9300",
                false);

        /*
        *   為了示範, 構造了一個距離查詢, 相當于where子句.
        * */
        GeoDistanceRangeQueryBuilder queryBuilder = QueryBuilders.geoDistanceRangeQuery("location")
                .point(39.971424, 116.398251)
                .from("0m")
                .to(String.format("%fm", 500.0))
                .includeLower(true)
                .includeUpper(true)
                .optimizeBbox("memory")
                .geoDistance(GeoDistance.SLOPPY_ARC);
        
        SearchRequestBuilder search = esClient.getClient().prepareSearch("moon").setTypes("bj")
                .setSearchType(SearchType.DFS_QUERY_AND_FETCH)
                .setQuery(queryBuilder);

        /*
        *  GroupBy類就是我們的實作, 初始化的時候傳入的參數依次是, search, 桶命名, 分桶字段, 排序asc
        *  select date as date_group from index group by date;
        * */
        GroupBy groupBy = new GroupBy(search, "date_group", "date", true);

        /*
        *   添加各種分組函數
        *   這裡我實作了10種, 下面是其中的6種
        * */
        groupBy.addSumAgg("pre_total_fee_sum", "pre_total_fee");
        groupBy.addAvgAgg("pre_total_fee_avg", "pre_total_fee");
        groupBy.addPercentilesAgg("pre_total_fee_percent", "pre_total_fee");
        groupBy.addPercentileRanksAgg("pre_total_fee_percentRank", "pre_total_fee", new double[]{13, 16, 20});
        groupBy.addStatsAgg("pre_total_fee_stats", "pre_total_fee");
        groupBy.addCardinalityAgg("type_card", "type");

        /*
        *   擷取groupBy聚合的結果
        *   結果是兩級Map, 這裡的實作是TreeMap因為要保護桶的排序
        * */
        Map<String, Object> groupbyResponse = groupBy.getGroupbyResponse();
        for (Map.Entry<String, Object> entry : groupbyResponse.entrySet()) {
            String bucketKey = entry.getKey();
            Map<String, String> subAggMap = (Map<String, String>) entry.getValue();
            System.out.println(String.format("%s\t%s\t%s", bucketKey, "pre_total_fee_sum", subAggMap.get("pre_total_fee_sum")));
            System.out.println(String.format("%s\t%s\t%s", bucketKey, "pre_total_fee_avg", subAggMap.get("pre_total_fee_avg")));
            System.out.println(String.format("%s\t%s\t%s", bucketKey, "pre_total_fee_percent", subAggMap.get("pre_total_fee_percent")));
            System.out.println(String.format("%s\t%s\t%s", bucketKey, "pre_total_fee_percentRank", subAggMap.get("pre_total_fee_percentRank")));
            System.out.println(String.format("%s\t%s\t%s", bucketKey, "pre_total_fee_stats", subAggMap.get("pre_total_fee_stats")));
            System.out.println(String.format("%s\t%s\t%s", bucketKey, "type_card", subAggMap.get("type_card")));

        }
    }      

2)初始化

初始化的時候,相當于構造了這樣一個SQL:select date as date_group from index group by date;

傳入search對象,相當于where子句

傳入分桶命名, 相當于 as date_group

傳入分桶字段,相當于date

傳入排序,asc=true

3)初始化完成後,可以添加各種聚合函數,也就是場景2。

GroupBy類裡實作了10種聚合函數

4)讀取結果

結果的傳回是兩級Map,為了保護分桶的排序,實作中使用了TreeMap。

這裡需要注意的是,有些聚合函數的傳回,并不是一個值,而是一組值,如Percentiles、Stats等等,這裡我們把這一組值壓縮成JSONString了。

5)列印輸出

我們以日期進行了分桶,同一個分桶中的聚合結果,sum、avg、cardinality都是單個的值。而percentiles、percentileRanks、stats是壓縮的jsonstring。

Elasticsearch JAVA api輕松搞定groupBy聚合

2、實作

 先上代碼,然後在後面進行講解。

public class GroupBy {

    private SearchRequestBuilder search;

    private String termsName;

    private TermsBuilder termsBuilder;

    private List<Map<String, Object>> subAggList = new ArrayList<Map<String, Object>>();

    public GroupBy(SearchRequestBuilder search, String termsName, String fieldName, boolean asc) {
        this.search = search;
        this.termsName = termsName;
        termsBuilder = AggregationBuilders.terms(termsName).field(fieldName).order(Terms.Order.term(asc)).size(0);
    }

    private void addSubAggList(String aggName, MetricsAggregationBuilder aggBuilder) {
        Map<String, Object> subAgg = new HashMap<String, Object>();
        subAgg.put("aggName", aggName);
        subAgg.put("aggBuilder", aggBuilder);
        subAggList.add(subAgg);
    }

    public void addSumAgg(String aggName, String fieldName) {
        SumBuilder builder = AggregationBuilders.sum(aggName).field(fieldName);
        termsBuilder.subAggregation(builder);
        addSubAggList(aggName, builder);
    }

    public boolean bucketSumAgg(Terms.Bucket bucket, String aggName, MetricsAggregationBuilder aggBuilder, Map<String, String> tmpMap) {
        if (aggBuilder instanceof SumBuilder) {
            tmpMap.put(aggName, bucket.getAggregations().get(aggName).getProperty("value").toString());
            return true;
        } else {
            return false;
        }
    }

    public void addCountAgg(String aggName, String fieldName) {
        ValueCountBuilder builder = AggregationBuilders.count(aggName).field(fieldName);
        termsBuilder.subAggregation(builder);
        addSubAggList(aggName, builder);
    }

    public boolean bucketCountAgg(Terms.Bucket bucket, String aggName, MetricsAggregationBuilder aggBuilder, Map<String, String> tmpMap) {
        if (aggBuilder instanceof ValueCountBuilder) {
            tmpMap.put(aggName, bucket.getAggregations().get(aggName).getProperty("value").toString());
            return true;
        } else {
            return false;
        }
    }

    public void addAvgAgg(String aggName, String fieldName) {
        AvgBuilder builder = AggregationBuilders.avg(aggName).field(fieldName);
        termsBuilder.subAggregation(builder);
        addSubAggList(aggName, builder);
    }

    public boolean bucketAvgAgg(Terms.Bucket bucket, String aggName, MetricsAggregationBuilder aggBuilder, Map<String, String> tmpMap) {
        if (aggBuilder instanceof AvgBuilder) {
            tmpMap.put(aggName, bucket.getAggregations().get(aggName).getProperty("value").toString());
            return true;
        } else {
            return false;
        }
    }

    public void addMinAgg(String aggName, String fieldName) {
        MinBuilder builder = AggregationBuilders.min(aggName).field(fieldName);
        termsBuilder.subAggregation(builder);
        addSubAggList(aggName, builder);
    }

    public boolean bucketMinAgg(Terms.Bucket bucket, String aggName, MetricsAggregationBuilder aggBuilder, Map<String, String> tmpMap) {
        if (aggBuilder instanceof MinBuilder) {
            tmpMap.put(aggName, bucket.getAggregations().get(aggName).getProperty("value").toString());
            return true;
        } else {
            return false;
        }
    }

    public void addMaxAgg(String aggName, String fieldName) {
        MaxBuilder builder = AggregationBuilders.max(aggName).field(fieldName);
        termsBuilder.subAggregation(builder);
        addSubAggList(aggName, builder);
    }

    public boolean bucketMaxAgg(Terms.Bucket bucket, String aggName, MetricsAggregationBuilder aggBuilder, Map<String, String> tmpMap) {
        if (aggBuilder instanceof MaxBuilder) {
            tmpMap.put(aggName, bucket.getAggregations().get(aggName).getProperty("value").toString());
            return true;
        } else {
            return false;
        }
    }

    public void addStatsAgg(String aggName, String fieldName) {
        StatsBuilder builder = AggregationBuilders.stats(aggName).field(fieldName);
        termsBuilder.subAggregation(builder);
        addSubAggList(aggName, builder);
    }

    public boolean bucketStatsAgg(Terms.Bucket bucket, String aggName, MetricsAggregationBuilder aggBuilder, Map<String, String> tmpMap) {
        if (aggBuilder instanceof StatsBuilder) {
            Stats stats = bucket.getAggregations().get(aggName);
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("min", stats.getMin());
            jsonObject.put("max", stats.getMax());
            jsonObject.put("sum", stats.getMax());
            jsonObject.put("count", stats.getCount());
            jsonObject.put("avg", stats.getAvg());
            tmpMap.put(aggName, jsonObject.toJSONString());
            return true;
        } else {
            return false;
        }
    }

    public void addExtendedStatsAgg(String aggName, String fieldName) {
        ExtendedStatsBuilder builder = AggregationBuilders.extendedStats(aggName).field(fieldName);
        termsBuilder.subAggregation(builder);
        addSubAggList(aggName, builder);
    }

    public boolean bucketExtendedStatsAgg(Terms.Bucket bucket, String aggName, MetricsAggregationBuilder aggBuilder, Map<String, String> tmpMap) {
        if (aggBuilder instanceof ExtendedStatsBuilder) {
            ExtendedStats extendedStats = bucket.getAggregations().get(aggName);
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("min", extendedStats.getMin());
            jsonObject.put("max", extendedStats.getMax());
            jsonObject.put("sum", extendedStats.getMax());
            jsonObject.put("count", extendedStats.getCount());
            jsonObject.put("avg", extendedStats.getAvg());
            jsonObject.put("stdDeviation", extendedStats.getStdDeviation());
            jsonObject.put("sumOfSquares", extendedStats.getSumOfSquares());
            jsonObject.put("variance", extendedStats.getVariance());
            tmpMap.put(aggName, jsonObject.toJSONString());
            return true;
        } else {
            return false;
        }
    }

    public void addPercentilesAgg(String aggName, String fieldName) {
        PercentilesBuilder builder = AggregationBuilders.percentiles(aggName).field(fieldName);
        termsBuilder.subAggregation(builder);
        addSubAggList(aggName, builder);
    }

    public void addPercentilesAgg(String aggName, String fieldName, double[] percentiles) {
        PercentilesBuilder builder = AggregationBuilders.percentiles(aggName).field(fieldName).percentiles(percentiles);
        termsBuilder.subAggregation(builder);
        addSubAggList(aggName, builder);
    }

    public boolean bucketPercentilesAgg(Terms.Bucket bucket, String aggName, MetricsAggregationBuilder aggBuilder, Map<String, String> tmpMap) {
        if (aggBuilder instanceof PercentilesBuilder) {
            Percentiles percentiles = bucket.getAggregations().get(aggName);
            JSONObject jsonObject = new JSONObject();
            for (Percentile percentile : percentiles) {
                jsonObject.put(String.valueOf(percentile.getPercent()), percentile.getValue());
            }
            tmpMap.put(aggName, jsonObject.toJSONString());
            return true;
        } else {
            return false;
        }
    }

    public void addPercentileRanksAgg(String aggName, String fieldName, double[] percentiles) {
        PercentileRanksBuilder builder = AggregationBuilders.percentileRanks(aggName).field(fieldName).percentiles(percentiles);
        termsBuilder.subAggregation(builder);
        addSubAggList(aggName, builder);
    }

    public boolean bucketPercentileRanksAgg(Terms.Bucket bucket, String aggName, MetricsAggregationBuilder aggBuilder, Map<String, String> tmpMap) {
        if (aggBuilder instanceof PercentileRanksBuilder) {
            PercentileRanks percentileRanks = bucket.getAggregations().get(aggName);
            JSONObject jsonObject = new JSONObject();
            for (Percentile percentile : percentileRanks) {
                jsonObject.put(String.valueOf(percentile.getPercent()), percentile.getValue());
            }
            tmpMap.put(aggName, jsonObject.toJSONString());
            return true;
        } else {
            return false;
        }
    }

    public void addCardinalityAgg(String aggName, String fieldName) {
        CardinalityBuilder builder = AggregationBuilders.cardinality(aggName).field(fieldName);
        termsBuilder.subAggregation(builder);
        addSubAggList(aggName, builder);
    }

    public boolean bucketCardinalityAgg(Terms.Bucket bucket, String aggName, MetricsAggregationBuilder aggBuilder, Map<String, String> tmpMap) {
        if (aggBuilder instanceof CardinalityBuilder) {
            tmpMap.put(aggName, bucket.getAggregations().get(aggName).getProperty("value").toString());
            return true;
        } else {
            return false;
        }
    }

    public List<Terms.Bucket> getTermsBucket() {
        search.addAggregation(termsBuilder);
        Terms termsGroup = search.get().getAggregations().get(termsName);
        return termsGroup.getBuckets();
    }

    public Map<String, Object> getGroupbyResponse() {
        Map<String, Object> aggResponseMap = new TreeMap<String, Object>();
        for (Terms.Bucket bucket : getTermsBucket()) {
            String bucketKeyAsString = bucket.getKeyAsString();
            Map<String, String> tmpMap = new TreeMap<String, String>();
            for (Map<String, Object> subAgg : subAggList) {
                String subAggName = subAgg.get("aggName").toString();
                MetricsAggregationBuilder subAggBuilder = (MetricsAggregationBuilder) subAgg.get("aggBuilder");
                if (bucketAvgAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
                if (bucketMaxAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
                if (bucketMinAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
                if (bucketSumAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
                if (bucketCountAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
                if (bucketCardinalityAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
                if (bucketPercentileRanksAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
                if (bucketPercentilesAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
                if (bucketExtendedStatsAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
                if (bucketStatsAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
            }
            aggResponseMap.put(bucketKeyAsString, tmpMap);
        }
        return aggResponseMap;
    }
}      

1)構造函數

構造函數中,核心邏輯是termsBuilder = AggregationBuilders.terms(termsName).field(fieldName).order(Terms.Order.term(asc)).size(0);

執行個體化了termsBuilder也就是分桶。

後面調用add...函數簇添加聚合函數的時候,都是通過termsBuilder.subAggregation(builder)在分桶的基礎上添加了子聚合。

最後在擷取結果的時候search.addAggregation(termsBuilder);将termsBuilder添加到查詢上,進行聚合查詢。

2)添加聚合函數add...函數簇

以sum函數為例

public void addSumAgg(String aggName, String fieldName) {
        SumBuilder builder = AggregationBuilders.sum(aggName).field(fieldName);
        termsBuilder.subAggregation(builder);
        addSubAggList(aggName, builder);
    }      

a)初始化了一個SumBuilder聚合操作,然後作為termsBuilder的子聚合。

b)addSubAggList方法在subAggList屬性(subAggList屬性是一個List<Map<String, Object>>)上儲存了所有添加了的子聚合的名字和builder。這樣做是為了在解析結果的時候,知道是哪種type的聚合(instanceof),以便使用不同的邏輯去解析。

private void addSubAggList(String aggName, MetricsAggregationBuilder aggBuilder) {
        Map<String, Object> subAgg = new HashMap<String, Object>();
        subAgg.put("aggName", aggName);
        subAgg.put("aggBuilder", aggBuilder);
        subAggList.add(subAgg);
    }      

3)按類型擷取結果

還是以sum函數為例

public boolean bucketSumAgg(Terms.Bucket bucket, String aggName, MetricsAggregationBuilder aggBuilder, Map<String, String> tmpMap) {
        if (aggBuilder instanceof SumBuilder) {
            tmpMap.put(aggName, bucket.getAggregations().get(aggName).getProperty("value").toString());
            return true;
        } else {
            return false;
        }
    }      

a)這裡先判斷了aggBuilder是哪種類型的(instanceof),如果是SumBuilder類型的,就按照sum的結果類型去讀取傳回結果。

b)sum的傳回結果就是一個值,當遇到percentiles這種類型的,傳回結果不是一個值,此時為了簡單,我将結果壓縮成了jsonstring,也相當于一個值,可以自行參看代碼。

c)後面依賴return true實作了一個邏輯,一旦命中了類型,就不再繼續判斷了,提升效率。

d)tmpMap是外部傳入的一個全局接收器,用來存儲結果。

4)解析所有的子聚合結果

public Map<String, Object> getGroupbyResponse() {
        Map<String, Object> aggResponseMap = new TreeMap<String, Object>();
        for (Terms.Bucket bucket : getTermsBucket()) {
            String bucketKeyAsString = bucket.getKeyAsString();
            Map<String, String> tmpMap = new TreeMap<String, String>();
            for (Map<String, Object> subAgg : subAggList) {
                String subAggName = subAgg.get("aggName").toString();
                MetricsAggregationBuilder subAggBuilder = (MetricsAggregationBuilder) subAgg.get("aggBuilder");
                if (bucketAvgAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
                if (bucketMaxAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
                if (bucketMinAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
                if (bucketSumAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
                if (bucketCountAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
                if (bucketCardinalityAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
                if (bucketPercentileRanksAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
                if (bucketPercentilesAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
                if (bucketExtendedStatsAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
                if (bucketStatsAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
            }
            aggResponseMap.put(bucketKeyAsString, tmpMap);
        }
        return aggResponseMap;
    }      

這裡是解析結果的代碼。tmpMap定義為全局接收器。

a)通過周遊subAggList存儲的所有子聚合函數,擷取所有的子聚合結果,并儲存成兩級TreeMap。

b)對每個疊代,調用所有的bucket...函數簇,這裡通過if判斷是否命中類型,如果命中了,就通過continue不再繼續檢查。

c) aggResponseMap使用treeMap是為了保持bucket的有序。

3、十種聚合函數

最後列出我們實作的十種聚合函數,你可以根據自己的需求繼續添加。

1)傳回單個值:sum、avg、min、max、count、cardinality(有誤差)

2)percentiles:分位數查詢,傳入分位數,擷取分位數上的值;percentileRanks,分位數排名查詢,傳入值,傳回對應的分位數;互為逆向操作。

3)stats和extendedStats,extended聚合更詳細的資訊max、min、avg、sum、平方和、标準差等。