天天看點

MongoDB分布式存儲的MapReduce并行查詢

今天要介紹的是如何使用MONGODB中提供的MapReduce功能進行查詢。

  今天介紹如何基于sharding機制進行mapreduce查詢。在MongoDB的官方文檔中,這麼一句話:

Sharded Environments

      In sharded environments, data processing of map/reduce operations runs in parallel on all shards.

即: map/reduce操作會并行運作在所有的shards上。

  下面我們就用之前這篇文章中白搭建的環境來構造mapreduce查詢:

  首先要說的是,基于sharding的mapreduce與非sharding的資料在傳回結構上有一些差別,我目前注意到的主要是不支援定制式的json格式的傳回資料,也就是下面方式可能會出現問題:

  return { count : total };

注意:上面的情況目前出現在了我的測試環境下,如下圖:

  就需要改成 return count;

  下面是測試代碼,首先是按文章id來查詢相應數量(基于分組查詢執行個體方式):

Code highlighting produced by Actipro CodeHighlighter (freeware)http://www.CodeHighlighter.com/-->public partial class getfile : System.Web.UI.Page

    {

        public Mongo Mongo { get; set; }

        public IMongoDatabase DB

        {

            get

            {

                return this.Mongo["dnt_mongodb"];

            }

        }

        /// <summary>

        /// Sets up the test environment.  You can either override this OnInit to add custom initialization.

        /// </summary>

        public virtual void Init()

            string ConnectionString = "Server=10.0.4.85:27017;ConnectTimeout=30000;ConnectionLifetime=300000;MinimumPoolSize=512;MaximumPoolSize=51200;Pooled=true";

            if (String.IsNullOrEmpty(ConnectionString))

                throw new ArgumentNullException("Connection string not found.");

            this.Mongo = new Mongo(ConnectionString);

            this.Mongo.Connect();         

        string mapfunction = "function(){\n" +

                        "  if(this._id=='548111') { emit(this._id, 1); } \n" +   

                        "};";

        string reducefunction = "function(key, current ){" +

                                "   var count = 0;" +

                                "   for(var i in current) {" +

                                "       count+=current[i];" +

                                "   }" +

                                "   return count ;\n" +

                              "};";

        protected void Page_Load(object sender, EventArgs e)

            Init();

            var mrb = DB["posts1"].MapReduce();//attach_gfstream.files

            int groupCount = 0;

            using (var mr = mrb.Map(mapfunction).Reduce(reducefunction))

                foreach (Document doc in mr.Documents)

                {

                    groupCount = int.Parse(doc["value"].ToString());

                }

            this.Mongo.Disconnect();

        }     

     }

  下面是運作時的查詢結果,如下:

MongoDB分布式存儲的MapReduce并行查詢

  接着示範一下如何把查詢到的文章資訊傳回并裝入list集合,這裡隻查詢ID為548110和548111兩個文章:

Code highlighting produced by Actipro CodeHighlighter (freeware)http://www.CodeHighlighter.com/-->        string mapfunction = "function(){\n" +

                        "  if(this._id=='548110'|| this._id=='548111') { emit(this, 1); } \n" +    

        string reducefunction = "function(doc, current ){" +

                                "   return doc;\n" +

                               "};";

            List<Document> postDoc = new List<Document>();

                    postDoc.Add((Document)doc["value"]);

下面是運作時的查詢結果,如下:

  上面的map/reduce方法還有許多寫法,如果大家感興趣可以看一下如下這些連結:

  當然在mongos進行map/reduce運算時,會生成一些臨時檔案,如下圖:

MongoDB分布式存儲的MapReduce并行查詢

  我猜這些臨時檔案可能會對再次查詢系統時的性能有一些提升(但目前未觀察到)。

  當然對于mongodb的gridfs系統(可使用它搭建分布式檔案存儲系統,我之前在這篇文章中已介紹過,我也做了測試,但遺憾的是并未成功,它經常會報一些錯誤,比如:

  Thu Sep 09 12:09:29 Assertion failure _grab client\parallel.cpp 461

看來mapreduce程式連結到mongodb上時,會産生一些問題,但不知道是不是其自身穩定性的原因,還是我的機器環境設定問題(記憶體或配置的64位系統mongos與32位的client連接配接問題)。

本文轉自 wws5201985 51CTO部落格,原文連結:http://blog.51cto.com/wws5201985/786458,如需轉載請自行聯系原作者