测试代码

下面的代码是从 client-go 中的测试代码修改而来,可以通过单步运行来帮助梳理代码逻辑。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// 跟此文件逻辑一致:examples/fake-client/main_test.go
package main

import (
  "context"
  "fmt"
  "time"

  v1 "k8s.io/api/core/v1"
  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  "k8s.io/apimachinery/pkg/util/wait"
  "k8s.io/apimachinery/pkg/watch"
  "k8s.io/client-go/informers"
  "k8s.io/client-go/kubernetes/fake"
  clienttesting "k8s.io/client-go/testing"
  "k8s.io/client-go/tools/cache"
)

// main walks through the informer flow against a fake clientset: it registers
// a watch reactor that signals when the informer's watch has been opened,
// starts a pod informer whose AddFunc forwards pods to a channel, creates one
// pod through the fake client, and waits for that pod to arrive on the channel.
func main() {
  ctx, cancel := context.WithCancel(context.Background())
  defer cancel()

  watcherStarted := make(chan struct{})
  // Create the fake client.
  client := fake.NewSimpleClientset()
  // A catch-all watch reactor that allows us to inject the watcherStarted channel.
  // NOTE(review): the channel is closed on every invocation, so a second watch
  // request would panic; this walkthrough presumably opens exactly one watch.
  client.PrependWatchReactor("*", func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) {
    gvr := action.GetResource()
    ns := action.GetNamespace()
    // Named watcher (not watch) so the imported watch package is not shadowed.
    watcher, err := client.Tracker().Watch(gvr, ns)
    if err != nil {
      return false, nil, err
    }
    close(watcherStarted)
    return true, watcher, nil
  })

  // We will create an informer that writes added pods to a channel.
  pods := make(chan *v1.Pod, 1)
  // Named factory (not informers) so the imported informers package is not shadowed.
  factory := informers.NewSharedInformerFactory(client, 0)
  podInformer := factory.Core().V1().Pods().Informer()
  podInformer.AddEventHandler(&cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
      pod := obj.(*v1.Pod)
      fmt.Printf("pod added: %s/%s\n", pod.Namespace, pod.Name)
      pods <- pod
    },
  })

  factory.Start(ctx.Done())

  // WaitForCacheSync reports false when the stop channel closes before the
  // cache has synced; there is nothing useful to do in that case.
  if !cache.WaitForCacheSync(ctx.Done(), podInformer.HasSynced) {
    fmt.Println("pod informer cache did not sync")
    return
  }

  // Make sure the informer's watch is established before injecting the event,
  // otherwise the event could be missed.
  <-watcherStarted
  // Inject an event into the fake client.
  p := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "my-pod"}}
  _, err := client.CoreV1().Pods("test-ns").Create(context.TODO(), p, metav1.CreateOptions{})
  if err != nil {
    // Without a created pod no add event can arrive, so do not sit through
    // the full timeout below.
    fmt.Printf("error injecting pod add: %v\n", err)
    return
  }

  select {
  case pod := <-pods:
    fmt.Printf("Got pod from channel: %s/%s\n", pod.Namespace, pod.Name)
  case <-time.After(wait.ForeverTestTimeout):
    fmt.Println("Informer did not get the added pod")
  }
}

源码解析

在分析源码之前,可以借助下图的整体流程图来帮助理解。 informer

sharedInformerFactory

先来看下 sharedInformerFactory 结构,它可以统一管理控制器中需要的各资源对象的 informer 实例,避免同一个资源创建多个实例,在上面测试代码中先获取了一个 sharedInformerFactory,在后续就可以通过它拿到对应 API 的 informer。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// informers/factory.go
// sharedInformerFactory implements the SharedInformerFactory interface and
// provides one shared informer for every known API resource type.
type sharedInformerFactory struct {
  client           kubernetes.Interface
  namespace        string
  tweakListOptions internalinterfaces.TweakListOptionsFunc
  // lock is held by InformerFor and Start while they touch the maps below.
  lock             sync.Mutex
  // defaultResync is used for any type without an entry in customResync.
  defaultResync    time.Duration
  customResync     map[reflect.Type]time.Duration
  // informers holds the SharedIndexInformer created for each object type.
  informers map[reflect.Type]cache.SharedIndexInformer
  // startedInformers is used for tracking which informers have been started.
  // This allows Start() to be called multiple times safely.
  startedInformers map[reflect.Type]bool
  // wg tracks how many goroutines were started.
  wg sync.WaitGroup
  // shuttingDown is true when Shutdown has been called. It may still be running
  // because it needs to wait for goroutines.
  shuttingDown bool
}

// NewSharedInformerFactory builds a SharedInformerFactory with no extra
// options; the factory it delegates to defaults to all namespaces.
func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
  return NewSharedInformerFactoryWithOptions(client, defaultResync)
}

// NewSharedInformerFactoryWithOptions creates the concrete
// *sharedInformerFactory. It starts from a factory that covers all namespaces
// with empty informer/started/customResync maps, then passes it through every
// supplied option in order, keeping whatever each option returns.
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
  f := &sharedInformerFactory{
    client:           client,
    defaultResync:    defaultResync,
    namespace:        v1.NamespaceAll,
    customResync:     map[reflect.Type]time.Duration{},
    informers:        map[reflect.Type]cache.SharedIndexInformer{},
    startedInformers: map[reflect.Type]bool{},
  }

  // Each option receives the current factory and returns the one to use next.
  for i := range options {
    f = options[i](f)
  }

  return f
}
// Core returns the core API group, built from this factory together with its
// namespace and list-option tweak function.
func (f *sharedInformerFactory) Core() core.Interface {
  return core.New(f, f.namespace, f.tweakListOptions)
}

API Informer实例

测试代码中,通过 informers.Core().V1().Pods().Informer() 返回了 pods 对应的 Informer 实例,其中 core 包主要负责返回指定版本的 API 组。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// informers/core/interface.go
// Interface provides access to every version of this API group.
type Interface interface {
  // V1 provides access to shared informers for resources in V1.
  V1() v1.Interface
}

// group represents one Kubernetes API group.
type group struct {
  factory          internalinterfaces.SharedInformerFactory
  namespace        string
  tweakListOptions internalinterfaces.TweakListOptionsFunc
}

// New returns the API group, carrying the given factory, namespace and
// list-option tweak function.
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
  return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}

// V1 returns the V1 version of this API group, passing the group's factory,
// namespace and tweak function through unchanged.
func (g *group) V1() v1.Interface {
  return v1.New(g.factory, g.namespace, g.tweakListOptions)
}

core/v1 包里面的逻辑,主要是提供方法获取/api/v1 路径下的所有 API 的 informer 实例。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// informers/core/v1/interface.go
// Interface provides access to all the informers in this group version,
// one accessor method per resource kind.
type Interface interface {
  ComponentStatuses() ComponentStatusInformer
  ConfigMaps() ConfigMapInformer
  Endpoints() EndpointsInformer
  Events() EventInformer
  LimitRanges() LimitRangeInformer
  Namespaces() NamespaceInformer
  Nodes() NodeInformer
  PersistentVolumes() PersistentVolumeInformer
  PersistentVolumeClaims() PersistentVolumeClaimInformer
  Pods() PodInformer
  PodTemplates() PodTemplateInformer
  ReplicationControllers() ReplicationControllerInformer
  ResourceQuotas() ResourceQuotaInformer
  Secrets() SecretInformer
  Services() ServiceInformer
  ServiceAccounts() ServiceAccountInformer
}
// version represents one version within an API group.
type version struct {
  factory          internalinterfaces.SharedInformerFactory
  namespace        string
  tweakListOptions internalinterfaces.TweakListOptionsFunc
}
// New returns a new version carrying the given factory, namespace and
// list-option tweak function.
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
  return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}
// Pods returns a PodInformer that shares this version's factory, namespace
// and tweak function.
func (v *version) Pods() PodInformer {
  return &podInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}
// ... 返回其它实例

上面返回的是 PodInformer 结构,定义为:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// informers/core/v1/pod.go
// PodInformer provides access to the underlying shared informer and to a
// lister for pods.
type PodInformer interface {
  Informer() cache.SharedIndexInformer
  Lister() v1.PodLister
}
// podInformer implements PodInformer.
type podInformer struct {
  factory          internalinterfaces.SharedInformerFactory
  tweakListOptions internalinterfaces.TweakListOptionsFunc
  namespace        string
}

// NewFilteredPodInformer builds the SharedIndexInformer for pods. Informers
// for other resource kinds are SharedIndexInformers as well; what differs is
// the ListWatch functions and the example object type handed to the
// constructor.
//
// List and Watch both go through client.CoreV1().Pods(namespace); when
// tweakListOptions is non-nil it is applied to the options before each call.
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
  // applyTweak runs the optional caller-supplied mutation on the list options.
  applyTweak := func(opts *metav1.ListOptions) {
    if tweakListOptions != nil {
      tweakListOptions(opts)
    }
  }

  listPods := func(options metav1.ListOptions) (runtime.Object, error) {
    applyTweak(&options)
    return client.CoreV1().Pods(namespace).List(context.TODO(), options)
  }
  watchPods := func(options metav1.ListOptions) (watch.Interface, error) {
    applyTweak(&options)
    return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
  }

  listWatch := &cache.ListWatch{ListFunc: listPods, WatchFunc: watchPods}
  return cache.NewSharedIndexInformer(listWatch, &corev1.Pod{}, resyncPeriod, indexers)
}
// defaultInformer is podInformer's default constructor; Informer passes it to
// the factory's InformerFor as the creation callback.
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
  // Indexers maps an index name to its index function. Only the namespace
  // index is registered here, backed by MetaNamespaceIndexFunc, which indexes
  // objects by their namespace.
  byNamespace := cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}
  return NewFilteredPodInformer(client, f.namespace, resyncPeriod, byNamespace, f.tweakListOptions)
}
// Informer returns the SharedIndexInformer for pods by calling InformerFor on
// the factory this podInformer was created with (in the walkthrough, the
// sharedInformerFactory initialized in main.go), with defaultInformer as the
// creation function.
func (f *podInformer) Informer() cache.SharedIndexInformer {
  return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}
// Lister returns a PodLister backed by the indexer of the pod informer.
func (f *podInformer) Lister() v1.PodLister {
  return v1.NewPodLister(f.Informer().GetIndexer())
}

看下 InformerFor 是如何实现的,从下面可以看到 factory 增加了本地缓存逻辑,最后还是调用对应类型的创建函数生成 Informer,即 newFunc 为上面 podInformer.Informer() 方法中调用时传入的参数 defaultInformer 函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// informers/factory.go
// InformerFor returns the shared informer for obj's type, creating it with
// newFunc on first use and caching it in f.informers. The resync period given
// to newFunc is the type's customResync entry when one exists, otherwise
// defaultResync. The factory lock is held for the whole call.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
  f.lock.Lock()
  defer f.lock.Unlock()

  key := reflect.TypeOf(obj)

  // An informer for this type already exists: hand back the cached one.
  if cached, ok := f.informers[key]; ok {
    return cached
  }

  // Prefer a per-type resync period when one has been configured.
  period := f.defaultResync
  if custom, ok := f.customResync[key]; ok {
    period = custom
  }

  // Create the informer and remember it for later callers.
  created := newFunc(f.client, period)
  f.informers[key] = created
  return created
}

newFunc 所代表的 NewFilteredPodInformer 方法,最终会调用到 NewSharedIndexInformer,创建具体的 Informer:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// tools/cache/shared_informer.go
// NewSharedIndexInformer assembles a sharedIndexInformer for the given
// ListerWatcher and example object. The same real clock instance is shared by
// the informer and its processor, the indexer is keyed with
// DeletionHandlingMetaNamespaceKeyFunc, and defaultEventHandlerResyncPeriod
// seeds both the resync check period and the default handler resync period.
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
  clk := &clock.RealClock{}
  return &sharedIndexInformer{
    clock:                           clk,
    processor:                       &sharedProcessor{clock: clk},
    listerWatcher:                   lw,
    objectType:                      exampleObject,
    indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
    cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
    resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
    defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
  }
}
// sharedIndexInformer is the implementation of SharedIndexInformer.
type sharedIndexInformer struct {
  indexer    Indexer
  controller Controller

  processor             *sharedProcessor
  cacheMutationDetector MutationDetector

  listerWatcher ListerWatcher

  // objectType is the object this informer expects to handle.
  objectType runtime.Object

  resyncCheckPeriod time.Duration
  defaultEventHandlerResyncPeriod time.Duration
  // clock allows for testability
  clock clock.Clock

  // started and stopped are read and written under startedLock.
  started, stopped bool
  startedLock      sync.Mutex

  // blockDeltas is held while a handler is added to an already-started
  // informer, so the new handler can be caught up from the indexer.
  blockDeltas sync.Mutex

  // Called whenever the ListAndWatch drops the connection with an error.
  watchErrorHandler WatchErrorHandler

  transform TransformFunc
}

Indexer

在 sharedIndexInformer 中有使用 Indexer,其定义为:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// tools/cache/index.go
// Indexer extends Store with multiple indices and restricts each accumulator
// to holding only the current object. Its methods are thread-safe.
type Indexer interface {
  Store
  // Index looks up the list of stored objects that match obj's key(s) under
  // the indexName index function.
  Index(indexName string, obj interface{}) ([]interface{}, error)
  // IndexKeys returns the storage keys of the objects whose index key under
  // the indexName index function is indexedValue.
  IndexKeys(indexName, indexedValue string) ([]string, error)
  // ListIndexFuncValues returns the list of keys generated by the indexName
  // index function.
  ListIndexFuncValues(indexName string) []string
  // ByIndex returns the objects whose index key under the indexName index
  // function is indexedValue.
  ByIndex(indexName, indexedValue string) ([]interface{}, error)
  // GetIndexers returns all the Indexers.
  GetIndexers() Indexers
  // AddIndexers adds more indexers.
  AddIndexers(newIndexers Indexers) error
}
// IndexFunc knows how to compute the index values of an object; it returns
// them as a slice of strings.
type IndexFunc func(obj interface{}) ([]string, error)

// MetaNamespaceIndexFunc is the default index function: it indexes an object
// by its namespace. When the object's metadata cannot be accessed it returns
// a single empty-string index value together with an error.
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
  accessor, err := meta.Accessor(obj)
  if err == nil {
    return []string{accessor.GetNamespace()}, nil
  }
  return []string{""}, fmt.Errorf("object has no meta: %v", err)
}

// Index maps an indexed value to the set of storage keys of the objects that
// have it; several objects may share the same indexed value.
type Index map[string]sets.String

// Indexers maps an index name to its index function.
type Indexers map[string]IndexFunc

// Indices maps each index name to its concrete Index.
type Indices map[string]Index

是通过下面函数创建的:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// tools/cache/store.go
// NewIndexer returns an Indexer implementation: a cache whose storage is a
// ThreadSafeStore created from the given indexers and an empty Indices, and
// whose object keys are produced by keyFunc.
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
  storage := NewThreadSafeStore(indexers, Indices{})
  return &cache{keyFunc: keyFunc, cacheStorage: storage}
}
// cache implements Indexer in terms of a ThreadSafeStore and a KeyFunc.
type cache struct {
  // cacheStorage is the thread-safe backing store.
  cacheStorage ThreadSafeStore
  // keyFunc produces the key under which an object is stored and retrieved.
  keyFunc KeyFunc
}

在上面使用的 KeyFunc 为 DeletionHandlingMetaNamespaceKeyFunc,里面会调用 MetaNamespaceKeyFunc,即:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// tools/cache/store.go
// MetaNamespaceKeyFunc is the default KeyFunc. For an API object it produces
// the key <namespace>/<name>, or just <name> when the namespace is empty. An
// ExplicitKey is returned as-is, and an object without accessible metadata
// yields an error.
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
  if explicit, isExplicit := obj.(ExplicitKey); isExplicit {
    return string(explicit), nil
  }

  accessor, err := meta.Accessor(obj)
  if err != nil {
    return "", fmt.Errorf("object has no meta: %v", err)
  }

  // Objects without a namespace are keyed by name alone.
  if len(accessor.GetNamespace()) == 0 {
    return accessor.GetName(), nil
  }
  return accessor.GetNamespace() + "/" + accessor.GetName(), nil
}

除了上面的 MetaNamespaceIndexFunc 索引函数外,Kubernetes 中还有 indexByPodNodeName,索引函数是根据 pods 所在 node 的名称:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// kubernetes 源码下:pkg/controller/daemon/daemon_controller.go
// indexByPodNodeName indexes a pod by the name of the node it is on. Objects
// that are not *v1.Pod, pods without a node name, and pods in the Succeeded
// or Failed phase produce no index values.
func indexByPodNodeName(obj interface{}) ([]string, error) {
  pod, isPod := obj.(*v1.Pod)
  if !isPod {
    return []string{}, nil
  }

  // We are only interested in active pods with nodeName set
  switch {
  case len(pod.Spec.NodeName) == 0,
    pod.Status.Phase == v1.PodSucceeded,
    pod.Status.Phase == v1.PodFailed:
    return []string{}, nil
  }
  return []string{pod.Spec.NodeName}, nil
}

以上结构体的关系,可以参考以下的图示结构:

indexer

添加事件处理

main 函数中调用了 podInformer.AddEventHandler 来添加 pods 的事件处理。

1
2
3
4
5
6
7
8
  podInformer.AddEventHandler(&cache.ResourceEventHandlerFuncs{
    // When a pod is added, print a log line and push the pod onto the channel.
    AddFunc: func(obj interface{}) {
      pod := obj.(*v1.Pod)
      fmt.Printf("pod added: %s/%s", pod.Namespace, pod.Name)
      pods <- pod
    },
  })

AddEventHandler 逻辑为:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// tools/cache/shared_informer.go
// AddEventHandler registers handler by delegating to
// AddEventHandlerWithResyncPeriod, supplying the informer's default handler
// resync period as the extra argument.
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error) {
  return s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}

// AddEventHandlerWithResyncPeriod wraps handler in a processorListener and
// registers it with the informer's processor. It fails if the informer has
// already stopped. If the informer is already running, the new listener is
// additionally sent an add notification for every object currently in the
// indexer. (The resync-period validation is elided in this excerpt.)
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) {
  s.startedLock.Lock()
  defer s.startedLock.Unlock()

  if s.stopped {
    return nil, fmt.Errorf("handler %v was not added to shared informer because it has stopped already", handler)
  }

  if resyncPeriod > 0 {
        ... // resync period validation and adjustment (elided)
  }
  // Create the listener that carries event notifications to the handler.
  listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

  if !s.started { // Not started yet: simply add the listener to the processor.
    return s.processor.addListener(listener), nil
  }

  // Already started, so the listener has to be added safely:
  // 1. stop sending add/update/delete notifications
  // 2. do a list against the store
  // 3. send synthetic "Add" events to the new handler
  // 4. unblock
  s.blockDeltas.Lock()
  defer s.blockDeltas.Unlock()

  handle := s.processor.addListener(listener)
  for _, item := range s.indexer.List() {
    listener.add(addNotification{newObj: item})
  }
  return handle, nil
}

processorListener

processorListener 用于传递事件通知并进行处理,其定义和创建逻辑为:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// tools/cache/shared_informer.go
// processorListener relays notifications from a sharedProcessor to one
// ResourceEventHandler. It uses two goroutines, two unbuffered channels and
// an unbounded ring buffer.
type processorListener struct {
  nextCh chan interface{}
  addCh  chan interface{}

  handler ResourceEventHandler
  // pendingNotifications is an unbounded ring buffer holding every
  // notification not yet distributed. Each listener has its own; a failed or
  // stalled listener accumulates notifications without limit until OOM.
  pendingNotifications buffer.RingGrowing

  // requestedResyncPeriod is how often the listener wants a full resync from
  // the shared informer.
  requestedResyncPeriod time.Duration
  // resyncPeriod is the threshold actually used in this listener's logic. It
  // equals requestedResyncPeriod except when the sharedIndexInformer does not
  // resync.
  resyncPeriod time.Duration
  // nextResync is the earliest time a full resync should happen.
  nextResync time.Time
  resyncLock sync.Mutex
}

// newProcessListener builds a processorListener for handler with unbuffered
// nextCh/addCh channels and a growing ring buffer of initial size bufferSize,
// then computes its first resync time relative to now.
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
  listener := &processorListener{
    handler:               handler,
    requestedResyncPeriod: requestedResyncPeriod,
    resyncPeriod:          resyncPeriod,
    addCh:                 make(chan interface{}),
    nextCh:                make(chan interface{}),
    pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
  }
  listener.determineNextResync(now)

  return listener
}

sharedProcessor

sharedProcessor 结构用于分发 deltaFIFO 的对象,回调用户配置的 EventHandler 方法,其定义和加入 listener 的逻辑如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// tools/cache/shared_informer.go
// sharedProcessor holds a collection of processorListeners and can distribute
// a notification object to them.
type sharedProcessor struct {
  listenersStarted bool
  listenersLock    sync.RWMutex
  // listeners records, for each listener, whether it is currently syncing.
  listeners map[*processorListener]bool
  clock     clock.Clock
  wg        wait.Group
}
// addListener registers listener with the processor under the write lock,
// creating the listener map on first use. If the processor's listeners are
// already running, the new listener's run and pop goroutines are started
// right away. The listener itself is returned as the registration handle.
func (p *sharedProcessor) addListener(listener *processorListener) ResourceEventHandlerRegistration {
  p.listenersLock.Lock()
  defer p.listenersLock.Unlock()

  if p.listeners == nil {
    p.listeners = map[*processorListener]bool{}
  }
  p.listeners[listener] = true

  if !p.listenersStarted {
    return listener
  }

  // run is where notifications are actually handled: it invokes the
  // handler's callbacks. pop feeds it from the pending buffer.
  p.wg.Start(listener.run)
  p.wg.Start(listener.pop)
  return listener
}

// pop moves notifications from addCh to nextCh, parking any backlog in
// pendingNotifications. It returns when addCh is closed, and closes nextCh on
// the way out so that run stops as well.
func (p *processorListener) pop() {
  defer utilruntime.HandleCrash()
  defer close(p.nextCh) // Tell .run() to stop

  // nextCh stays nil (which disables the send case) until there is a
  // notification to deliver.
  var nextCh chan<- interface{}
  var notification interface{}
  for {
    select {
    case nextCh <- notification:    // The current notification was sent to nextCh.
      // Fetch the next pending notification, if any.
      var ok bool
      notification, ok = p.pendingNotifications.ReadOne()
      if !ok { // Nothing to pop
        nextCh = nil // Disable this select case
      }
    case notificationToAdd, ok := <-p.addCh:
      if !ok {
        return
      }
      if notification == nil { // Nothing is waiting to be dispatched.
        // Make the new notification the current one and enable the send case.
        notification = notificationToAdd
        nextCh = p.nextCh
      } else { // A notification is already waiting to be dispatched.
        p.pendingNotifications.WriteOne(notificationToAdd)
      }
    }
  }
}
// run consumes notifications from nextCh and invokes the matching handler
// callback (OnUpdate, OnAdd or OnDelete) for each; unrecognized notification
// types are reported through utilruntime.HandleError.
func (p *processorListener) run() {
  // This call blocks until the channel is closed.
  stopCh := make(chan struct{})
  wait.Until(func() {
    for next := range p.nextCh { // Take each notification from nextCh and act on it.
      switch notification := next.(type) {
      case updateNotification:
        p.handler.OnUpdate(notification.oldObj, notification.newObj)
      case addNotification:
        p.handler.OnAdd(notification.newObj)
      case deleteNotification:
        p.handler.OnDelete(notification.oldObj)
      default:
        utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
      }
    }
    // Reached only once nextCh has been closed; closing stopCh ends wait.Until.
    close(stopCh)
  }, 1*time.Second, stopCh)
}

sharedIndexInformer 调用 processor

processor 是在 sharedIndexInformer 中使用的,可以看到主要用在以下几个函数中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// tools/cache/shared_informer.go
// OnAdd distributes an add event: the object is registered with the cache
// mutation detector and an addNotification is sent to the listeners as a
// non-sync notification.
func (s *sharedIndexInformer) OnAdd(obj interface{}) {
  s.cacheMutationDetector.AddObject(obj)
  s.processor.distribute(addNotification{newObj: obj}, false)
}

// OnUpdate distributes an update event. The update is flagged as a resync
// when metadata is accessible on both objects and their resource versions are
// equal; the new object is registered with the cache mutation detector before
// the updateNotification is handed to the processor.
func (s *sharedIndexInformer) OnUpdate(old, new interface{}) {
  var isSync bool

  // Work out isSync. The old object is only inspected once the new one has
  // yielded an accessor.
  newMeta, newErr := meta.Accessor(new)
  if newErr == nil {
    oldMeta, oldErr := meta.Accessor(old)
    if oldErr == nil {
      // Events that didn't change resourceVersion are treated as resync events
      // and only propagated to listeners that requested resync
      isSync = newMeta.GetResourceVersion() == oldMeta.GetResourceVersion()
    }
  }

  s.cacheMutationDetector.AddObject(new)
  update := updateNotification{oldObj: old, newObj: new}
  s.processor.distribute(update, isSync)
}

// OnDelete distributes a delete event by sending a deleteNotification to the
// listeners as a non-sync notification.
func (s *sharedIndexInformer) OnDelete(old interface{}) {
  s.processor.distribute(deleteNotification{oldObj: old}, false)
}

// distribute hands obj to the listeners under the read lock. A non-sync
// notification goes to every listener; a sync notification goes only to the
// listeners currently marked as syncing.
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
  p.listenersLock.RLock()
  defer p.listenersLock.RUnlock()

  for listener, isSyncing := range p.listeners {
    // Sync notifications are not delivered to listeners that are not syncing.
    if sync && !isSyncing {
      continue
    }
    listener.add(obj)
  }
}
// add sends the notification on addCh, where pop consumes it.
func (p *processorListener) add(notification interface{}) {
  p.addCh <- notification
}

启动 informers

测试代码中调用 informers.Start(ctx.Done()) 来启动 sharedInformerFactory 里的所有 informer,启动代码为:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// informers/factory.go
// Start launches, each in its own goroutine, every registered informer that
// has not been started yet, and marks it as started. It does nothing once the
// factory is shutting down. Each goroutine is tracked by f.wg.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
  f.lock.Lock()
  defer f.lock.Unlock()

  if f.shuttingDown {
    return
  }

  // Walk over all the informers, skipping those already running.
  for typ, inf := range f.informers {
    if f.startedInformers[typ] {
      continue
    }

    f.wg.Add(1)
    // The informer is passed as an argument so each goroutine works on its
    // own copy rather than the shared loop variable.
    go func(inf cache.SharedIndexInformer) {
      defer f.wg.Done()
      inf.Run(stopCh)
    }(inf)
    f.startedInformers[typ] = true
  }
}

这里调用了 informer 的 Run 函数,进行实际的监听和事件处理操作,代码为:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// tools/cache/shared_informer.go
// Run wires the informer together and blocks in the controller's Run. It
// builds a DeltaFIFO backed by the informer's indexer, creates the underlying
// controller from a Config that routes popped deltas to s.HandleDeltas, starts
// the cache mutation detector and the processor, and finally runs the
// controller with stopCh. A second call on a started informer only logs a
// warning.
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
  defer utilruntime.HandleCrash()

  if s.HasStarted() {
    klog.Warningf("The sharedIndexInformer has started, run more than once is not allowed")
    return
  }
  // Create the queue that carries object changes.
  fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
    KnownObjects:          s.indexer,
    EmitDeltaTypeReplaced: true,
  })
  // Configuration used to create the underlying controller.
  cfg := &Config{
    Queue:            fifo,
    ListerWatcher:    s.listerWatcher,
    ObjectType:       s.objectType,
    FullResyncPeriod: s.resyncCheckPeriod,
    RetryOnError:     false,
    ShouldResync:     s.processor.shouldResync,

    Process:           s.HandleDeltas,
    WatchErrorHandler: s.watchErrorHandler,
  }

  func() {
    s.startedLock.Lock()
    defer s.startedLock.Unlock()
    // Create the controller and mark the informer as started.
    s.controller = New(cfg)
    s.controller.(*controller).clock = s.clock
    s.started = true
  }()

  // Separate stop channel because Processor should be stopped strictly after controller
  processorStopCh := make(chan struct{})
  var wg wait.Group
  defer wg.Wait()              // Wait for Processor to stop
  defer close(processorStopCh) // Tell Processor to stop
  // Start the cache mutation detector.
  wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
  // Start the processor.
  wg.StartWithChannel(processorStopCh, s.processor.run)

  defer func() {
    s.startedLock.Lock()
    defer s.startedLock.Unlock()
    s.stopped = true // Don't want any new listeners
  }()
  // Start the controller; this call blocks.
  s.controller.Run(stopCh)
}

DeltaFIFO

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// tools/cache/delta_fifo.go
// DeltaFIFO is like FIFO, but differs in two ways.
// First, the accumulator associated with an object's key is not a single
// object but a Deltas, which is a slice of Delta values for that object.
// Second, there are two additional ways for an object to be applied to an
// accumulator: Replaced and Sync.
type DeltaFIFO struct {
  // lock/cond protects access to 'items' and 'queue'.
  lock sync.RWMutex
  cond sync.Cond

  // items maps a key to its Deltas.
  items map[string]Deltas

  // queue maintains FIFO order of keys for consumption in Pop().
  queue []string

  // populated is true if items were inserted by the Replace function.
  populated bool
  // initialPopulationCount is the number of items inserted by the first call
  // of Replace; it is decremented in Pop.
  initialPopulationCount int

  // keyFunc is used to generate the key for an item.
  keyFunc KeyFunc
  // knownObjects is, in the informer use shown in this walkthrough, the
  // informer's Indexer.
  knownObjects KeyListerGetter
  closed bool
  emitDeltaTypeReplaced bool
}

// NewDeltaFIFOWithOptions creates an empty DeltaFIFO from opts. When
// opts.KeyFunction is nil, MetaNamespaceKeyFunc is used to key items. The
// queue's condition variable is bound to the queue's own lock.
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
  keyFn := opts.KeyFunction
  if keyFn == nil {
    keyFn = MetaNamespaceKeyFunc
  }

  fifo := &DeltaFIFO{
    keyFunc:               keyFn,
    knownObjects:          opts.KnownObjects,
    emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
    items:                 make(map[string]Deltas),
    queue:                 []string{},
  }
  fifo.cond.L = &fifo.lock

  return fifo
}

cacheMutationDetector

cacheMutationDetector会比较对象,判断对象是否改变。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// tools/cache/mutation_detector.go
// defaultCacheMutationDetector provides a way to detect whether cached
// objects have been mutated.
type defaultCacheMutationDetector struct {
  name   string
  // period is the pause between two comparison passes in Run.
  period time.Duration

  // compareLock ensures only a single call to CompareObjects runs at a time
  compareObjectsLock sync.Mutex

  // addLock guards addedObjs between AddObject and CompareObjects
  addedObjsLock sync.Mutex
  addedObjs     []cacheObj

  cachedObjs []cacheObj

  // Once retainDuration has elapsed since lastRotated, Run moves cachedObjs
  // into retainedCachedObjs and starts a fresh cachedObjs.
  retainDuration     time.Duration
  lastRotated        time.Time
  retainedCachedObjs []cacheObj

  failureFunc func(message string)
}
// Run loops until stopCh is closed. Each pass first maintains the rotation
// state (initializing lastRotated on the first pass, or rotating cachedObjs
// into retainedCachedObjs once retainDuration has elapsed), then compares the
// objects, then waits for d.period.
func (d *defaultCacheMutationDetector) Run(stopCh <-chan struct{}) {
  for {
    switch {
    case d.lastRotated.IsZero():
      d.lastRotated = time.Now()
    case time.Since(d.lastRotated) > d.retainDuration:
      d.retainedCachedObjs, d.cachedObjs = d.cachedObjs, nil
      d.lastRotated = time.Now()
    }

    // Check whether any tracked object has changed.
    d.CompareObjects()

    select {
    case <-time.After(d.period):
    case <-stopCh:
      return
    }
  }
}

sharedProcessor 的 run 函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// run starts the run and pop goroutines of every registered listener, then
// blocks until stopCh is closed. On stop it closes each listener's addCh,
// drops the listener map, resets listenersStarted and waits for the listener
// goroutines to finish.
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
  // Start the run and pop functions of all listeners.
  func() {
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    for listener := range p.listeners {
      p.wg.Start(listener.run)
      p.wg.Start(listener.pop)
    }
    p.listenersStarted = true
  }()
  // Everything below is shutdown handling.
  <-stopCh

  p.listenersLock.Lock()
  defer p.listenersLock.Unlock()
  for listener := range p.listeners {
    close(listener.addCh) // Shut the listener down: pop returns once addCh is closed.
  }

  // Clear the listener list.
  p.listeners = nil
  p.listenersStarted = false
  p.wg.Wait()
}

controller 定义和 Run 函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// tools/cache/controller.go
// controller couples a Config with the Reflector that Run creates from it.
type controller struct {
  config         Config
  reflector      *Reflector
  // reflectorMutex is held while Run stores the reflector.
  reflectorMutex sync.RWMutex
  clock          clock.Clock
}
// Run processes items until stopCh signals. It closes the queue when stopCh
// fires, creates and starts a Reflector that feeds the queue from the
// ListerWatcher, and repeatedly runs processLoop to drain the queue.
func (c *controller) Run(stopCh <-chan struct{}) {
  defer utilruntime.HandleCrash()
  go func() {
    <-stopCh
    // Close the queue.
    c.config.Queue.Close()
  }()
  // Create the Reflector, which keeps the objects in the given store
  // consistent with the server's content.
  r := NewReflector(
    c.config.ListerWatcher,
    c.config.ObjectType,
    c.config.Queue,
    c.config.FullResyncPeriod,
  )
  r.ShouldResync = c.config.ShouldResync
  r.WatchListPageSize = c.config.WatchListPageSize
  r.clock = c.clock
  if c.config.WatchErrorHandler != nil {
    r.watchErrorHandler = c.config.WatchErrorHandler
  }

  c.reflectorMutex.Lock()
  c.reflector = r
  c.reflectorMutex.Unlock()
  var wg wait.Group
  // Start the Reflector.
  wg.StartWithChannel(stopCh, r.Run)

  // Drain the queue until stopCh is closed, then wait for the Reflector.
  wait.Until(c.processLoop, time.Second, stopCh)
  wg.Wait()
}

Reflector 定义和 Run 函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// tools/cache/reflector.go
// Reflector watches a specified resource and causes all changes to be
// reflected in the given store. (Further fields are elided in this excerpt.)
type Reflector struct {
  name string
  // expectedTypeName is the name of the type expected to be placed in the store.
  expectedTypeName string
  // expectedType is an example of the type expected to be placed in the store.
  expectedType reflect.Type
  // expectedGVK is the GVK of the object expected to be placed in the store.
  expectedGVK *schema.GroupVersionKind
  // store is the destination that is synced with the watched resource.
  store Store
  // listerWatcher is used to perform lists and watches.
  listerWatcher ListerWatcher
  ...
}
// Run repeatedly calls ListAndWatch to fetch all the objects and their
// subsequent changes, under the reflector's backoff manager, until stopCh is
// closed. An error from ListAndWatch is passed to watchErrorHandler.
func (r *Reflector) Run(stopCh <-chan struct{}) {
  klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
  wait.BackoffUntil(func() {
    if err := r.ListAndWatch(stopCh); err != nil {
      r.watchErrorHandler(r, err)
    }
  }, r.backoffManager, true, stopCh)
  klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}

controller 的 Run 函数还调用了 processLoop 函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// processLoop drains the work queue: it pops items one at a time, running
// c.config.Process on each, and returns once the queue reports
// ErrFIFOClosed. For any other error the item is re-enqueued when
// RetryOnError is set.
func (c *controller) processLoop() {
  for {
    // The Queue here is the DeltaFIFO that sharedIndexInformer.Run creates
    // via NewDeltaFIFOWithOptions.
    obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
    if err == nil {
      continue
    }
    if err == ErrFIFOClosed {
      return
    }
    if c.config.RetryOnError {
      // This is the safe way to re-enqueue.
      c.config.Queue.AddIfNotPresent(obj)
    }
  }
}

会执行 DeltaFIFO 的 Pop 函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// tools/cache/delta_fifo.go
// Pop blocks until the queue holds an item. When several items are queued
// they are handled in the order they were enqueued. The head key's Deltas are
// removed from the queue and passed to process while the lock is held; if
// process returns an ErrRequeue the item is put back. Pop returns
// ErrFIFOClosed when the queue is empty and closed.
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
  f.lock.Lock()
  defer f.lock.Unlock()
  for {
    // Keep blocking while the queue is empty.
    for len(f.queue) == 0 {
      if f.closed {
        return nil, ErrFIFOClosed
      }

      f.cond.Wait()
    }
    id := f.queue[0]
    f.queue = f.queue[1:]
    depth := len(f.queue)
    if f.initialPopulationCount > 0 {
      f.initialPopulationCount--
    }
    // Fetch the Deltas that belong to the key at the head of the queue.
    item, ok := f.items[id]
    if !ok {
      // This should never happen
      klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
      continue
    }
    delete(f.items, id)
    // A trace is only set up when more than 10 keys are still queued.
    if depth > 10 {
      trace := utiltrace.New("DeltaFIFO Pop Process",
        utiltrace.Field{Key: "ID", Value: id},
        utiltrace.Field{Key: "Depth", Value: depth},
        utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
      defer trace.LogIfLong(100 * time.Millisecond)
    }
    // Call the supplied process function, i.e. controller.config.Process. In
    // the informer this is sharedIndexInformer.HandleDeltas, which handles the
    // item through processDeltas.
    err := process(item)
    if e, ok := err.(ErrRequeue); ok {
      f.addIfNotPresent(id, item)
      err = e.Err
    }
    return item, err
  }
}

上面调用的 processDeltas 函数逻辑:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// tools/cache/controller.go
// processDeltas applies a list of Deltas to the Store and notifies the given
// handler of the resulting update, add and delete events. Deltas are handled
// oldest first; when a transformer is supplied every object passes through it
// before being stored. The first error from the transformer or the Store
// aborts processing and is returned.
func processDeltas(
  handler ResourceEventHandler,
  clientState Store,
  transformer TransformFunc,
  deltas Deltas,
) error {
  // from oldest to newest
  for _, delta := range deltas {
    obj := delta.Object
    if transformer != nil {
      transformed, err := transformer(obj)
      if err != nil {
        return err
      }
      obj = transformed
    }

    switch delta.Type {
    case Deleted:
      if err := clientState.Delete(obj); err != nil {
        return err
      }
      handler.OnDelete(obj)
    case Sync, Replaced, Added, Updated:
      // If the Store already holds the object, update it and report an
      // update (in the informer, handler.OnUpdate is
      // sharedIndexInformer.OnUpdate); otherwise add it and report an add.
      old, found, getErr := clientState.Get(obj)
      if getErr == nil && found {
        if err := clientState.Update(obj); err != nil {
          return err
        }
        handler.OnUpdate(old, obj)
        continue
      }
      if err := clientState.Add(obj); err != nil {
        return err
      }
      handler.OnAdd(obj)
    }
  }
  return nil
}

参考