天天看點

distribution源碼分析(三):registry pull操作詳細流程

1. 前言

倉庫的設計初衷就是為了存儲鏡像資料并提供上傳下載下傳鏡像服務的,是以與鏡像存儲以及鏡像資料傳輸是非常重要的方面。本節中将對鏡像存儲以及與docker端的資料傳輸過程做出詳細解析。

2. 本文分析内容安排

  • 建立連接配接
  • 接受request并分發到handler分發以及proxy
  • manifest傳輸
  • data傳輸

3. 建立連接配接

建立連接配接前的初始化工作主要是對于Registry.App的初始化,初始化的流程如圖3.1所示:

distribution源碼分析(三):registry pull操作詳細流程

圖3.1 建立連接配接流程

上述流程圖是registry初始化然後提供給docker http服務的全過程,其中最後三步之前對應的是distribution/registry/registry.go中Cmd變量定義中的

registry, err := NewRegistry(ctx, config)

這行代碼,主要是對registry本身的初始化,包括Handler、storage、endpoint等一切和鏡像管理相關的結構;最後三步是根據配置好的registry調用http Listener 和 Server提供服務,對應于distribution/registry/registry.go中Cmd變量定義中的

registry.ListenAndServe()

。實際上,最早接收到docker端請求在後三步,這三步中包括了接收請求以及傳回結果的接口。具體流程是Listener接收到請求後,根據之前NewRegistry配置的Handler調用相應的函數到注冊的storage中讀取資料,然後通過Serve接口将結果傳回給docker端。可見,将Listener作為切入點研究distribution代碼,便可以一步步弄清楚整個流程。

ListenAndServer函數在系列(二)中已經介紹過了,主要語句是

ln, err := listener.NewListener(config.HTTP.Net, config.HTTP.Addr)

監聽連接配接,之後

registry.server.Serve(ln)

,建立持續連接配接并根據server中注冊的Handler、storage等提供服務。Serve是net/http包中的函數,會通過route調用恰當的Handler來提供服務。至此,可以說建立連接配接的過程已經完成,接下來是收到request并分發到相應handler提供服務了。

4. 接受request并分發到handler分發以及proxy

注冊handler并提供服務是net/http包提供的原生功能,distribution直接利用了go語言的該功能。

4.1 go語言net/http注入Handler原生特性

func ListenAndServe(addr string, handler Handler) error

該方法用于在指定的 addr 位址進行監聽,然後調用服務端處理程式來處理傳入的連結請求。第二個參數表示服務端處理程式,如果為空,意味着調用http.DefaultServeMux進行處理,而服務端編寫的業務邏輯處理程式http.Handle()或http.HandleFunc()預設注入http.DefaultServeMux中,示例如下:

http.Handle("/foo",fooHandler)
http.HandleFunc("/bar",func(w http.ResponseWriter, r *http.Request) {
    fmt.Fprintf(w, "Hello, %q", html.EscapeString(r.URL.Path))
})
log.Fatal(http.ListenAndServe(":8080", nil))
           

也可以自己重新定義http.Server,将Handler直接寫入Serve中,這樣非但不用再調用http.Handle或者http.HandleFunc注冊而且可以更多地控制服務端的行為,distribution源碼就是這麼做的,在NewRegistry函數中就已經重新定義了Server,将handler注入了,并添加了很多控制行為。這裡先不說distribution,而是舉個例子說明下用法:

s := &http.Server{
    Addr:                    ":8080",
    Handler:               myHandler,
    ReadTimeout:     *time.Second,
    WriteTimeout:    *time.Second,
    MaxHeaderBytes:  <,
}
log.Fatal(s.ListenAndServe())
           

4.2 distribution中Handler注入實作

這裡從後向前推,在distribution/registry/registry.go中,NewRegistry在最後傳回之前的語句為

server := &http.Server{
    Handler: handler,
}
           

可見,是對Server做了重新定義,主要是注入了Handler處理函數,處理函數為handler,定義在

handler := configureReporting(app)

,在該函數中最重要的一行代碼為

var handler http.Handler = app

,因為http.Handler接口隻有ServeHTTP一個函數,handlers.App實作了該函數,是以便實作了http.Handler接口。可知,app即為distribution注入的接收請求後的處理函數。具體的注冊是在NewApp中的這幾行

// Register the handler dispatchers.
    app.register(v2.RouteNameBase, func(ctx *Context, r *http.Request) http.Handler {
        return http.HandlerFunc(apiBase)
    })
    app.register(v2.RouteNameManifest, imageManifestDispatcher)
    app.register(v2.RouteNameCatalog, catalogDispatcher)
    app.register(v2.RouteNameTags, tagsDispatcher)
    app.register(v2.RouteNameBlob, blobDispatcher)
    app.register(v2.RouteNameBlobUpload, blobUploadDispatcher)
    app.register(v2.RouteNameBlobUploadChunk, blobUploadDispatcher)
           

這裡以blobDispatcher為例講一下,該函數的實作位于registry/handlers/blob.go中,代碼如下:

// blobDispatcher uses the request context to build a blobHandler.
func blobDispatcher(ctx *Context, r *http.Request) http.Handler {
    dgst, err := getDigest(ctx)
    if err != nil {

        if err == errDigestNotAvailable {
            return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                ctx.Errors = append(ctx.Errors, v2.ErrorCodeDigestInvalid.WithDetail(err))
            })
        }

        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            ctx.Errors = append(ctx.Errors, v2.ErrorCodeDigestInvalid.WithDetail(err))
        })
    }

    blobHandler := &blobHandler{
        Context: ctx,
        Digest:  dgst,
    }

    return handlers.MethodHandler{
        "GET":    http.HandlerFunc(blobHandler.GetBlob),
        "HEAD":   http.HandlerFunc(blobHandler.GetBlob),
        "DELETE": http.HandlerFunc(blobHandler.DeleteBlob),
    }
}
           

由以上代碼可知,傳回的是一個MethodHandler的map,其中不僅包含http.HandlerFunc,還包含一個查找該Handler的string,這主要是因為提供服務會調用接口Handler的ServeHTTP函數提供服務,distribution對ServeHTTP也做了針對于兩個參數的更改。該函數的實作位于gorilla/handlers/handlers.go中,其中最重要的代碼為:

if handler, ok := h[req.Method]; ok {
        handler.ServeHTTP(w, req)
}
           

可見,在此将兩個參數化為針對于特定于GET、HEAD或是DELETE的處理函數,最終調用的還是http.HandlerFunc,比如當為GET時,調用的是

http.HandlerFunc(blobHandler.GetBlob)

,具體提供服務的就是blobHandler.GetBlob函數,到此已經涉及到了取資料以及之後的傳資料,在第六節展開。

5. manifest傳輸

在第四節中是以blobDispatcher為例展開叙述的,是以到最後是HEAD或者data的傳輸,如果以imageManifestDispatcher為例展開叙述,那麼最後傳輸的就是鏡像的manifest,具體流程相似,可以參照上一小節。

HandlerFunc中最後注入的函數為GetImagemanifest,實作位于registry/handlers/images.go中,它從後端存儲中得到manifest資料并且寫入http中傳回給請求端,整體流程比較清晰,取了資料後直接放到http.ResponseWriter中傳回了,是以在此不再贅述,下一節data傳輸相對來說流程相似,但更加複雜,将在那部分詳細介紹整個流程。

// GetImageManifest fetches the image manifest from the storage backend, if it exists.
func (imh *imageManifestHandler) GetImageManifest(w http.ResponseWriter, r *http.Request) {
    ctxu.GetLogger(imh).Debug("GetImageManifest")
    manifests, err := imh.Repository.Manifests(imh)
    if err != nil {
        imh.Errors = append(imh.Errors, err)
        return
    }
    var sm *schema1.SignedManifest
    if imh.Tag != "" {
        sm, err = manifests.GetByTag(imh.Tag)
    } else {
        if etagMatch(r, imh.Digest.String()) {
            w.WriteHeader(http.StatusNotModified)
            return
        }
        sm, err = manifests.Get(imh.Digest)
    }
    if err != nil {
        imh.Errors = append(imh.Errors, v2.ErrorCodeManifestUnknown.WithDetail(err))
        return
    }
    // Get the digest, if we don't already have it.
    if imh.Digest == "" {
        dgst, err := digestManifest(imh, sm)
        if err != nil {
            imh.Errors = append(imh.Errors, v2.ErrorCodeDigestInvalid.WithDetail(err))
            return
        }
        if etagMatch(r, dgst.String()) {
            w.WriteHeader(http.StatusNotModified)
            return
        }
        imh.Digest = dgst
    }
    w.Header().Set("Content-Type", "application/json; charset=utf-8")
    w.Header().Set("Content-Length", fmt.Sprint(len(sm.Raw)))
    w.Header().Set("Docker-Content-Digest", imh.Digest.String())
    w.Header().Set("Etag", fmt.Sprintf(`"%s"`, imh.Digest))
    w.Write(sm.Raw)
}
           

6. data傳輸

第四節最後已經說到,HandlerFunc裡最後注入的函數為blobHandler.GetBlob,該函數位于registry/handlers/blob.go中,如下所示。

其中,

desc, err := blobs.Stat(bh, bh.Digest)

實作位于registry/storage/blobstore.go中,傳回的是一個distribution.Descriptor,該結構包括MediaType、Size和Digest,可以用來fetch、store和target任何blob。最開始的代碼

path, err := pathFor(blobDataPathSpec{digest: dgst,})

根據digest傳回blob的路徑,該路徑是從/docker/registry/v2開始的,不包括在yaml檔案中配置的那部分字首。這個函數并沒有直接和磁盤互動,因為distribution将manifest、tag、blob等内容存入特定的目錄,是以在這裡隻是根據使用者要提取的内容組合出相應的目錄傳回。但是在pathFor之後的代碼确實通過driver和磁盤互動了,傳回了檔案的大小以及建立時間等資訊,針對于本地檔案系統,函數的實作位于registry/storage/driver/filesystem/driver.go中,該函數組建了blob的全路徑并通過讀磁盤确立了路徑的有效性。

// GetBlob fetches the binary data from backend storage returns it in the response.
func (bh *blobHandler) GetBlob(w http.ResponseWriter, r *http.Request) {
    context.GetLogger(bh).Debug("GetBlob")
    blobs := bh.Repository.Blobs(bh)
    desc, err := blobs.Stat(bh, bh.Digest)
    if err != nil {
        if err == distribution.ErrBlobUnknown {
            bh.Errors = append(bh.Errors, v2.ErrorCodeBlobUnknown.WithDetail(bh.Digest))
        } else {
            bh.Errors = append(bh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
        }
        return
    }
    if err := blobs.ServeBlob(bh, w, r, desc.Digest); err != nil {
        context.GetLogger(bh).Debugf("unexpected error getting blob HTTP handler: %v", err)
        bh.Errors = append(bh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
        return
    }
}
           

再繼續向下講之前先介紹一個結構體fileReader,提供了一個存儲在storagedriver中的檔案的read seeker接口。可見該結構體記錄了後端使用的存儲方式、檔案路徑以及大小等。

type fileReader struct {
    driver storagedriver.StorageDriver
    ctx context.Context
    // identifying fields
    path string
    size int64 // size is the total size, must be set.
    // mutable fields
    rc     io.ReadCloser // remote read closer
    brd    *bufio.Reader // internal buffered io
    offset int64         // offset is the current read offset
    err    error         // terminal error, if set, reader is closed
}
           

回到GetBlob函數中,最後的blobs.ServeBlob的實作位于registry/storage/blobserver.go中,下面的代碼列出比較重要的部分,newFileReader傳回一個記錄了存儲方式、檔案路徑以及大小等組成的fileReader結構,後面就可以根據這個調用針對于特定檔案系統的read函數了。

得到fileReader後就是設定http.ResponseWriter的頭部資訊了,最後一步用的是http.ServeContent函數,将具體blob的内容傳回給docker端。

golang // Fallback to serving the content directly. br, err := newFileReader(ctx, bs.driver, path, desc.Size) if err != nil { return err } defer br.Close() w.Header().Set("ETag", fmt.Sprintf(`"%s"`, desc.Digest)) // If-None-Match handled by ServeContent w.Header().Set("Cache-Control", fmt.Sprintf("max-age=%.f", blobCacheControlMaxAge.Seconds())) if w.Header().Get("Docker-Content-Digest") == "" { w.Header().Set("Docker-Content-Digest", desc.Digest.String()) } if w.Header().Get("Content-Type") == "" { // Set the content type if not already set. w.Header().Set("Content-Type", desc.MediaType) } if w.Header().Get("Content-Length") == "" { // Set the content length if not already set. w.Header().Set("Content-Length", fmt.Sprint(desc.Size)) } http.ServeContent(w, r, desc.Digest.String(), time.Time{}, br)

7. 總結

以上内容包括了監聽docker請求,配置設定Handler從registry存儲後端讀取資料,讀取時會根據不同的後端調用相應的storagedriver,讀取資料之後将資料傳回給docker端,可知,内容已經涵蓋了pull操作的所有部件,讀懂後便對distribution的架構有所了解了,下節會介紹push操作流程。

8. 作者介紹

梁明遠,國防科大并行與分布式計算國家重點實驗室應屆研究所學生,14年入學伊始便開始接觸docker,準備在餘下的讀研時間在docker相關開源社群貢獻自己的代碼,畢業後準備繼續從事該方面研究。郵箱:[email protected]

9. 參考文獻