
Kubernetes CRD Study Notes

Preface

I recently subscribed to a Kubernetes column. This post records my process of learning CRDs (Custom Resource Definitions), to help the material sink in.

Preparation

First, install the Go dependencies we will use:

cd $GOPATH/src/
mkdir resouer
cd resouer
git clone https://github.com/resouer/k8s-controller-custom-resource.git
cd k8s-controller-custom-resource
godep restore

Then, following the GitHub repo above, I typed out a copy by hand into my own git repo (without the comments, to reinforce what I learned). All of the code and directories below were created by me; the initial files and directory layout look like this (from the project root):

hongzhi:k8s-crd hongzhi.wang$ tree
.
├── controller.go
├── crd
│   └── network.yaml
├── example
│   └── example-network.yaml
├── main.go
└── pkg
    └── apis
        └── samplecrd
            ├── register.go
            └── v1
                ├── doc.go
                ├── register.go
                └── types.go      

Fill in these files as in the original article, then install the code generator:

go get -u k8s.io/code-generator
cd $GOPATH/src/k8s.io/code-generator
godep restore      

Then generate the code:

ROOT_PACKAGE="k8s-crd"
CUSTOM_RESOURCE_NAME="samplecrd"
CUSTOM_RESOURCE_VERSION="v1"
cd $GOPATH/src/k8s.io/code-generator
./generate-groups.sh all "$ROOT_PACKAGE/pkg/client" "$ROOT_PACKAGE/pkg/apis" "$CUSTOM_RESOURCE_NAME:$CUSTOM_RESOURCE_VERSION"      

Script output:

hongzhi:code-generator hongzhi.wang$ ./generate-groups.sh all "$ROOT_PACKAGE/pkg/client" "$ROOT_PACKAGE/pkg/apis" "$CUSTOM_RESOURCE_NAME:$CUSTOM_RESOURCE_VERSION"
Generating deepcopy funcs
Generating clientset for samplecrd:v1 at k8s-crd/pkg/client/clientset
Generating listers for samplecrd:v1 at k8s-crd/pkg/client/listers
Generating informers for samplecrd:v1 at k8s-crd/pkg/client/informers      

After generation the code layout is as follows. (A note: the comment on line 37 of my types.go was wrong. Also, after generating the code, GoLand only flagged NetworkList as not being a runtime.Object type; Network had been flagged before generation as well, and its error disappeared once the code was generated. Looking into it, runtime.Object is an interface defined in interface.go under k8s.io/apimachinery/pkg/runtime, and GoLand complains because, before code generation, our custom types do not fully implement the runtime.Object interface. A sketch of types.go with its code-generation tags follows the tree below.)

hongzhi:k8s-crd hongzhi.wang$ tree
.
├── controller.go
├── crd
│   └── network.yaml
├── example
│   └── example-network.yaml
├── main.go
└── pkg
    ├── apis
    │   └── samplecrd
    │       ├── register.go
    │       └── v1
    │           ├── doc.go
    │           ├── register.go
    │           ├── types.go
    │           └── zz_generated.deepcopy.go
    ├── client
    │   ├── clientset
    │   │   └── versioned
    │   │       ├── clientset.go
    │   │       ├── doc.go
    │   │       ├── fake
    │   │       │   ├── clientset_generated.go
    │   │       │   ├── doc.go
    │   │       │   └── register.go
    │   │       ├── scheme
    │   │       │   ├── doc.go
    │   │       │   └── register.go
    │   │       └── typed
    │   │           └── samplecrd
    │   │               └── v1
    │   │                   ├── doc.go
    │   │                   ├── fake
    │   │                   │   ├── doc.go
    │   │                   │   ├── fake_network.go
    │   │                   │   └── fake_samplecrd_client.go
    │   │                   ├── generated_expansion.go
    │   │                   ├── network.go
    │   │                   └── samplecrd_client.go
    │   ├── informers
    │   │   └── externalversions
    │   │       ├── factory.go
    │   │       ├── generic.go
    │   │       ├── internalinterfaces
    │   │       │   └── factory_interfaces.go
    │   │       └── samplecrd
    │   │           ├── interface.go
    │   │           └── v1
    │   │               ├── interface.go
    │   │               └── network.go
    │   └── listers
    │       └── samplecrd
    │           └── v1
    │               ├── expansion_generated.go
    │               └── network.go
    └── signals
        ├── signal.go
        ├── signal_posix.go
        └── signal_windows.go

23 directories, 31 files      
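For reference, it is the code-generation tags in pkg/apis/samplecrd/v1 that drive all of this. Below is an abridged sketch of types.go following the sample-controller conventions (exact comments and casing may differ slightly from the repo): the +genclient and +k8s:deepcopy-gen tags tell generate-groups.sh to emit the clientset and the DeepCopyObject methods in zz_generated.deepcopy.go, and those generated methods, together with the embedded TypeMeta (which supplies GetObjectKind), are exactly what makes Network and NetworkList satisfy runtime.Object; that is why the GoLand errors clear after generation.

// pkg/apis/samplecrd/v1/types.go (abridged sketch). The doc.go in this
// package carries the package-level tags:
//   // +k8s:deepcopy-gen=package
//   // +groupName=samplecrd.k8s.io
package v1

import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

// +genclient
// +genclient:noStatus
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// Network describes a Network resource.
type Network struct {
  metav1.TypeMeta   `json:",inline"`
  metav1.ObjectMeta `json:"metadata,omitempty"`

  Spec NetworkSpec `json:"spec"`
}

// NetworkSpec is the spec for a Network resource.
type NetworkSpec struct {
  Cidr    string `json:"cidr"`
  Gateway string `json:"gateway"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// NetworkList is a list of Network resources.
type NetworkList struct {
  metav1.TypeMeta `json:",inline"`
  metav1.ListMeta `json:"metadata"`

  Items []Network `json:"items"`
}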

The signals directory and its contents have to be created by hand. Next, create the CRD and a CR object:

cd $GOPATH/src/k8s-crd
hongzhi:k8s-crd hongzhi.wang$ kubectl apply -f crd/network.yaml
customresourcedefinition.apiextensions.k8s.io/networks.samplecrd.k8s.io created

hongzhi:k8s-crd hongzhi.wang$ kubectl apply -f example/example-network.yaml
network.samplecrd.k8s.io/example-network created

hongzhi:k8s-crd hongzhi.wang$ kubectl get crd
NAME                        CREATED AT
networks.samplecrd.k8s.io   2018-10-22T07:17:56Z
hongzhi:k8s-crd hongzhi.wang$ kubectl get network
NAME              AGE
example-network   18s      

Flow diagram of the whole custom controller (from Zhang Lei's slides at ArchSummit Shenzhen 2018):

[Figure: custom controller workflow diagram]

Next, let's walk through main.go and controller.go.

main.go first needs the kube-apiserver configuration in order to connect to our Kubernetes cluster; after that, the key lines are:

networkInformerFactory := informers.NewSharedInformerFactory(networkClient, time.Second*30)
controller := NewController(kubeClient, networkClient,
  networkInformerFactory.Samplecrd().V1().Networks())
go networkInformerFactory.Start(stopCh)
if err = controller.Run(2, stopCh); err != nil {
  glog.Fatalf("Error running controller: %s", err.Error())
}

informers.NewSharedInformerFactory returns the generated SharedInformerFactory interface.
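The generated factory (pkg/client/informers/externalversions/factory.go) exposes roughly the following interface; details vary with the code-generator version, but note the Samplecrd() method, whose name is derived from the group package name:

// sketch of the generated SharedInformerFactory interface
type SharedInformerFactory interface {
  internalinterfaces.SharedInformerFactory
  ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
  WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool

  Samplecrd() samplecrd.Interface
}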

Then controller := NewController(kubeClient, networkClient, networkInformerFactory.Samplecrd().V1().Networks()) is called; its main job is to register the add, update, and delete handlers for this CRD with the informer's event handler.

A note on these events: they fire when the reflector receives events from the apiserver. The time.Second*30 argument is the resync period between the local cache and the apiserver; when a resync completes, UpdateFunc is triggered for every Network object, which is why UpdateFunc compares the ResourceVersion of the old and new objects and returns early (skipping the workqueue) when they are identical. Under normal operation these add/update/delete events are simply the apiserver relaying requests to the reflector: for example, deleting a Network object via kubectl goes through the apiserver and eventually triggers DeleteFunc.

Next, the Start method of the SharedInformerFactory interface is called. This just starts the reflector that watches the apiserver for changes to Network objects.
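A sketch of what the generated Start does, following the standard code-generator output: it launches each registered informer's Run loop, and it is that loop which drives the reflector's ListAndWatch against the apiserver:

// sketch of the generated sharedInformerFactory.Start
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
  f.lock.Lock()
  defer f.lock.Unlock()

  for informerType, informer := range f.informers {
    if !f.startedInformers[informerType] {
      // each informer runs its own reflector/ListAndWatch loop
      go informer.Run(stopCh)
      f.startedInformers[informerType] = true
    }
  }
}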

Then func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) is called to start the controller:

for i := 0; i < threadiness; i++ {
  go wait.Until(c.runWorker, time.Second, stopCh)
}

This starts threadiness goroutines to run the reconcile loop. The desired state comes from the local cache, while the actual state is fetched live from the Kubernetes cluster. The overall flow: the apiserver receives a request (add, update, or delete) and passes the event to the reflector; the reflector invokes the corresponding handler, which adds the object's key (the "namespace/name" string) to the workqueue and updates the cache according to the operation. The reconcile loop pops a key from the workqueue and looks it up in the cache: if that lookup returns a NotFound error, the loop deletes the corresponding Network object from the real cluster; if the key is present in the cache, the loop checks the actual state, creating the object if it does not exist, or comparing and updating it if it differs.
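To make the key format concrete, here is a small self-contained example (assuming this project's import paths and the types.go layout sketched earlier) of the two client-go helpers the controller uses to flatten an object into a workqueue key and split it back:

package main

import (
  "fmt"

  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  "k8s.io/client-go/tools/cache"

  samplecrdv1 "k8s-crd/pkg/apis/samplecrd/v1"
)

func main() {
  n := &samplecrdv1.Network{
    ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "example-network"},
  }

  // enqueueNetwork flattens the object into a "namespace/name" key
  key, _ := cache.MetaNamespaceKeyFunc(n)

  // syncHandler splits the key again to query the lister cache
  ns, name, _ := cache.SplitMetaNamespaceKey(key)

  fmt.Println(key, ns, name) // default/example-network default example-network
}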

main.go

package main

import (
  clientset "k8s-crd/pkg/client/clientset/versioned"
  informers "k8s-crd/pkg/client/informers/externalversions"
  "flag"
  "k8s-crd/pkg/signals"
  "k8s.io/client-go/tools/clientcmd"
  "github.com/golang/glog"
  "k8s.io/client-go/kubernetes"
  "time"
)

var (
  masterURL string
  kubeconfig string
)

func main()  {
  flag.Parse()
  stopCh := signals.SetupSignalHandler()

  cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
  if err != nil {
    glog.Fatalf("Error building kubecofnig: %s", err.Error())
  }

  kubeClient, err := kubernetes.NewForConfig(cfg)
  if err != nil {
    glog.Fatalf("Error building kubernetes clientset: %s", err.Error())
  }

  networkClient, err := clientset.NewForConfig(cfg)
  if err != nil {
    glog.Fatalf("Error building example clientset: %s", err.Error())
  }

  networkInformerFactory := informers.NewSharedInformerFactory(networkClient, time.Second*30)

  controller := NewController(kubeClient, networkClient,
    networkInformerFactory.Samplecrd().V1().Networks())

  go networkInformerFactory.Start(stopCh)

  if err = controller.Run(2, stopCh); err != nil {
    glog.Fatalf("Error running controller: %s", err.Error())
  }
}

func init()  {
  flag.StringVar(&kubeconfig, "kubeconfig","","Path to kubeconfig")
  flag.StringVar(&masterURL, "master", "","The address of the kubernetes api server")
}      
controller.go

package main


import (
  corev1 "k8s.io/api/core/v1"

  utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
  samplecrdv1 "k8s-crd/pkg/apis/samplecrd/v1"
  clientset "k8s-crd/pkg/client/clientset/versioned"
  networkscheme "k8s-crd/pkg/client/clientset/versioned/scheme"
  informers "k8s-crd/pkg/client/informers/externalversions/samplecrd/v1"
  listers "k8s-crd/pkg/client/listers/samplecrd/v1"
  "k8s.io/client-go/kubernetes"
  "k8s.io/client-go/tools/cache"
  "k8s.io/client-go/util/workqueue"
  "k8s.io/client-go/tools/record"
  "k8s.io/client-go/kubernetes/scheme"
  "github.com/golang/glog"
  "github.com/contrib/service-loadbalancer/Godeps/_workspace/src/k8s.io/kubernetes/pkg/util/runtime"
  "fmt"
  "k8s.io/apimachinery/pkg/util/wait"

  "time"
  "k8s.io/apimachinery/pkg/api/errors"
)

const controllerAgentName = "network-controller"

const (
  SuccessSynced = "Synced"
  MessageResourceSynced = "Network synced successfully"
)

type Controller struct {
  kubeclientset kubernetes.Interface
  networkclientset clientset.Interface

  networksLister listers.NetworkLister
  networksSynced cache.InformerSynced

  workqueue workqueue.RateLimitingInterface

  recorder record.EventRecorder
}

func NewController(
  kubeclientset kubernetes.Interface,
  networkclientset clientset.Interface,
  networkInformer informers.NetworkInformer) *Controller  {


  // register the CRD types into the global scheme so events can be recorded
  runtime.Must(networkscheme.AddToScheme(scheme.Scheme))
  glog.V(4).Info("Creating event broadcaster")
  eventBroadcaster := record.NewBroadcaster()
  eventBroadcaster.StartLogging(glog.Infof)
  eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface:kubeclientset.CoreV1().Events("")})
  recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

  controller := &Controller{
    kubeclientset: kubeclientset,
    networkclientset: networkclientset,
    networksLister: networkInformer.Lister(),
    networksSynced: networkInformer.Informer().HasSynced,
    workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(),"Networks"),
    recorder: recorder,
  }

  glog.Info("Setting up event handlers")
  networkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: controller.enqueueNetwork,
    UpdateFunc: func(old, new interface{}) {
      oldNetwork := old.(*samplecrdv1.Network)
      newNetwork := new.(*samplecrdv1.Network)
      if oldNetwork.ResourceVersion == newNetwork.ResourceVersion {
        return
      }
      controller.enqueueNetwork(new)
    },
    DeleteFunc: controller.enqueueNetworkForDelete,
  })
  return controller
}

func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
  defer runtime.HandleCrash()
  defer c.workqueue.ShutDown()

  glog.Info("Starting Network control loop")

  glog.Info("Waiting for informer caches to sync")
  if ok := cache.WaitForCacheSync(stopCh, c.networksSynced); !ok {
    return fmt.Errorf("failed to wait for caches to sync")
  }

  glog.Info("Starting workers")
  for i := 0; i < threadiness; i++ {
    go wait.Until(c.runWorker, time.Second, stopCh)
  }

  glog.Info("Started workers")
  <-stopCh
  glog.Info("shutting down workers")

  return nil
}

func (c *Controller) runWorker() {
  for c.processNextWorkItem() {
  }
}

func (c *Controller) processNextWorkItem() bool {
  obj, shutdown := c.workqueue.Get()
  if shutdown {
    return false
  }

  // process one item; the deferred Done marks this key as finished so the
  // workqueue can deliver it again if it gets re-added
  err := func(obj interface{}) error {
    defer c.workqueue.Done(obj)
    var key string
    var ok bool

    // keys are enqueued as "namespace/name" strings; anything else is
    // invalid, so Forget it instead of retrying forever
    if key, ok = obj.(string); !ok {
      c.workqueue.Forget(obj)
      runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
      return nil
    }

    if err := c.syncHandler(key); err != nil {
      return fmt.Errorf("error syncing '%s': %s", key, err.Error())
    }

    // on success, Forget clears this key's rate-limit history
    c.workqueue.Forget(obj)
    glog.Infof("Successfully synced '%s'", key)
    return nil
  }(obj)
  if err != nil {
    runtime.HandleError(err)
    return true
  }
  return true
}


func (c *Controller) syncHandler(key string) error {
  namespace, name, err := cache.SplitMetaNamespaceKey(key)
  if err != nil {
    runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
    return nil
  }
  network, err := c.networksLister.Networks(namespace).Get(name)
  if err != nil {
    if errors.IsNotFound(err){
      glog.Warningf("Network: %s/%s does not exist in local cache, will delete it from Neutron ...",
        namespace, name)
      glog.Infof("[Neutron] Deleting network: %s/%s ...", namespace, name)
      return nil
    }
    runtime.HandleError(fmt.Errorf("failed to list network by : %s/%s", namespace, name))
    return nil
  }
  glog.Infof("[Neutron] Try to process network: %#v ...", network)

  c.recorder.Event(network, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
  return nil
}


func (c *Controller) enqueueNetwork(obj interface{})  {
  var key string
  var err error
  if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil{
    runtime.HandleError(err)
    return
  }
  c.workqueue.AddRateLimited(key)
}


func (c *Controller) enqueueNetworkForDelete(obj interface{})  {
  key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
  if err != nil {
    runtime.HandleError(err)
    return
  }
  c.workqueue.AddRateLimited(key)
}
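One last note: the syncHandler above only logs the reconcile decision. For contrast, here is a minimal sketch of what actually converging state could look like. NeutronClient and its methods are hypothetical placeholders, not part of the sample repo, and the Spec fields follow the types.go sketch earlier:

// NeutronClient is a hypothetical stand-in for a real Neutron API client;
// the sample repo does not ship one.
type NeutronClient interface {
  GetNetwork(name string) (cidr, gateway string, err error)
  CreateNetwork(name, cidr, gateway string) error
  UpdateNetwork(name, cidr, gateway string) error
}

// reconcileNetwork converges actual state (from the hypothetical Neutron
// API) toward desired state (the Network object from the lister cache).
func reconcileNetwork(nc NeutronClient, desired *samplecrdv1.Network) error {
  cidr, gateway, err := nc.GetNetwork(desired.Name)
  if err != nil {
    // this sketch treats any lookup error as "not found": create from spec
    return nc.CreateNetwork(desired.Name, desired.Spec.Cidr, desired.Spec.Gateway)
  }
  if cidr != desired.Spec.Cidr || gateway != desired.Spec.Gateway {
    // actual state differs from desired: update
    return nc.UpdateNetwork(desired.Name, desired.Spec.Cidr, desired.Spec.Gateway)
  }
  return nil
}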