天天看点

mongoDB aggregate聚合分析

简介

聚集操作实际上是对数据进行统计分析时使用的,简单的说,可以理解为SQL中的聚合操作,MongoDB中的聚集操作是为了大数据分析做准备的,这里简单介绍一下聚集框架aggregate的使用。

管道模式聚集分析

MongoDB的聚合框架是参考UNIX上的管道命令实现的,数据通过一个多步骤的管道,每个步骤都会对数据进行加工处理,最后返回需要的结果集。 

管道聚集是可以操作一个分片的集合的(The aggregation pipeline can operate on a sharded collection.) 

管道聚集在某一些阶段可以利用索引提高性能,另外,管道聚集还有一个一个内部优化阶段(后面管道聚集优化会讲到)。

常见的管道操作符有如下:

操作符 说明
$match 过滤文档
$limit 限制管道中文件的数据
$skip 跳过指定的文档数量
$sort 对所输入的文档进行排序
$group 对文档进行分组后计算聚集结果
$out 输出文档到具体的集合中(必须是管道操作的最后一步)

与$group一起使用的聚集操符:

操作符 说明
$first 返回group后的第一个值
$last 返回group后的最后一个值
$max group后的最大值
$min group后的最小值
$avg group后的平均值
$sum group后求和

示例:

db.Collection.aggregate(

        {$match : {"appId" : "2e1800b22ae70600", "leaveTime" : {"$gt" : ISODate("2017-07-12T00:00:00"), "$lt" : ISODate("2017-07-13T00:00:00")}}},

        {$group : {"_id" : "$leaveMethod", "count" : {$sum : 1}}},

        {$sort : {"_id" : 1}}

  )

集成到java操作

首先要导入mongodb的java驱动包mongo-java-driver-3.2.2.jar

使用 new Document()对象来替代上面{“XXX”:”XXX”}条件

示例:

public String aggregateLeaveMethodByDate(String app_id, Date beginDate, Date endDate) throws Exception {

              MongoCollection<Document> collection = PluginMongo.instance().getDatabase().getCollection(MongoCollectionName.PARKING_RECORD);

              Document sub_match = new Document();

              sub_match.put("appId", app_id);

              sub_match.put("leaveTime", new Document("$gt", beginDate).append("$lt", endDate));

             

              Document sub_group = new Document();

              sub_group.put("_id", "$leaveMethod");

              sub_group.put("count", new Document("$sum", 1));

             

              Document match = new Document("$match", sub_match);

              Document group = new Document("$group", sub_group);

              Document sort = new Document("$sort", new Document("_id", 1));

             

              List<Document> aggregateList = new ArrayList<Document>();

              aggregateList.add(match);

              aggregateList.add(group);

              aggregateList.add(sort);

             

              JSONObject ret_obj = new JSONObject();

              AggregateIterable<Document> resultset = collection.aggregate(aggregateList);

              MongoCursor<Document> cursor = resultset.iterator();

             

              try {

                     while(cursor.hasNext()) {

                            Document item_doc = cursor.next();

                            int leaveMethod = item_doc.getInteger("_id", 0);

                            int count = item_doc.getInteger("count", 0);

                           

                            LeaveMethodEnum leaveMethodVal = LeaveMethodEnum.fromType(leaveMethod);

                            ret_obj.put(leaveMethodVal.name(), count);

                     }

              } finally {

                     cursor.close();

              }

             

              return ret_obj.toJSONString();

       }