简介
开源的 Elasticsearch 是目前全文搜索引擎的首选,很多日志都是放到elasticsearch里面,然后再根据具体的需求进行分析。
目前我们的运维系统是使用golang开发的,需要定时到elasticsearch取数据分析。
官方有一套自己的SDK:github.com/elastic/go-elasticsearch ,但是好像用的人不是很多,更多用的是:github.com/olivere/elastic
支持版本
不同版本的ES对应的API也是不一样,因此api地址也是不一样,在包地址后面加上对应的版本号就可以调用到对应的版本了。
例如:elasticsearch 6 的对应 SDK 就是 github.com/olivere/elastic/v6
例子
一个获取系统日志的例子
package mainimport ( "context" "encoding/json" "fmt" "time" "github.com/olivere/elastic")// Message 系统日志type Message struct { Host string `json:"host"` Message string `json:"message"` Facility string `json:"facility"` Severity string `json:"severity"` TimeStamp time.Time `json:"@timestamp"`}func main() { endTime := time.Now() startTime := endTime.Add(-10 * time.Minute) // 设置地址和验证 , 默认 会自动去查找集群的所有可用节点,如果使用代理,代理到内网就有可能出现访问不同,需要加上参数 elastic.SetSniff(false) client, err := elastic.NewClient(elastic.SetURL("http://192.168.100.1:9200", "http://192.168.100.2:9200", "http://192.168.100.3:9200"), elastic.SetBasicAuth("XXXXX", "XXXXX")) if err != nil { panic(err) } query := elastic.NewBoolQuery() // 过滤facility字段中 包含kern的 query.Must(elastic.NewQueryStringQuery("facility:kern")) // 查找时间范围 query.Must(elastic.NewRangeQuery("@timestamp").Gte(startTime).Lt(endTime)) // Search指定索引,可以使用通配符 result, err := client.Search("rsyslog*").Query(query).Sort("@timestamp", true).Size(100).Do(context.Background()) if err != nil { panic(err) } fmt.Println(result.Hits.TotalHits) total := result.Hits.TotalHits // messages := make([]*Message, total) var count int64 var index int64 for (count-1)*100 < total { for _, value := range result.Hits.Hits { if index >= total { continue } var doc *Message json.Unmarshal(*value.Source, &doc) index++ fmt.Println("第", index, "条:", doc.TimeStamp.Format("2006-01-02 15:04:05"), doc.Host, doc.Facility, doc.Severity, doc.Message) } count++ } return}
有时候我们呢要的是直接同统计结果,这时候就需要使用聚合查询
package mainimport ( "context" "time" "github.com/olivere/elastic")// MessageCount 系统统计type MessageCount struct { Host string `json:"host"` Count int64 `json:"count"`}func main() { endTime := time.Now() startTime := endTime.Add(-10 * time.Minute) // 设置地址和验证 , 默认 会自动去查找集群的所有可用节点,如果使用代理,代理到内网就有可能出现访问不同,需要加上参数 elastic.SetSniff(false) client, err := elastic.NewClient(elastic.SetURL("http://192.168.100.1:9200", "http://192.168.100.2:9200", "http://192.168.100.3:9200"), elastic.SetBasicAuth("XXXXX", "XXXXX")) if err != nil { panic(err) } query := elastic.NewBoolQuery() // 过滤facility字段中 包含kern的 query.Must(elastic.NewQueryStringQuery("facility:kern")) // 查找时间范围 query.Must(elastic.NewRangeQuery("@timestamp").Gte(startTime).Lt(endTime)) aggs := elastic.NewTermsAggregation().Field("host") // Search指定索引,可以使用通配符 result, err := client.Search("rsyslog-*").Query(query).Aggregation("host", aggs).Sort("@timestamp", true).Size(0).Do(context.Background()) agg, found := result.Aggregations.Terms("host") if !found { return } var messageCounts []MessageCount // 遍历桶数据 for _, bucket := range agg.Buckets { // 每一个桶都有一个key值,其实就是分组的值,可以理解为SQL的group by值 // bucketValue := bucket.Key // 打印结果, 默认桶聚合查询,都是统计文档总数 // fmt.Printf("bucket = %q 文档总数 = %d", bucketValue, bucket.DocCount) messageCounts = append(messageCounts, MessageCount{ Host: bucket.Key.(string), Count: bucket.DocCount, }) } return}
总结
Elasticsearch查询还是非常快,操作起来也不难。