天天看點

驚!使用300行代碼建立一個分布式系統

建構一個分布式系統是很困難的。它需要可擴充性、容錯性、高可用性、一緻性、可伸縮以及高效。為了達到這些目的,分布式系統需要很多複雜的元件以一

種複雜的方式協同工作。例如,apache hadoop在大型叢集上并行處理tb級别的資料集時,需要依賴有着高容錯的檔案系統(hdfs)來達到高吞

吐量。

在之前,每一個新的分布式系統,例如hadoop和cassandra,都需要建構自己的底層架構,包括消息處理、存儲、網絡、容錯性和可伸縮性。

慶幸的是,像apache mesos這樣的系統,通過給分布式系統的關鍵構模組化塊提供類似作業系統的管理服務,簡化了建構和管理分布式系統的任務。

mesos抽離了cpu、存儲和其它計算資源,是以開發者開發分布式應用程式時能夠将整個資料中心叢集當做一台巨型機對待。

驚!使用300行代碼建立一個分布式系統

建構在mesos上的應用程式被稱為架構,它們能解決很多問題:apache spark,一種流行的叢集式資料分析工具;chronos,一個類

似cron的具有容錯性的分布式scheduler,這是兩個建構在mesos上的架構的例子。建構架構可以使用多種語言,包括

c++,go,python,java,haskell和 scala。

在分布式系統用例上,比特币開采就是一個很好的例子。比特币将為生成 acceptable hash 的挑戰轉為驗證一塊事務的可靠性。可能需要

幾十年,單台筆記本電腦挖一塊可能需要花費超過150年。結果是,有許多的“采礦池”允許采礦者将他們的計算資源聯合起來以加快挖礦速度。

1個mesos架構有1個scheduler 和1個executor組成。scheduler 和mesos master通信并決定運作什麼任

務,而executor 運作在slaves上面,執行實際任務。大多數的架構實作了自己的scheduler,并使用1個由mesos提供的标準

executors。當然,架構也可以自己定制executor。在這個例子中即會編寫定制的scheduler,并使用标準指令執行器

(executor)運作包含我們比特币服務的docker鏡像。

對這裡的scheduler來說,需要運作的有兩種任務—— 單礦伺服器任務和多礦伺服器任務。伺服器會和一個比特币采礦池通信,并給每個“勞工”配置設定塊。“勞工”會努力工作,即開采比特币。

任務實際上被封裝在executor架構中,是以任務運作意味着告訴mesos master在其中一個slave上面啟動一個executor。

由于這裡使用的是标準指令執行器(executor),是以可以指定任務是二進制可執行檔案、bash腳本或者其他指令。由于mesos支援

docker,是以在本例中将使用可執行的docker鏡像。docker是這樣一種技術,它允許你将應用程式和它運作時需要的依賴一起打包。

為了在mesos中使用docker鏡像,這裡需要在docker registry中注冊它們的名稱:

const ( 

    minerserverdockerimage = "derekchiang/p2pool" 

    minerdaemondockerimage = "derekchiang/cpuminer" 

然後定義一個常量,指定每個任務所需資源:

    memperdaemontask = 128  // mining shouldn't be memory-intensive 

    memperservertask = 256 

    cpuperservertask = 1    // a miner server does not use much cpu 

現在定義一個真正的scheduler,對其跟蹤,并確定其正确運作需要的狀态:

type minerscheduler struct { 

    // bitcoind rpc credentials 

    bitcoindaddr string 

    rpcuser      string 

    rpcpass      string 

    // mutable state 

    minerserverrunning  bool 

    minerserverhostname string 

    minerserverport     int    // the port that miner daemons 

                               // connect to 

    // unique task ids 

    taskslaunched        int 

    currentdaemontaskids []*mesos.taskid 

這個scheduler必須實作下面的接口:

type scheduler interface { 

    registered(schedulerdriver, *mesos.frameworkid, *mesos.masterinfo) 

    reregistered(schedulerdriver, *mesos.masterinfo) 

    disconnected(schedulerdriver) 

    resourceoffers(schedulerdriver, []*mesos.offer) 

    offerrescinded(schedulerdriver, *mesos.offerid) 

    statusupdate(schedulerdriver, *mesos.taskstatus) 

    frameworkmessage(schedulerdriver, *mesos.executorid, 

                     *mesos.slaveid, string) 

    slavelost(schedulerdriver, *mesos.slaveid) 

    executorlost(schedulerdriver, *mesos.executorid, *mesos.slaveid, 

                 int) 

    error(schedulerdriver, string) 

現在一起看一個回調函數:

func (s *minerscheduler) registered(_ sched.schedulerdriver, 

      frameworkid *mesos.frameworkid, masterinfo *mesos.masterinfo) { 

    log.infoln("framework registered with master ", masterinfo) 

func (s *minerscheduler) reregistered(_ sched.schedulerdriver, 

      masterinfo *mesos.masterinfo) { 

    log.infoln("framework re-registered with master ", masterinfo) 

func (s *minerscheduler) disconnected(sched.schedulerdriver) { 

    log.infoln("framework disconnected with master") 

registered在scheduler 成功向mesos master注冊之後被調用。

reregistered在scheduler 與mesos master斷開連接配接并且再次注冊時被調用,例如,在master重新開機的時候。

disconnected在scheduler 與mesos master斷開連接配接時被調用。這個在master挂了的時候會發生。

目前為止,這裡僅僅在回調函數中列印了日志資訊,因為對于一個像這樣的簡單架構,大多數回調函數可以空在那裡。然而,下一個回調函數就是每一個架構的核心,必須要認真的編寫。

resourceoffers在scheduler 從master那裡得到一個offer的時候被調用。每一個offer包含一個叢集上可以給架構使用的資源清單。資源通常包括cpu、記憶體、端口和磁盤。一個架構可以使用它提供的一些資源、所有資源或者一點資源都不給用。

針對每一個offer,現在期望聚集所有的提供的資源并決定是否需要釋出一個新的server任務或者一個新的worker任務。這裡可以向每個

offer發送盡可能多的任務以測試最大容量,但是由于開采比特币是依賴cpu的,是以這裡每個offer運作一個開采者任務并使用所有可用的cpu資

源。

for i, offer := range offers { 

    // … gather resource being offered and do setup 

    if !s.minerserverrunning && mems >= memperservertask && 

            cpus >= cpuperservertask && ports >= 2 { 

        // … launch a server task since no server is running and we 

        // have resources to launch it. 

    } else if s.minerserverrunning && mems >= memperdaemontask { 

        // … launch a miner since a server is running and we have mem 

        // to launch one. 

    } 

針對每個任務都需要建立一個對應的taskinfo message ,它包含了運作這個任務需要的資訊。

s.taskslaunched++ 

taskid = &mesos.taskid { 

    value: proto.string("miner-server-" + 

                        strconv.itoa(s.taskslaunched)), 

task ids由架構決定,并且每個架構必須是唯一的。 

containertype := mesos.containerinfo_docker 

task = &mesos.taskinfo { 

    name: proto.string("task-" + taskid.getvalue()), 

    taskid: taskid, 

    slaveid: offer.slaveid, 

    container: &mesos.containerinfo { 

        type: &containertype, 

        docker: &mesos.containerinfo_dockerinfo { 

            image: proto.string(minerserverdockerimage), 

        }, 

    }, 

    command: &mesos.commandinfo { 

        shell: proto.bool(false), 

        arguments: []string { 

            // these arguments will be passed to run_p2pool.py 

            "--bitcoind-address", s.bitcoindaddr, 

            "--p2pool-port", strconv.itoa(int(p2poolport)), 

            "-w", strconv.itoa(int(workerport)), 

            s.rpcuser, s.rpcpass, 

    resources: []*mesos.resource { 

        util.newscalarresource("cpus", cpuperservertask), 

        util.newscalarresource("mem", memperservertask), 

taskinfo message指定了一些關于任務的重要中繼資料資訊,它允許mesos節點運作docker容器,特别會指定name、task id、container information以及一些需要給容器傳遞的參數。這裡也會指定任務需要的資源。

現在taskinfo已經被建構好,是以任務可以這樣運作:

driver.launchtasks([]*mesos.offerid{offer.id}, tasks, &mesos.filters{refuseseconds: proto.float64(1)})

在架構中,需要處理的最後一件事情是當開采者server關閉時會發生什麼。這裡可以利用statusupdate 函數來處理。

在一個任務的生命周期中,針對不同的階段有不同類型的狀态更新。對這個架構來說,想要確定的是如果開采者server由于某種原因失敗,系統會kill所有開采者worker以避免浪費資源。這裡是相關的代碼:

if strings.contains(status.gettaskid().getvalue(), "server") && 

    (status.getstate() == mesos.taskstate_task_lost || 

        status.getstate() == mesos.taskstate_task_killed || 

        status.getstate() == mesos.taskstate_task_finished || 

        status.getstate() == mesos.taskstate_task_error || 

        status.getstate() == mesos.taskstate_task_failed) { 

    s.minerserverrunning = false 

    // kill all tasks 

    for _, taskid := range s.currentdaemontaskids { 

        _, err := driver.killtask(taskid) 

        if err != nil { 

            log.errorf("failed to kill task %s", taskid) 

        } 

    s.currentdaemontaskids = make([]*mesos.taskid, 0) 

萬事大吉!通過努力,這裡在apache mesos上建立一個正常工作的分布式比特币開采架構,它隻用了大約300行go代碼。這證明了使用mesos 架構的api編寫分布式系統是多麼快速和簡單。

來源:51cto