天天看点

elasticsearch aggregations_Golang操作elasticsearch简介支持版本例子总结

简介

开源的 Elasticsearch 是目前全文搜索引擎的首选,很多日志都是放到elasticsearch里面,然后再根据具体的需求进行分析。

elasticsearch aggregations_Golang操作elasticsearch简介支持版本例子总结

目前我们的运维系统是使用golang开发的,需要定时到elasticsearch取数据分析。

官方有一套自己的SDK:github.com/elastic/go-elasticsearch ,但是好像用的人不是很多,更多用的是:github.com/olivere/elastic

elasticsearch aggregations_Golang操作elasticsearch简介支持版本例子总结

支持版本

不同版本的ES对应的API也是不一样,因此api地址也是不一样,在包地址后面加上对应的版本号就可以调用到对应的版本了。

例如:elasticsearch 6 的对应 SDK 就是 github.com/olivere/elastic/v6

例子

一个获取系统日志的例子

package mainimport (    "context"    "encoding/json"    "fmt"    "time"    "github.com/olivere/elastic")​// Message 系统日志type Message struct {    Host      string    `json:"host"`    Message   string    `json:"message"`    Facility  string    `json:"facility"`    Severity  string    `json:"severity"`    TimeStamp time.Time `json:"@timestamp"`}​func main() {    endTime := time.Now()    startTime := endTime.Add(-10 * time.Minute)    // 设置地址和验证 , 默认 会自动去查找集群的所有可用节点,如果使用代理,代理到内网就有可能出现访问不同,需要加上参数 elastic.SetSniff(false)    client, err := elastic.NewClient(elastic.SetURL("http://192.168.100.1:9200", "http://192.168.100.2:9200", "http://192.168.100.3:9200"), elastic.SetBasicAuth("XXXXX", "XXXXX"))    if err != nil {        panic(err)    }    query := elastic.NewBoolQuery()    // 过滤facility字段中 包含kern的    query.Must(elastic.NewQueryStringQuery("facility:kern"))    // 查找时间范围    query.Must(elastic.NewRangeQuery("@timestamp").Gte(startTime).Lt(endTime))    // Search指定索引,可以使用通配符    result, err := client.Search("rsyslog*").Query(query).Sort("@timestamp", true).Size(100).Do(context.Background())    if err != nil {        panic(err)    }    fmt.Println(result.Hits.TotalHits)    total := result.Hits.TotalHits    // messages := make([]*Message, total)    var count int64    var index int64    for (count-1)*100 < total {        for _, value := range result.Hits.Hits {            if index >= total {                continue            }            var doc *Message            json.Unmarshal(*value.Source, &doc)            index++            fmt.Println("第", index, "条:", doc.TimeStamp.Format("2006-01-02 15:04:05"), doc.Host, doc.Facility, doc.Severity, doc.Message)        }        count++    }    return}​
           

有时候我们呢要的是直接同统计结果,这时候就需要使用聚合查询

package main​import (    "context"    "time"​    "github.com/olivere/elastic")​// MessageCount 系统统计type MessageCount struct {    Host  string `json:"host"`    Count int64  `json:"count"`}​func main() {    endTime := time.Now()    startTime := endTime.Add(-10 * time.Minute)    // 设置地址和验证 , 默认 会自动去查找集群的所有可用节点,如果使用代理,代理到内网就有可能出现访问不同,需要加上参数 elastic.SetSniff(false)    client, err := elastic.NewClient(elastic.SetURL("http://192.168.100.1:9200", "http://192.168.100.2:9200", "http://192.168.100.3:9200"), elastic.SetBasicAuth("XXXXX", "XXXXX"))    if err != nil {        panic(err)    }    query := elastic.NewBoolQuery()    // 过滤facility字段中 包含kern的    query.Must(elastic.NewQueryStringQuery("facility:kern"))    // 查找时间范围    query.Must(elastic.NewRangeQuery("@timestamp").Gte(startTime).Lt(endTime))​    aggs := elastic.NewTermsAggregation().Field("host")    // Search指定索引,可以使用通配符    result, err := client.Search("rsyslog-*").Query(query).Aggregation("host", aggs).Sort("@timestamp", true).Size(0).Do(context.Background())    agg, found := result.Aggregations.Terms("host")    if !found {        return    }    var messageCounts []MessageCount    // 遍历桶数据    for _, bucket := range agg.Buckets {        // 每一个桶都有一个key值,其实就是分组的值,可以理解为SQL的group by值        // bucketValue := bucket.Key        // 打印结果, 默认桶聚合查询,都是统计文档总数        // fmt.Printf("bucket = %q 文档总数 = %d", bucketValue, bucket.DocCount)        messageCounts = append(messageCounts, MessageCount{            Host:  bucket.Key.(string),            Count: bucket.DocCount,        })    }    return}
           

总结

Elasticsearch查询还是非常快,操作起来也不难。

elasticsearch aggregations_Golang操作elasticsearch简介支持版本例子总结