天天看点

client-go源码分析--Reflector

前言

在informer机制流程分析中,简单介绍并梳理了Reflector函数执行流程。

在整个 controller framework 中,Reflector 扮演生产者的角色:从 k8s api-server 获取 runtime.Object,并添加到 DeltaFIFO 中;消费者一侧是 controller 的 processLoop,它从 DeltaFIFO 中取出 runtime.Object 并处理,最终将其同步到 indexer。

1 Reflector创建和运行

Reflector 在 controller 执行 Run 方法时创建并运行,实际的工作是在 func (r *Reflector) ListAndWatch 中完成的。

func (c *controller) Run(stopCh <-chan struct{}) {
    。。。
        r := NewReflector(
                c.config.ListerWatcher,
                c.config.ObjectType,
                c.config.Queue,
                c.config.FullResyncPeriod,
        )
    。。。

        wg.StartWithChannel(stopCh, r.Run)

        wait.Until(c.processLoop, time.Second, stopCh)
}

// Run drives the Reflector: it hands a single ListAndWatch attempt to
// wait.Until, which re-invokes it with r.period between attempts until stopCh
// is closed. An error from an attempt is reported via
// utilruntime.HandleError instead of terminating the loop.
func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
	attempt := func() {
		err := r.ListAndWatch(stopCh)
		if err == nil {
			return
		}
		utilruntime.HandleError(err)
	}
	wait.Until(attempt, r.period, stopCh)
}
           

2 List&Watch

2.1 简介

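Controller framework 通过 List&Watch 机制完成 k8s 资源对象与 Indexer 缓存之间的同步。

  • List: controller 启动时执行一次,将 k8s 资源对象全量更新到 Indexer(如果之后 Watch 异常退出、ListAndWatch 返回,Reflector.Run 中的 wait.Until 会再次调用 ListAndWatch,从而重新执行 List)。
  • Watch: controller 运行时持续监控,将 k8s 资源对象增量更新到 Indexer。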
    client-go源码分析--Reflector

2.2 List&Watch的实现

接下来详细分析 func (r *Reflector) ListAndWatch 函数,流程图如下:

client-go源码分析--Reflector

具体代码如下:

// ListAndWatch performs one full List of r.expectedType, records the returned
// resource version, starts a background periodic-resync goroutine, and then
// watches for changes starting from that resource version.
//
// Returns:
//   - a non-nil error only if the initial List, or extracting/syncing its
//     result, fails;
//   - nil when stopCh is closed, when a Watch call fails (except for
//     "connection refused", which is retried after one second), or when
//     watchHandler returns an error.
//
// If watchHandler returns nil, a new Watch is started from the
// (possibly advanced) resourceVersion. Reflector.Run calls this function
// repeatedly via wait.Until.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
        klog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
        var resourceVersion string

        // Explicitly set "0" as resource version - it's fine for the List()
        // to be served from cache and potentially be delayed relative to
        // etcd contents. Reflector framework will catch up via Watch() eventually.
        options := metav1.ListOptions{ResourceVersion: "0"}

        // Phase 1: full List. Wrapped in a closure so the deferred
        // initTrace.LogIfLong runs as soon as the list phase ends.
        if err := func() error {
                initTrace := trace.New("Reflector " + r.name + " ListAndWatch")
                defer initTrace.LogIfLong(10 * time.Second)
                var list runtime.Object
                var err error
                // listCh is only closed (never sent on) to signal that the List
                // finished. panicCh is buffered so the list goroutine can hand
                // over a recovered panic even if nobody is receiving anymore.
                listCh := make(chan struct{}, 1)
                panicCh := make(chan interface{}, 1)
                // Run the List in a child goroutine; the current goroutine blocks
                // in the select below until the List finishes, panics, or stopCh
                // is closed.
                go func() {
                        defer func() {
                                // Forward a panic so it is re-raised in the caller's
                                // goroutine (see the select below).
                                // NOTE(review): this r shadows the *Reflector receiver.
                                if r := recover(); r != nil {
                                        panicCh <- r
                                }
                        }()
                        // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
                        // list request will return the full response.
                        // NOTE(review): the local variable pager shadows the pager package
                        // after this line.
                        pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
                                return r.listerWatcher.List(opts)
                        }))
                        if r.WatchListPageSize != 0 {
                                pager.PageSize = r.WatchListPageSize
                        }
                        // Pager falls back to full list if paginated list calls fail due to an "Expired" error.
                        list, err = pager.List(context.Background(), options)
                        // Closing listCh publishes list and err to the waiting select.
                        close(listCh)
                }()
                // Block until the List completes, panics, or a stop is requested.
                select {
                case <-stopCh:
                        return nil
                case r := <-panicCh:
                        panic(r)
                case <-listCh:
                }
                if err != nil {
                        return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
                }
                initTrace.Step("Objects listed")
                listMetaInterface, err := meta.ListAccessor(list)
                if err != nil {
                        return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
                }
                // This resource version is the starting point for the Watch below.
                resourceVersion = listMetaInterface.GetResourceVersion()
                initTrace.Step("Resource version extracted")
                // meta.ExtractList converts the List result into a slice of runtime.Object.
                items, err := meta.ExtractList(list)
                if err != nil {
                        return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
                }
                initTrace.Step("Objects extracted")
                // Write the full listed set into r.store (the DeltaFIFO in the informer
                // setup), i.e. a full sync towards the Indexer.
                if err := r.syncWith(items, resourceVersion); err != nil {
                        return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
                }
                initTrace.Step("SyncWith done")
                r.setLastSyncResourceVersion(resourceVersion)
                initTrace.Step("Resource version updated")
                return nil
        }(); err != nil {
                return err
        }

        // Phase 2: periodic resync in a background goroutine.
        // resyncerrc carries a Resync failure to watchHandler (it is passed to it
        // below). It is buffered and the goroutine exits after one send, so the
        // send never blocks. cancelCh stops the goroutine when this function returns.
        resyncerrc := make(chan error, 1)
        cancelCh := make(chan struct{})
        defer close(cancelCh)
        // An interesting detail about resyncPeriod: going by the source comments it
        // looks like a full List API call is made at every interval to refresh all
        // data. In fact, a resync re-syncs all data already held in the Indexer
        // back into the DeltaFIFO; it does not call List.
        go func() {
                // resyncCh is a timer channel; if resyncPeriod is 0 it never fires.
                // cleanup releases the timer.
                resyncCh, cleanup := r.resyncChan()
                // Wrapped in a closure so the cleanup in effect at return time
                // (reassigned at the bottom of the loop) is the one called.
                defer func() {
                        cleanup() // Call the last one written into cleanup
                }()
                for {
                        select {
                        case <-resyncCh:
                        case <-stopCh:
                                return
                        case <-cancelCh:
                                return
                        }
                        if r.ShouldResync == nil || r.ShouldResync() {
                                klog.V(4).Infof("%s: forcing resync", r.name)
                                // r.store here is the DeltaFIFO, set up in sharedIndexInformer.Run().
                                if err := r.store.Resync(); err != nil {
                                        resyncerrc <- err
                                        return
                                }
                        }
                        // Re-arm: release the old timer and create one for the next period.
                        cleanup()
                        resyncCh, cleanup = r.resyncChan()
                }
        }()

        // Phase 3: watch loop, starting from the resource version returned by List.
        for {
                // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
                select {
                case <-stopCh:
                        return nil
                default:
                }

                // rand.Float64() is in [0, 1), so the timeout is in
                // [minWatchTimeout, 2*minWatchTimeout) — presumably to spread out
                // watch restarts across reflectors.
                timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
                options = metav1.ListOptions{
                        ResourceVersion: resourceVersion,
                        // We want to avoid situations of hanging watchers. Stop any wachers that do not
                        // receive any events within the timeout window.
                        TimeoutSeconds: &timeoutSeconds,
                        // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
                        // Reflector doesn't assume bookmarks are returned at all (if the server do not support
                        // watch bookmarks, it will ignore this field).
                        AllowWatchBookmarks: true,
                }

                w, err := r.listerWatcher.Watch(options)
                if err != nil {
                        switch err {
                        case io.EOF:
                                // watch closed normally
                        case io.ErrUnexpectedEOF:
                                klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
                        default:
                                utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
                        }
                        // If this is "connection refused" error, it means that most likely apiserver is not responsive.
                        // It doesn't make sense to re-list all objects because most likely we will be able to restart
                        // watch where we ended.
                        // If that's the case wait and resend watch request.
                        if urlError, ok := err.(*url.Error); ok {
                                if opError, ok := urlError.Err.(*net.OpError); ok {
                                        if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
                                                time.Sleep(time.Second)
                                                continue
                                        }
                                }
                        }
                        // Any other Watch failure ends this ListAndWatch; Run will
                        // call it again, which re-lists.
                        return nil
                }
                // watchHandler incrementally syncs runtime.Objects from w towards the
                // Indexer and advances resourceVersion through the pointer.
                // Presumably it also returns errors received on resyncerrc — verify.
                if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
                        if err != errorStopRequested {
                                switch {
                                case apierrs.IsResourceExpired(err):
                                        klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
                                default:
                                        klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
                                }
                        }
                        return nil
                }
                // watchHandler returned nil: loop and start a new Watch from the
                // updated resourceVersion without re-listing.
        }
}
           
  • Reflector.Run() 调用 ListAndWatch(),启动一个子协程(goroutine)执行 List,主协程阻塞等待 List 执行完成。
  • meta.ExtractList(list) 把 List 结果转化成 runtime.Object 数组。
  • r.syncWith(items, resourceVersion) 写入 DeltaFIFO,全量同步到 Indexer。
  • r.resyncChan() 驱动的周期性 Resync 也在一个单独的子协程里执行。
  • 循环执行 r.listerWatcher.Watch(options)。
  • r.watchHandler 增量同步 runtime.Object 到 Indexer。
    client-go源码分析--Reflector

2.3 r.listerWatcher.List & r.listerWatcher.Watch

ListerWatcher是接口类型,定义如下:

// ListerWatcher combines the Lister and Watcher interfaces. Reflector calls
// its List method for the initial full listing and its Watch method for the
// incremental updates that follow.
type ListerWatcher interface {
        Lister
        Watcher
}
           

在 listwatch.go 文件中,NewListWatchFromClient 创建了 ListWatch 对象,该对象实现了 ListerWatcher 接口。r.listerWatcher.List(options) 和 r.listerWatcher.Watch(options) 调用的是某个实现了 ListerWatcher 接口的对象的 List、Watch 方法。当用户用 NewListWatchFromClient 创建 ListWatch 对象时,r.listerWatcher.List(options) 实际上调用了 NewFilteredListWatchFromClient 里的 listFunc(未设置 DisableChunking 时,会先用 pager 包装后分页调用);r.listerWatcher.Watch(options) 调用的则是 watchFunc。

// ListWatch implements ListerWatcher by delegating its List and Watch
// methods to the function fields below.
type ListWatch struct {
        // ListFunc performs the list request; called by (*ListWatch).List.
        ListFunc  ListFunc
        // WatchFunc starts the watch; called by (*ListWatch).Watch.
        WatchFunc WatchFunc
        // DisableChunking requests no chunking for this list watcher.
        // When false, (*ListWatch).List wraps ListFunc in a pager.
        DisableChunking bool
}

// NewListWatchFromClient returns a ListWatch for resource in namespace whose
// list and watch requests carry fieldSelector. It delegates to
// NewFilteredListWatchFromClient. The options modifier passed to it sets
// ListOptions.FieldSelector, evaluating fieldSelector.String() each time the
// modifier is applied.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
	return NewFilteredListWatchFromClient(c, resource, namespace, func(opts *metav1.ListOptions) {
		opts.FieldSelector = fieldSelector.String()
	})
}

// List a set of apiserver resourcesfunc (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
        if !lw.DisableChunking {
                return pager.New(pager.SimplePageFunc(lw.ListFunc)).List(context.TODO(), options)
        }
        return lw.ListFunc(options)
}

// Watch a set of apiserver resourcesfunc (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
        return lw.WatchFunc(options)
}
           

2.4 r.syncWith(items, resourceVersion)

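r.syncWith --> r.store.Replace。在 informer 场景下 r.store 是 DeltaFIFO(见 2.6 节),因此这里实际调用的是 func (f *DeltaFIFO) Replace:List 得到的对象以 Delta 的形式写入 DeltaFIFO,由消费者处理后再同步到 Indexer。

只有当 Reflector 的 store 直接是一个 cache(Store/Indexer)时,调用链才是 func (c *cache) Replace --> c.cacheStorage.Replace --> func (c *threadSafeMap) Replace,即直接替换 threadSafeMap 中的全部数据。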

2.5 r.watchHandler

从 watch.Interface 中获取 event(Added、Modified、Deleted),并分别调用 r.store(即 DeltaFIFO)的 Add、Update、Delete 方法,触发 DeltaFIFO 的增量同步。同时通过传入的 &resourceVersion 更新资源版本,下一次 Watch 会从这个版本继续。

2.6 r.resyncChan()

Resync 的核心是 r.store.Resync(),这里的 store 是 DeltaFIFO,在 sharedIndexInformer.Run() 中创建,因此实际调用的是 DeltaFIFO 的 Resync 方法,用于对象的周期性同步。其本质是遍历本地存储(DeltaFIFO 的 knownObjects,即 Indexer)中的每个对象,将 Delta{Sync, obj} 写入 DeltaFIFO;整个过程并不会重新调用 List API。

参考

https://www.cnblogs.com/charlieroro/p/10330390.html

https://blog.csdn.net/weixin_42663840/article/details/81699303

https://blog.csdn.net/li_101357/article/details/89763992

https://www.jianshu.com/p/1daeae7b6970

继续阅读