天天看點

Mongodb Manual閱讀筆記:CH6 聚合 6 聚合

聚合就是,通過處理資料得到一些計算結果。聚合操作可以分組,在Mongodb中聚合可以分為3中:聚合管道(aggregation pipeline),map-reduce,單一目的的聚合方法或者指令

<a href="#_Toc380869611">6 聚合... 1</a>

<a href="#_Toc380869612">6.1聚合介紹... 2</a>

<a href="#_Toc380869613">6.1.1聚合方式... 2</a>

<a href="#_Toc380869614">6.1.1.1 聚合管道(aggregation pipeline)2</a>

<a href="#_Toc380869615">6.1.1.2 map-reduce. 3</a>

<a href="#_Toc380869616">6.1.1.3單一目的的聚合操作... 3</a>

<a href="#_Toc380869617">6.1.2其他的特性和能力... 4</a>

<a href="#_Toc380869618">6.2聚合概述... 4</a>

<a href="#_Toc380869619">6.2.1聚合管道... 5</a>

<a href="#_Toc380869620">6.2.1.1 管道... 5</a>

<a href="#_Toc380869621">6.2.1.2 管道表達式... 5</a>

<a href="#_Toc380869622">6.2.1.3聚合管道特性... 5</a>

<a href="#_Toc380869623">6.2.2 Map-Reduce. 5</a>

<a href="#_Toc380869624">6.2.2.1 Map-Reduce的Javascript函數... 6</a>

<a href="#_Toc380869625">6.2.2.2 Map-Reduce的特性... 6</a>

<a href="#_Toc380869626">6.2.3單一目的的聚合操作... 6</a>

<a href="#_Toc380869627">6.2.3.1 Coutn. 6</a>

<a href="#_Toc380869628">6.2.3.2 Distinct. 6</a>

<a href="#_Toc380869629">6.2.3.3 Group. 6</a>

<a href="#_Toc380869630">6.2.4聚合機制... 6</a>

<a href="#_Toc380869631">6.2.4.1聚合管道優化... 7</a>

<a href="#_Toc380869632">6.2.4.2聚合管道的限制... 7</a>

<a href="#_Toc380869633">6.2.4.3 聚合管道和shard collection. 8</a>

<a href="#_Toc380869634">6.2.4.4 Map-Reduce和shard叢集... 8</a>

<a href="#_Toc380869635">6.2.4.5 Map-Reduce的并發... 9</a>

<a href="#_Toc380869636">6.3聚合例子... 9</a>

<a href="#_Toc380869637">6.3.1郵編的資料集上使用聚合... 9</a>

<a href="#_Toc380869638">6.3.1.1資料模型... 9</a>

<a href="#_Toc380869639">6.3.1.2傳回人口超過100萬的州... 9</a>

<a href="#_Toc380869640">6.3.1.3 每個州的平均城市人口... 10</a>

<a href="#_Toc380869641">6.3.1.4 傳回最大和最小城市... 10</a>

<a href="#_Toc380869642">6.3.2 使用者愛好資料聚合... 10</a>

<a href="#_Toc380869643">6.3.2.1資料模型... 10</a>

<a href="#_Toc380869644">6.3.2.2 文檔排序... 11</a>

<a href="#_Toc380869645">6.3.2.3 根據加入的月份排序... 11</a>

<a href="#_Toc380869646">6.3.2.4 每月加入的總人數... 11</a>

<a href="#_Toc380869647">6.3.2.5前五最受歡迎的愛好... 11</a>

<a href="#_Toc380869648">6.3.3 Map-Reduce例子... 12</a>

<a href="#_Toc380869649">6.3.3.1 傳回每個使用者的所有訂單價格... 12</a>

<a href="#_Toc380869650">6.3.3.2 計算訂單平均商品數量... 12</a>

<a href="#_Toc380869651">6.3.4 增量執行map-reduce. 13</a>

<a href="#_Toc380869652">6.3.4.1 資料... 13</a>

<a href="#_Toc380869653">6.3.4.2 初始化執行Map-Reduce. 13</a>

<a href="#_Toc380869654">6.3.4.3順序新增資料... 14</a>

<a href="#_Toc380869655">6.3.5 Map函數TroubleShooting. 15</a>

<a href="#_Toc380869656">6.3.6Reduce函數Troubleshooting. 15</a>

<a href="#_Toc380869657">6.4聚合指南... 15</a>

和查詢一樣,都是查詢collection的文檔,不過聚合還要計算,通過計算得到結果

聚合管道,通過多個階段的管道計算,來得到結果。最基本的就是過濾,和查詢的過來一樣。其他的管道,提供分組的工具,可以計算平均值,sum等。

Mongodb Manual閱讀筆記:CH6 聚合 6 聚合

map-reduce分為2個階段,map階段,處理每個文檔,通過emits發到下個階段。reduce階段,輸出map階段的輸出組合。map-reduce可以通過finalize階段處理最後的結果。map-reduce,也可以限制輸入條件,也可以排序限制輸出結果。

map-reduce的每個階段都是由javascript腳本寫成的函數。javascript提供的很好的靈活性,但是更加複雜和低效。

map-reduce,輸出可以超過16MB

Mongodb Manual閱讀筆記:CH6 聚合 6 聚合

單一目的的聚合操作,比如傳回行數,distinct等,都來至于同一個collection,但是和map-reduce和聚合管道而言,缺少了靈活性。

Mongodb Manual閱讀筆記:CH6 聚合 6 聚合

聚合管道和map-reduce可以在shard叢集上使用,map-reduce還可以輸出到叢集中。

聚合管道,可以通過索引來提高性能。

詳細介紹聚合管道,map-reduce,單一目的的聚合操作之間的特性和限制。

聚合管道是一個資料聚合模型架構的概念,資料處理管道,通過多個階段,來計算出聚合結果。

很多情況下由于靈活性,map-reduce是首選但是複雜性很難讓人接受。

理論上,collection中的文檔通過管道來轉化計算資料。collection的文檔一個一個的通過管道轉化,但是并不是每個輸入都會有輸出。

每個管道都有表達式,表達式指定了資料要如何轉化。管道隻能操作目前文檔。一般管道表達式沒有目前狀态,除了一些特殊的,比如求最大,最小值。

聚合管道把一個collection的資料都丢到管道中轉化,這些操作時可以被索引優化的。

$match,$sort,$limit,$skip這些操作時可以利用索引的,隻要在管道的最開始出,比一下任何操作早就可以了,$project,$unwind,$group

2.4版本中,$geoNear也可以使用地理性的索引,但是必去出現在第一階段。

若聚合隻處理collection的子集,就可以使用一下操作過濾,$match,$limit,$skip,然後放到pipeline的開頭可以使用合适的索引。

如果$match放在$sort的後面,那麼隻有$sort可以使用索引。

聚合管道,mongodb提供了内部優化各個管道的順序,提高執行的性能。

map-reduce是處理資料量的範例,在map階段,應用每個資料,然後搞成k-v對分發出去,在reduce階段,收集計算聚合資料,然後存放在collection中,最後還要通過finalize函數來輸出。

map-reduce内的階段都是js函數,是以可以在map階段之前做任何的排序,限制操作。

使用javascript在map中形成k-v對,然後在reduce中處理。在現實情況中,map可以又多個key,也可以沒有key。

mongodb中Map-Reduce的結果可以寫入collection也可以直接輸出。如果直接輸出有BSON的大小限制,16MB,Map-Reduce也支援寫入到shard的collection中。

隻能用于指定功能。

傳回文檔的數量可以是db.collection.count(),也可以是cursor.count()

檢視某個列的distinct值。

db.collection.distinct("cust_id")

把查詢結果作為輸入,通過一些計算然後把結果以數組方式輸出。

db.records.group( {

         key: { a: 1 },

         cond: { a: { $lt: 3 } },

         reduce: function(cur, result) { result.count += cur.count },

         initial: { count: 0 }

} )

本節介紹,聚合管道的優化,聚合管道的限制,聚合管道和shard叢集,map-reduce和shard叢集,map-reduce并發

聚合管道有個優化階段,把所有的管道重新排列

$sort+$skip+$limit順序優化:如果你的順序是$sort,$skip,$limit,那麼$limit會被提到$skip上面。如:

{ $sort: { age : -1 } },

{ $skip: 10 },

{ $limit: 5 }

優化後

{ $limit: 15 }

{ $skip: 10 }

$limit+$skip+$limit+$skip順序優化:一樣還是會把limit放到skip上面,但是limit之間,skip之間會合并,如:

{ $limit: 100 },

{ $skip: 5 },

{ $limit: 10},

{ $skip: 2 }

第一步優化後會把limit放到上面

{ $limit: 15},

第二步優化合并

{ $limit: 15 },

{ $skip: 7 }

如果$project裡面指定的是include,MongoDB會把這個projection應用到pipeline的頭上。

聚合管道不允許操作一下資料類型:Symbol,MinKey,MaxKey,DBRef,Code,CodeWScope

不能操作BSON結果大小16MB

聚合使用記憶體超過記憶體的10%,過程會報錯退出。

一般隻有累計操作次才會出現,即要全部輸入才會出結果的操作,記憶體超過5%就會開始告警,并寫入日志,超過10%報錯退出。

聚合管道支援在shard collection上運作

第一步,聚合管道會把$group,$sort操作釋出到每個shard,然後第二個管道再到mongos上運作,這個管道會組合之前的$group,$sort,繼續執行。

$group從shard獲得子結果,然後組合起來

聚合管道會給mongos帶來巨大的cpu消耗,如果在shard叢集中有大量的聚合管道操作,建議更換體系結構。

Map-Reduce輸入輸出都支援shard。

如果shared collection作為資料,mongos會自發的把map-reduce的操作釋出到每個shard。

如果map-reduce輸出帶有shard值,mongodb會使用_id作為shard key進行shard。

輸出到shard collection的條件:

1.如果輸出的collection不存在,會根據_id建立shard

2.如果一個空的shard,在map-reduce的第一階段的結果來是初花shard的chunks

3.monogos并發的排程map-reduce的任務到shard中,處理任務的時候,shard會擷取自己需要的chunk然後儲存

map-reduce是一個任務組合,包括讀輸入,執行map,執行reduce,輸出

在執行map-reduce的時候會産生以下的鎖:

1.讀階段,産生讀鎖,每100個文檔就會釋放一次

2.插入到一個零食collection寫鎖

3.如果collection不存在,建立一個寫鎖

4.如果collection存在,寫入輸出寫鎖

{

"_id": "10280",

"city": "NEW YORK",

"state": "NY",

"pop": 5574,

"loc": [

-74.016323,

40.710537

]

}

id:郵編,city:城市,state:州,pop:人口,loc:經緯

db.zipcodes.aggregate( { $group :

                                              { _id : "$state",

                                                        totalPop : { $sum : "$pop" } } },

                                     { $match : {totalPop : { $gte : 10*1000*1000 } } } )

1.group,為每個state建立一個文檔,sum 人口

2.然後,通過match過濾人口

db.zipcodes.aggregate( { $group :{ _id : { state : "$state", city : "$city" },pop : { $sum : "$pop" } } },

                                     { $group :{ _id : "$_id.state",avgCityPop : { $avg : "$pop" } } } )

1.group對state,city人口進行sum

2.對state人口進行平均

db.zipcodes.aggregate(

         { $group:{ _id: { state: "$state", city: "$city" },pop: { $sum: "$pop" } } },{ $sort: { pop: 1 } },

         { $group:{ _id : "$_id.state",biggestCity: { $last: "$_id.city" },biggestPop: { $last: "$pop" },

                  smallestCity: { $first: "$_id.city" },

                  smallestPop: { $first: "$pop" } }

         },

         // the following $project is optional, and

         // modifies the output format.

         { $project:{ _id: 0,state: "$_id",biggestCity: { name: "$biggestCity", pop: "$biggestPop" },

                  smallestCity: { name: "$smallestCity", pop: "$smallestPop" } } }

 )

1.對state,city人口進行sum,并進行排序

2.根據排序擷取人口最多,和人口最少的城市

3.通過project修改字段名

         _id : "jane",

         joined : ISODate("2011-03-02"),

         likes : ["golf", "racquetball"]

         _id : "joe",

         joined : ISODate("2012-07-02"),

         likes : ["tennis", "golf", "swimming"]

db.users.aggregate(

[

{ $project : { name:{$toUpper:"$_id"} , _id:0 } },

{ $sort : { name : 1 } }

])

         { $project : { month_joined : {$month : "$joined"},name : "$_id",_id : 0},

         { $sort : { month_joined : 1 } }

         { $project : { month_joined : { $month : "$joined" } } } ,

         { $group : { _id : {month_joined:"$month_joined"} , number : { $sum : 1 } } },

         { $sort : { "_id.month_joined" : 1 } }

         { $unwind : "$likes" },

         { $group : { _id : "$likes" , number : { $sum : 1 } } },

         { $sort : { number : -1 } },

         { $limit : 5 }

1.使用unwind對likes進行拆分

2.group對愛和統計,并計數

3.排序

4 前5

Map-Reduce主要針對以下資料模型:

         _id: ObjectId("50a8240b927d5d8b5891743c"),

         cust_id: "abc123",

         ord_date: new Date("Oct 04, 2012"),

         status: 'A',

         price: 25,

         items: [      { sku: "mmm", qty: 5, price: 2.5 },

                            { sku: "nnn", qty: 5, price: 2.5 } ]

1.定義map函數,map price和cust_id并emits

var mapFunction1 = function() {

                   emit(this.cust_id, this.price);

};

2.定義reduce 函數,2個參數,keycustid和valuesPrices,valuesPrices是一個數組,是以:

var reduceFunction1 = function(keyCustId, valuesPrices) {

         return Array.sum(valuesPrices);

3.執行map-reduce,輸出指定map_reduce_example如果已經存在會替換原先的collection

db.orders.mapReduce(

         mapFunction1,

         reduceFunction1,

         { out: "map_reduce_example" }

)

var mapFunction2 = function() {

                            for (var idx = 0; idx &lt; this.items.length; idx++) {

                                     var key = this.items[idx].sku;

                                     var value = {count: 1,qty: this.items[idx].qty};

                            emit(key, value);

}};

var reduceFunction2 = function(keySKU, countObjVals) {

                                               reducedVal = { count: 0, qty: 0 };

                                               for (var idx = 0; idx &lt; countObjVals.length; idx++) {

                                                        reducedVal.count += countObjVals[idx].count;

                                                        reducedVal.qty += countObjVals[idx].qty;

                                               }

                                               return reducedVal;

var finalizeFunction2 = function (key, reducedVal) {

         reducedVal.avg = reducedVal.qty/reducedVal.count;

         return reducedVal;

db.orders.mapReduce( mapFunction2,reduceFunction2,

                                     {

                                               out: { merge: "map_reduce_example" },

                                               query: { ord_date:{ $gt: new Date('01/01/2012') }},

                                               finalize: finalizeFunction2

                                     }

out,merge輸出會合并到map_reduce_example

如果map-reduce的資料集會一直增加,那麼不需要對所有的執行map-reduce,最對增加的部分執行即可。

1.在現有的collection上執行map-reduce

2.增加資料,在query中指定新資料的條件,在out中指定reduce,合并已有資料

db.sessions.save( { userid: "a", ts: ISODate('2011-11-03 14:17:00'), length: 95 } );

db.sessions.save( { userid: "b", ts: ISODate('2011-11-03 14:23:00'), length: 110 } );

db.sessions.save( { userid: "c", ts: ISODate('2011-11-03 15:02:00'), length: 120 } );

db.sessions.save( { userid: "d", ts: ISODate('2011-11-03 16:45:00'), length: 45 } );

db.sessions.save( { userid: "a", ts: ISODate('2011-11-04 11:05:00'), length: 105 } );

db.sessions.save( { userid: "b", ts: ISODate('2011-11-04 13:14:00'), length: 120 } );

db.sessions.save( { userid: "c", ts: ISODate('2011-11-04 17:00:00'), length: 130 } );

db.sessions.save( { userid: "d", ts: ISODate('2011-11-04 15:37:00'), length: 65 } );

var mapFunction = function() {

                                     var key = this.userid;

                                     var value = {userid: this.userid,

                                                        total_time: this.length,

                                                        count: 1,

                                                        avg_time: 0

                                     };

                                     emit( key, value );

var reduceFunction = function(key, values) {

                                                        var reducedObject = {

                                                                 userid: key,

                                                                 total_time: 0,

                                                                 count:0,

                                                                 avg_time:0

                                                        };

                                                        values.forEach( function(value) {

                                                                 reducedObject.total_time += value.total_time;

                                                                 reducedObject.count += value.count;

                                                        });

                                                        return reducedObject;

var finalizeFunction = function (key, reducedValue) {

                                                        if (reducedValue.count &gt; 0)

                                                        reducedValue.avg_time = reducedValue.total_time / reducedValue.count;

                                                        return reducedValue;

db.sessions.mapReduce( mapFunction,

                                               reduceFunction,

                                               {

                                                        out: { reduce: "session_stat" },

                                                        finalize: finalizeFunction

db.sessions.save( { userid: "a", ts: ISODate('2011-11-05 14:17:00'), length: 100 } );

db.sessions.save( { userid: "b", ts: ISODate('2011-11-05 14:23:00'), length: 115 } );

db.sessions.save( { userid: "c", ts: ISODate('2011-11-05 15:02:00'), length: 125 } );

db.sessions.save( { userid: "d", ts: ISODate('2011-11-05 16:45:00'), length: 55 } );

                                     reduceFunction,

                                               query: { ts: { $gt: ISODate('2011-11-05 00:00:00') } },

                                               out: { reduce: "session_stat" },

                                               finalize: finalizeFunction

);

1.定義map函數

var map = function() {

emit(this.cust_id, this.price);

2.定義reduce函數

var emit = function(key, value) {

print("emit");

print("key: " + key + " value: " + tojson(value));

3.調用map函數

var myDoc = db.orders.findOne( { _id: ObjectId("50a8240b927d5d8b5891743c") } );

map.apply(myDoc);

4.驗證輸出

emit

key: abc123 value:250

5.多文檔調用

var myCursor = db.orders.find( { cust_id: "abc123" } );

while (myCursor.hasNext()) {

         var doc = myCursor.next();

         print ("document _id= " + tojson(doc._id));

         map.apply(doc);

         print();

6.驗證結果

1.确定傳回的資料類型,和傳入的資料類型一緻

2.數組的順序,不影響結果

3.多次執行不影響結果

檢視手冊p308

繼續閱讀