client-go之dynamic包(下篇)源碼分析
@TOC
dynamic包
用于動态生成informer/lister/client等
dynamiclister包
- interface.go 定義了擷取(從索引器(緩存)中get/list)obj的接口
- 接口
// Lister 擷取資源和擷取NamespaceLister。 type Lister interface { // List 列出索引器(緩存)中的所有資源。 List(selector labels.Selector) (ret []*unstructured.Unstructured, err error) // Get 從索引器(緩存)中檢索具有給定名稱的資源 Get(name string) (*unstructured.Unstructured, error) // Namespace 傳回一個對象,該對象可以列出和擷取給定命名空間中的資源。 Namespace(namespace string) NamespaceLister } // NamespaceLister 擷取命名空間下的資源。類似于controller-runtime分析client包的Reader接口 type NamespaceLister interface { // List 列出索引器(緩存)中給定命名空間的所有資源。 List(selector labels.Selector) (ret []*unstructured.Unstructured, err error) // Get 從索引器(緩存)中檢索給定命名空間和名稱的資源。 Get(name string) (*unstructured.Unstructured, error) }
- 接口
- lister.go
- 結構體
// dynamicLister 實作了 Lister 接口。 type dynamicLister struct { // 索引器(緩存) indexer cache.Indexer // 索引器對應的資源gvr,該索引器隻存儲該gvr對應的資源 gvr schema.GroupVersionResource } // List 列出索引器中的所有資源。 func (l *dynamicLister) List(selector labels.Selector) (ret []*unstructured.Unstructured, err error) { // 該方法到對應包在做具體分析,用來擷取符合selector對應添加的item err = cache.ListAll(l.indexer, selector, func(m interface{}) { // 符合添加的item會追加到ret ret = append(ret, m.(*unstructured.Unstructured)) }) return ret, err } // Get 從索引器中檢索具有給定名稱的資源 func (l *dynamicLister) Get(name string) (*unstructured.Unstructured, error) { // 沒有使用索引,這裡為什麼沒有判斷是否是l中對應的gvr? 因為在add到indexer時已經做了區分,不同的gvr在不同的indexer中 obj, exists, err := l.indexer.GetByKey(name) if err != nil { return nil, err } if !exists { return nil, errors.NewNotFound(l.gvr.GroupResource(), name) } return obj.(*unstructured.Unstructured), nil } // Namespace 傳回一個對象,該對象可以從給定的命名空間中列出和擷取資源. func (l *dynamicLister) Namespace(namespace string) NamespaceLister { return &dynamicNamespaceLister{indexer: l.indexer, namespace: namespace, gvr: l.gvr} } // dynamicNamespaceLister 實作了 NamespaceLister 接口。相比dynamicLister多了namespace屬性,用來限定namespace type dynamicNamespaceLister struct { // 索引器(緩存) indexer cache.Indexer // 命名空間 namespace string // 索引器對應的資源gvr,該索引器隻存儲該gvr對應的資源 gvr schema.GroupVersionResource } // List 列出索引器中給定命名空間的所有資源。 func (l *dynamicNamespaceLister) List(selector labels.Selector) (ret []*unstructured.Unstructured, err error) { // 該方法到對應包在做具體分析,用來擷取符合selector和namespace對應添加的item err = cache.ListAllByNamespace(l.indexer, l.namespace, selector, func(m interface{}) { ret = append(ret, m.(*unstructured.Unstructured)) }) return ret, err } // Get 從索引器中檢索給定命名空間和名稱的資源。 func (l *dynamicNamespaceLister) Get(name string) (*unstructured.Unstructured, error) { // 注意: 這裡可以看到indexer中items的存放,當namespace不為空時,key是${namespace}/${name} obj, exists, err := l.indexer.GetByKey(l.namespace + "/" + name) if err != nil { return nil, err } if !exists { return nil, errors.NewNotFound(l.gvr.GroupResource(), name) } return obj.(*unstructured.Unstructured), nil }
- 函數
// New 傳回一個新的 Lister. func New(indexer cache.Indexer, gvr schema.GroupVersionResource) Lister { return &dynamicLister{indexer: indexer, gvr: gvr} }
- 結構體
- shim.go
- 結構體
// dynamicListerShim 實作了 cache.GenericLister(後面章節再詳細介紹) 接口。 // 包裝了Lister接口,其實沒有多少差别,隻是List把傳回slice中Unstructured對象變為object對象 type dynamicListerShim struct { lister Lister } // List 将傳回跨命名空間的所有對象 func (s *dynamicListerShim) List(selector labels.Selector) (ret []runtime.Object, err error) { // objs, err := s.lister.List(selector) if err != nil { return nil, err } // 傳回slice中Unstructured對象變為Object對象 ret = make([]runtime.Object, len(objs)) for index, obj := range objs { ret[index] = obj } return ret, err } func (s *dynamicListerShim) Get(name string) (runtime.Object, error) { return s.lister.Get(name) } // 擷取限定命名空間的Lister func (s *dynamicListerShim) ByNamespace(namespace string) cache.GenericNamespaceLister { return &dynamicNamespaceListerShim{ namespaceLister: s.lister.Namespace(namespace), } } // dynamicNamespaceListerShim 實作了 NamespaceLister 接口。 // 它包裝了 NamespaceLister 以便它實作 cache.GenericNamespaceLister 接口 // 其實沒有多少差别,隻是List把傳回slice中Unstructured對象變為object對象 type dynamicNamespaceListerShim struct { namespaceLister NamespaceLister } // List 将傳回此命名空間中的所有對象 func (ns *dynamicNamespaceListerShim) List(selector labels.Selector) (ret []runtime.Object, err error) { objs, err := ns.namespaceLister.List(selector) if err != nil { return nil, err } // 傳回slice中Unstructured對象變為Object對象 ret = make([]runtime.Object, len(objs)) for index, obj := range objs { ret[index] = obj } return ret, err } // Get 将嘗試按命名空間和名稱檢索 func (ns *dynamicNamespaceListerShim) Get(name string) (runtime.Object, error) { return ns.namespaceLister.Get(name) }
- 結構體
dynamicinformer包
- interface.go
- 接口 定義了擷取Informer的方法,等待緩存同步的方法,啟動所有informer的方法
// Dynamic SharedInformerFactory 為動态用戶端提供對共享informer和lister的通路 type DynamicSharedInformerFactory interface { // 啟動所有informer的方法 Start(stopCh <-chan struct{}) // 擷取Informer的方法 ForResource(gvr schema.GroupVersionResource) informers.GenericInformer // 等待緩存同步的方法 WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool } // TweakListOptionsFunc 定義了一個輔助函數的簽名,想要為 API 提供更多的清單選項 type TweakListOptionsFunc func(*metav1.ListOptions)
- 接口 定義了擷取Informer的方法,等待緩存同步的方法,啟動所有informer的方法
- informer.go
- 函數
// NewDynamicSharedInformerFactory 為所有命名空間構造一個 dynamicSharedInformerFactory 的新執行個體。 func NewDynamicSharedInformerFactory(client dynamic.Interface, defaultResync time.Duration) DynamicSharedInformerFactory { return NewFilteredDynamicSharedInformerFactory(client, defaultResync, metav1.NamespaceAll, nil) } // NewFilteredDynamicSharedInformerFactory 構造了一個 dynamicSharedInformerFactory 的新執行個體。 // 通過此工廠獲得的lister将受到此處指定的相同過濾器的限制。 func NewFilteredDynamicSharedInformerFactory(client dynamic.Interface, defaultResync time.Duration, namespace string, tweakListOptions TweakListOptionsFunc) DynamicSharedInformerFactory { return &dynamicSharedInformerFactory{ client: client, defaultResync: defaultResync, namespace: namespace, informers: map[schema.GroupVersionResource]informers.GenericInformer{}, startedInformers: make(map[schema.GroupVersionResource]bool), tweakListOptions: tweakListOptions, } } // NewFilteredDynamicInformer 為動态類型構造一個新的 Informer。 func NewFilteredDynamicInformer(client dynamic.Interface, gvr schema.GroupVersionResource, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions TweakListOptionsFunc) informers.GenericInformer { return &dynamicInformer{ gvr: gvr, // 建立共享的SharedIndexInformer informer: cache.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { if tweakListOptions != nil { tweakListOptions(&options) } return client.Resource(gvr).Namespace(namespace).List(context.TODO(), options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { if tweakListOptions != nil { tweakListOptions(&options) } return client.Resource(gvr).Namespace(namespace).Watch(context.TODO(), options) }, }, &unstructured.Unstructured{}, resyncPeriod, indexers, ), } }
- 結構體
type dynamicSharedInformerFactory struct { // 建構ListWatch接口使用,為後來建構reflector,執行listWatch監控api resource提供client client dynamic.Interface // 同步周期,informer同步deltaFIFO中資料到listener中的chan中 defaultResync time.Duration // 命名空間 namespace string lock sync.Mutex // 緩存informer到map中 informers map[schema.GroupVersionResource]informers.GenericInformer // startInformers 用于跟蹤哪些 Informers 已啟動。這允許安全地多次調用 Start()。 startedInformers map[schema.GroupVersionResource]bool tweakListOptions TweakListOptionsFunc } // 實作DynamicSharedInformerFactory的ForResource方法 func (f *dynamicSharedInformerFactory) ForResource(gvr schema.GroupVersionResource) informers.GenericInformer { f.lock.Lock() defer f.lock.Unlock() key := gvr // 擷取緩存map中的informer informer, exists := f.informers[key] if exists { return informer } // 不存在就建立 informer = NewFilteredDynamicInformer(f.client, gvr, f.namespace, f.defaultResync, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) f.informers[key] = informer return informer } // 實作SharedInformerFactory的Start方法,啟動所有未啟動的informer。 func (f *dynamicSharedInformerFactory) Start(stopCh <-chan struct{}) { f.lock.Lock() defer f.lock.Unlock() // 周遊所有informer for informerType, informer := range f.informers { // 判斷該informer是否已經啟動 if !f.startedInformers[informerType] { // 啟動informer go informer.Informer().Run(stopCh) // 設定對應gvr的informer已經啟動 f.startedInformers[informerType] = true } } } // 實作SharedInformerFactory的WaitForCacheSync方法,等待所有啟動的informer的緩存同步。 func (f *dynamicSharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool { informers := func() map[schema.GroupVersionResource]cache.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() // 定義map,用于接收所有已經啟動的informer informers := map[schema.GroupVersionResource]cache.SharedIndexInformer{} for informerType, informer := range f.informers { if f.startedInformers[informerType] { informers[informerType] = informer.Informer() } } return informers }() // 定義map,用于接收所有同步完成的informer res := map[schema.GroupVersionResource]bool{} // 周遊已經啟動的所有informer for informType, informer := range informers { // 執行同步方法 // (1) 如果informer中controller為空,傳回false, // (2) 如果informer.controller的queue還沒有調用過Add/Update/Delete/AddIfNotPresent或者queue的initialPopulationCount != 0 (隊列中還有資料),傳回false res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced) } return res } // 動态Informer結構體,包裝了SharedIndexInformer和gvr type dynamicInformer struct { informer cache.SharedIndexInformer gvr schema.GroupVersionResource } // 實作GenericInformer的Informer方法 func (d *dynamicInformer) Informer() cache.SharedIndexInformer { return d.informer } // 實作GenericInformer的Lister方法,使用dynamicInformer的indexer和gvr構造以Lister func (d *dynamicInformer) Lister() cache.GenericLister { return dynamiclister.NewRuntimeObjectShim(dynamiclister.New(d.informer.GetIndexer(), d.gvr)) }
- 函數