天天看點

client-go之dynamic包(下篇)源碼分析client-go之dynamic包(下篇)源碼分析dynamic包

client-go之dynamic包(下篇)源碼分析

@TOC

dynamic包

用于動态生成informer/lister/client等

dynamiclister包

  • interface.go 定義了擷取(從索引器(緩存)中get/list)obj的接口
    • 接口
      // Lister 擷取資源和擷取NamespaceLister。
      type Lister interface {
      	// List 列出索引器(緩存)中的所有資源。
      	List(selector labels.Selector) (ret []*unstructured.Unstructured, err error)
      	// Get 從索引器(緩存)中檢索具有給定名稱的資源
      	Get(name string) (*unstructured.Unstructured, error)
      	// Namespace 傳回一個對象,該對象可以列出和擷取給定命名空間中的資源。
      	Namespace(namespace string) NamespaceLister
      }
      
      // NamespaceLister 擷取命名空間下的資源。類似于controller-runtime分析client包的Reader接口
      type NamespaceLister interface {
      	// List 列出索引器(緩存)中給定命名空間的所有資源。
      	List(selector labels.Selector) (ret []*unstructured.Unstructured, err error)
      	// Get 從索引器(緩存)中檢索給定命名空間和名稱的資源。
      	Get(name string) (*unstructured.Unstructured, error)
      }
                 
  • lister.go
    • 結構體
      // dynamicLister 實作了 Lister 接口。
      type dynamicLister struct {
        // 索引器(緩存)
        indexer cache.Indexer
        // 索引器對應的資源gvr,該索引器隻存儲該gvr對應的資源
        gvr     schema.GroupVersionResource
      }
      
      // List 列出索引器中的所有資源。
      func (l *dynamicLister) List(selector labels.Selector) (ret []*unstructured.Unstructured, err error) {
        // 該方法到對應包在做具體分析,用來擷取符合selector對應添加的item
        err = cache.ListAll(l.indexer, selector, func(m interface{}) {
            // 符合添加的item會追加到ret
      	  ret = append(ret, m.(*unstructured.Unstructured))
        })
        return ret, err
      }
      
      // Get 從索引器中檢索具有給定名稱的資源
      func (l *dynamicLister) Get(name string) (*unstructured.Unstructured, error) {
        // 沒有使用索引,這裡為什麼沒有判斷是否是l中對應的gvr? 因為在add到indexer時已經做了區分,不同的gvr在不同的indexer中
        obj, exists, err := l.indexer.GetByKey(name)
        if err != nil {
      	  return nil, err
        }
        if !exists {
      	  return nil, errors.NewNotFound(l.gvr.GroupResource(), name)
        }
        return obj.(*unstructured.Unstructured), nil
      }
      
      // Namespace 傳回一個對象,該對象可以從給定的命名空間中列出和擷取資源.
      func (l *dynamicLister) Namespace(namespace string) NamespaceLister {
      	return &dynamicNamespaceLister{indexer: l.indexer, namespace: namespace, gvr: l.gvr}
      }
      
      // dynamicNamespaceLister 實作了 NamespaceLister 接口。相比dynamicLister多了namespace屬性,用來限定namespace
      type dynamicNamespaceLister struct {
        // 索引器(緩存)
        indexer   cache.Indexer
        // 命名空間
        namespace string
        // 索引器對應的資源gvr,該索引器隻存儲該gvr對應的資源
        gvr       schema.GroupVersionResource
      }
      
      // List 列出索引器中給定命名空間的所有資源。
      func (l *dynamicNamespaceLister) List(selector labels.Selector) (ret []*unstructured.Unstructured, err error) {
        // 該方法到對應包在做具體分析,用來擷取符合selector和namespace對應添加的item
        err = cache.ListAllByNamespace(l.indexer, l.namespace, selector, func(m interface{}) {
          	ret = append(ret, m.(*unstructured.Unstructured))
        })
        return ret, err
      }
      
      // Get 從索引器中檢索給定命名空間和名稱的資源。
      func (l *dynamicNamespaceLister) Get(name string) (*unstructured.Unstructured, error) {
        // 注意: 這裡可以看到indexer中items的存放,當namespace不為空時,key是${namespace}/${name}
        obj, exists, err := l.indexer.GetByKey(l.namespace + "/" + name)
        if err != nil {
      	  return nil, err
        }
        if !exists {
      	  return nil, errors.NewNotFound(l.gvr.GroupResource(), name)
        }
        return obj.(*unstructured.Unstructured), nil
      }
                 
    • 函數
      // New 傳回一個新的 Lister.
      func New(indexer cache.Indexer, gvr schema.GroupVersionResource) Lister {
      	return &dynamicLister{indexer: indexer, gvr: gvr}
      }
                 
  • shim.go
    • 結構體
      // dynamicListerShim 實作了 cache.GenericLister(後面章節再詳細介紹) 接口。
      // 包裝了Lister接口,其實沒有多少差别,隻是List把傳回slice中Unstructured對象變為object對象
      type dynamicListerShim struct {
      	lister Lister
      }
      
      // List 将傳回跨命名空間的所有對象
      func (s *dynamicListerShim) List(selector labels.Selector) (ret []runtime.Object, err error) {
          // 
      	objs, err := s.lister.List(selector)
      	if err != nil {
      		return nil, err
      	}
      
          // 傳回slice中Unstructured對象變為Object對象
      	ret = make([]runtime.Object, len(objs))
      	for index, obj := range objs {
      		ret[index] = obj
      	}
      	return ret, err
      }
      
      func (s *dynamicListerShim) Get(name string) (runtime.Object, error) {
      	return s.lister.Get(name)
      }
      
      // 擷取限定命名空間的Lister
      func (s *dynamicListerShim) ByNamespace(namespace string) cache.GenericNamespaceLister {
      	return &dynamicNamespaceListerShim{
      		namespaceLister: s.lister.Namespace(namespace),
      	}
      }
      
      // dynamicNamespaceListerShim 實作了 NamespaceLister 接口。
      // 它包裝了 NamespaceLister 以便它實作 cache.GenericNamespaceLister 接口
      // 其實沒有多少差别,隻是List把傳回slice中Unstructured對象變為object對象
      type dynamicNamespaceListerShim struct {
      	namespaceLister NamespaceLister
      }
      
      // List 将傳回此命名空間中的所有對象
      func (ns *dynamicNamespaceListerShim) List(selector labels.Selector) (ret []runtime.Object, err error) {
      	objs, err := ns.namespaceLister.List(selector)
      	if err != nil {
      		return nil, err
      	}
          // 傳回slice中Unstructured對象變為Object對象
      	ret = make([]runtime.Object, len(objs))
      	for index, obj := range objs {
      		ret[index] = obj
      	}
      	return ret, err
      }
      
      // Get 将嘗試按命名空間和名稱檢索
      func (ns *dynamicNamespaceListerShim) Get(name string) (runtime.Object, error) {
      	return ns.namespaceLister.Get(name)
      }
                 

dynamicinformer包

  • interface.go
    • 接口 定義了擷取Informer的方法,等待緩存同步的方法,啟動所有informer的方法
      // Dynamic SharedInformerFactory 為動态用戶端提供對共享informer和lister的通路
      type DynamicSharedInformerFactory interface {
          // 啟動所有informer的方法
      	Start(stopCh <-chan struct{})
          // 擷取Informer的方法
      	ForResource(gvr schema.GroupVersionResource) informers.GenericInformer
          // 等待緩存同步的方法
      	WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool
      }
      
      // TweakListOptionsFunc 定義了一個輔助函數的簽名,想要為 API 提供更多的清單選項
      type TweakListOptionsFunc func(*metav1.ListOptions)
                 
  • informer.go
    • 函數
      // NewDynamicSharedInformerFactory 為所有命名空間構造一個 dynamicSharedInformerFactory 的新執行個體。
      func NewDynamicSharedInformerFactory(client dynamic.Interface, defaultResync time.Duration) DynamicSharedInformerFactory {
      	return NewFilteredDynamicSharedInformerFactory(client, defaultResync, metav1.NamespaceAll, nil)
      }
      
      // NewFilteredDynamicSharedInformerFactory 構造了一個 dynamicSharedInformerFactory 的新執行個體。 
      // 通過此工廠獲得的lister将受到此處指定的相同過濾器的限制。
      func NewFilteredDynamicSharedInformerFactory(client dynamic.Interface, defaultResync time.Duration, namespace string, tweakListOptions TweakListOptionsFunc) DynamicSharedInformerFactory {
      	return &dynamicSharedInformerFactory{
      		client:           client,
      		defaultResync:    defaultResync,
      		namespace:        namespace,
      		informers:        map[schema.GroupVersionResource]informers.GenericInformer{},
      		startedInformers: make(map[schema.GroupVersionResource]bool),
      		tweakListOptions: tweakListOptions,
      	}
      }
      
      // NewFilteredDynamicInformer 為動态類型構造一個新的 Informer。
      func NewFilteredDynamicInformer(client dynamic.Interface, gvr schema.GroupVersionResource, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions TweakListOptionsFunc) informers.GenericInformer {
      	return &dynamicInformer{
      		gvr: gvr,
              // 建立共享的SharedIndexInformer
      		informer: cache.NewSharedIndexInformer(
      			&cache.ListWatch{
      				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
      					if tweakListOptions != nil {
      						tweakListOptions(&options)
      					}
      					return client.Resource(gvr).Namespace(namespace).List(context.TODO(), options)
      				},
      				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
      					if tweakListOptions != nil {
      						tweakListOptions(&options)
      					}
      					return client.Resource(gvr).Namespace(namespace).Watch(context.TODO(), options)
      				},
      			},
      			&unstructured.Unstructured{},
      			resyncPeriod,
      			indexers,
      		),
      	}
      }
                 
    • 結構體
      type dynamicSharedInformerFactory struct {
          // 建構ListWatch接口使用,為後來建構reflector,執行listWatch監控api resource提供client
      	client        dynamic.Interface
          // 同步周期,informer同步deltaFIFO中資料到listener中的chan中
      	defaultResync time.Duration
          // 命名空間
      	namespace     string
      
      	lock      sync.Mutex
          // 緩存informer到map中
      	informers map[schema.GroupVersionResource]informers.GenericInformer
      	// startInformers 用于跟蹤哪些 Informers 已啟動。這允許安全地多次調用 Start()。
      	startedInformers map[schema.GroupVersionResource]bool
      	tweakListOptions TweakListOptionsFunc
      }
      
      // 實作DynamicSharedInformerFactory的ForResource方法
      func (f *dynamicSharedInformerFactory) ForResource(gvr schema.GroupVersionResource) informers.GenericInformer {
      	f.lock.Lock()
      	defer f.lock.Unlock()
      
      	key := gvr
          // 擷取緩存map中的informer
      	informer, exists := f.informers[key]
      	if exists {
      		return informer
      	}
          // 不存在就建立
      	informer = NewFilteredDynamicInformer(f.client, gvr, f.namespace, f.defaultResync, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
      	f.informers[key] = informer
      
      	return informer
      }
      
      // 實作SharedInformerFactory的Start方法,啟動所有未啟動的informer。
      func (f *dynamicSharedInformerFactory) Start(stopCh <-chan struct{}) {
      	f.lock.Lock()
      	defer f.lock.Unlock()
      
          // 周遊所有informer
      	for informerType, informer := range f.informers {
              // 判斷該informer是否已經啟動
      		if !f.startedInformers[informerType] {
                  // 啟動informer
      			go informer.Informer().Run(stopCh)
                  // 設定對應gvr的informer已經啟動
      			f.startedInformers[informerType] = true
      		}
      	}
      }
      
      // 實作SharedInformerFactory的WaitForCacheSync方法,等待所有啟動的informer的緩存同步。
      func (f *dynamicSharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool {
      	informers := func() map[schema.GroupVersionResource]cache.SharedIndexInformer {
      		f.lock.Lock()
      		defer f.lock.Unlock()
              // 定義map,用于接收所有已經啟動的informer
      		informers := map[schema.GroupVersionResource]cache.SharedIndexInformer{}
      		for informerType, informer := range f.informers {
      			if f.startedInformers[informerType] {
      				informers[informerType] = informer.Informer()
      			}
      		}
      		return informers
      	}()
          // 定義map,用于接收所有同步完成的informer
      	res := map[schema.GroupVersionResource]bool{}
          // 周遊已經啟動的所有informer
      	for informType, informer := range informers {
              // 執行同步方法
              // (1) 如果informer中controller為空,傳回false,
              // (2) 如果informer.controller的queue還沒有調用過Add/Update/Delete/AddIfNotPresent或者queue的initialPopulationCount != 0 (隊列中還有資料),傳回false
      		res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
      	}
      	return res
      }
      
      // 動态Informer結構體,包裝了SharedIndexInformer和gvr
      type dynamicInformer struct {
      	informer cache.SharedIndexInformer
      	gvr      schema.GroupVersionResource
      }
      
      // 實作GenericInformer的Informer方法
      func (d *dynamicInformer) Informer() cache.SharedIndexInformer {
      	return d.informer
      }
      
      // 實作GenericInformer的Lister方法,使用dynamicInformer的indexer和gvr構造以Lister
      func (d *dynamicInformer) Lister() cache.GenericLister {
      	return dynamiclister.NewRuntimeObjectShim(dynamiclister.New(d.informer.GetIndexer(), d.gvr))
      }