测试代码
下面代码是是从 client-go 中的测试代码修改而来,通过单步运行来帮助梳理代码逻辑。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
// 跟此文件逻辑一致:examples/fake-client/main_test.go
package main
import (
"context"
"fmt"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
watcherStarted := make(chan struct{})
// Create the fake client.
client := fake.NewSimpleClientset()
// A catch-all watch reactor that allows us to inject the watcherStarted channel.
client.PrependWatchReactor("*", func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) {
gvr := action.GetResource()
ns := action.GetNamespace()
watch, err := client.Tracker().Watch(gvr, ns)
if err != nil {
return false, nil, err
}
close(watcherStarted)
return true, watch, nil
})
// We will create an informer that writes added pods to a channel.
pods := make(chan *v1.Pod, 1)
informers := informers.NewSharedInformerFactory(client, 0)
podInformer := informers.Core().V1().Pods().Informer()
podInformer.AddEventHandler(&cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
fmt.Printf("pod added: %s/%s", pod.Namespace, pod.Name)
pods <- pod
},
})
informers.Start(ctx.Done())
cache.WaitForCacheSync(ctx.Done(), podInformer.HasSynced)
<-watcherStarted
// Inject an event into the fake client.
p := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "my-pod"}}
_, err := client.CoreV1().Pods("test-ns").Create(context.TODO(), p, metav1.CreateOptions{})
if err != nil {
fmt.Printf("error injecting pod add: %v", err)
}
select {
case pod := <-pods:
fmt.Printf("Got pod from channel: %s/%s", pod.Namespace, pod.Name)
case <-time.After(wait.ForeverTestTimeout):
fmt.Printf("Informer did not get the added pod")
}
}
|
源码解析
在分析源码之前,可以借助下图的整体流程图来帮助理解。
先来看下 sharedInformerFactory 结构,它可以统一管理控制器中需要的各资源对象的 informer 实例,避免同一个资源创建多个实例,在上面测试代码中先获取了一个 sharedInformerFactory,在后续就可以通过它拿到对应 API 的 informer。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
// informers/factory.go
// 实现了 SharedInformerFactory 接口,为所有已知的 API GVR 提供一个共享的 informer
type sharedInformerFactory struct {
client kubernetes.Interface
namespace string
tweakListOptions internalinterfaces.TweakListOptionsFunc
lock sync.Mutex
defaultResync time.Duration
customResync map[reflect.Type]time.Duration
// map 保存每种类型的 SharedIndexInformer
informers map[reflect.Type]cache.SharedIndexInformer
// startedInformers is used for tracking which informers have been started.
// This allows Start() to be called multiple times safely.
startedInformers map[reflect.Type]bool
// wg tracks how many goroutines were started.
wg sync.WaitGroup
// shuttingDown is true when Shutdown has been called. It may still be running
// because it needs to wait for goroutines.
shuttingDown bool
}
// 为所有命名空间生成一个 SharedInformerFactory
func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
return NewSharedInformerFactoryWithOptions(client, defaultResync)
}
// 实际创建的是 sharedInformerFactory
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
factory := &sharedInformerFactory{
client: client,
namespace: v1.NamespaceAll,
defaultResync: defaultResync,
informers: make(map[reflect.Type]cache.SharedIndexInformer),
startedInformers: make(map[reflect.Type]bool),
customResync: make(map[reflect.Type]time.Duration),
}
// Apply all options
for _, opt := range options {
factory = opt(factory)
}
return factory
}
// 返回 core API 分组
func (f *sharedInformerFactory) Core() core.Interface {
return core.New(f, f.namespace, f.tweakListOptions)
}
|
测试代码中,通过 informers.Core().V1().Pods().Informer()
返回来 pods 对应的 Informer 实例,其中 core 包中主要是提供返回指定版本的 API 组。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
// informers/core/interface.go
// Interface 提供了方法来访问这个组所有版本
type Interface interface {
// V1 provides access to shared informers for resources in V1.
V1() v1.Interface
}
// 表示 k8s 的一个 api 组
type group struct {
factory internalinterfaces.SharedInformerFactory
namespace string
tweakListOptions internalinterfaces.TweakListOptionsFunc
}
// 返回 api 组
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}
// 返回 api 组的 V1 版本
func (g *group) V1() v1.Interface {
return v1.New(g.factory, g.namespace, g.tweakListOptions)
}
|
core/v1 包里面的逻辑,主要是提供方法获取/api/v1 路径下的所有 API 的 informer 实例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
// informers/core/v1/interface.go
// Interface 提供方法访问这个版本下的所有 informer
type Interface interface {
ComponentStatuses() ComponentStatusInformer
ConfigMaps() ConfigMapInformer
Endpoints() EndpointsInformer
Events() EventInformer
LimitRanges() LimitRangeInformer
Namespaces() NamespaceInformer
Nodes() NodeInformer
PersistentVolumes() PersistentVolumeInformer
PersistentVolumeClaims() PersistentVolumeClaimInformer
Pods() PodInformer
PodTemplates() PodTemplateInformer
ReplicationControllers() ReplicationControllerInformer
ResourceQuotas() ResourceQuotaInformer
Secrets() SecretInformer
Services() ServiceInformer
ServiceAccounts() ServiceAccountInformer
}
// 表示 api 组下面的一个版本
type version struct {
factory internalinterfaces.SharedInformerFactory
namespace string
tweakListOptions internalinterfaces.TweakListOptionsFunc
}
// 返回一个新的版本
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}
// 返回 PodInformer 实例
func (v *version) Pods() PodInformer {
return &podInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}
// ... 返回其它实例
|
上面返回的是 PodInformer 结构,定义为:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
// informers/core/v1/pod.go
// 提供方法访问底层的 informer 和 lister
type PodInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.PodLister
}
// 实现了 PodInformer
type podInformer struct {
factory internalinterfaces.SharedInformerFactory
tweakListOptions internalinterfaces.TweakListOptionsFunc
namespace string
}
// 创建 pods 的 SharedIndexInformer,不同类型资源的 Informer 都是 SharedIndexInformer,只是创建时传入的 ListWatch 函数和资源类型不同
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
},
},
&corev1.Pod{},
resyncPeriod,
indexers,
)
}
// podInformer 的默认创建函数,factory 中会调用
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
// Indexer 映射名字到索引函数,MetaNamespaceIndexFunc 是默认的索引函数,基于对象的命名空间进行索引
return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
// 实际调用的是 main.go 中初始化的 sharedInformerFactory 的 InformerFor 方法,返回 pods 的 SharedIndexInformer
func (f *podInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}
func (f *podInformer) Lister() v1.PodLister {
return v1.NewPodLister(f.Informer().GetIndexer())
}
|
看下 InformerFor 是如何实现的,从下面可以看到 factory 增加了本地缓存逻辑,最后还是调用对应类型的创建函数生成 Informer,即 newFunc 为上面 podInformer.Informer() 方法中调用时传入的参数 defaultInformer 函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
// informers/factory.go
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerType := reflect.TypeOf(obj)
informer, exists := f.informers[informerType]
if exists { // 如果已经创建了这种类型的 informer,直接返回
return informer
}
// 判断是否设置了同步周期
resyncPeriod, exists := f.customResync[informerType]
if !exists {
resyncPeriod = f.defaultResync
}
// 创建 informer 并加入到 map 中
informer = newFunc(f.client, resyncPeriod)
f.informers[informerType] = informer
return informer
}
|
newFunc 所代表的 NewFilteredPodInformer 方法,最终会调用到 NewSharedIndexInformer,创建具体的 Informer:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
// tools/cache/shared_informer.go
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: exampleObject,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
clock: realClock,
}
return sharedIndexInformer
}
// SharedIndexInformer 的实现
type sharedIndexInformer struct {
indexer Indexer
controller Controller
processor *sharedProcessor
cacheMutationDetector MutationDetector
listerWatcher ListerWatcher
// 此 informer 期望处理的对象
objectType runtime.Object
resyncCheckPeriod time.Duration
defaultEventHandlerResyncPeriod time.Duration
// clock allows for testability
clock clock.Clock
started, stopped bool
startedLock sync.Mutex
blockDeltas sync.Mutex
// Called whenever the ListAndWatch drops the connection with an error.
watchErrorHandler WatchErrorHandler
transform TransformFunc
}
|
Indexer
在 sharedIndexInformer 中有使用 Indexer,其定义为:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
// tools/cache/index.go
// Indexer 使用多个索引来扩展存储,限制每个累积器只能保存当前对象,函数是线程安全的
type Indexer interface {
Store
// 根据 obj 对象在 indexName 索引函数上的 key,查找匹配的对象列表
Index(indexName string, obj interface{}) ([]interface{}, error)
// 根据 indexName 索引函数,查找索引 key 为 indexedValue 的对象 key 列表
IndexKeys(indexName, indexedValue string) ([]string, error)
// 返回一个有 IndexName 索引函数生成的 key 的列表
ListIndexFuncValues(indexName string) []string
// 根据 indexName 索引函数,查找索引 key 为 indexedValue 的对象列表
ByIndex(indexName, indexedValue string) ([]interface{}, error)
// 返回所有的 Indexers
GetIndexers() Indexers
// 添加 indexer
AddIndexers(newIndexers Indexers) error
}
// 知道如何计算对象的索引值,输出字符串索引数组
type IndexFunc func(obj interface{}) ([]string, error)
// 默认索引函数,基于对象的命名空间
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
meta, err := meta.Accessor(obj)
if err != nil {
return []string{""}, fmt.Errorf("object has no meta: %v", err)
}
return []string{meta.GetNamespace()}, nil
}
// 映射索引值到存储对象的 key 集合,多个对象的索引值可能相同
type Index map[string]sets.String
// 索引名称到索引函数的映射
type Indexers map[string]IndexFunc
// 每个索引名称到具体 Index 的映射
type Indices map[string]Index
|
是通过下面函数创建的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
// tools/cache/store.go
// NewIndexer 返回一个 Indexer 的实现
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
}
}
// cache 通过一个 ThreadSafeStore 和 KeyFunc 实现了 Indexer
type cache struct {
// cacheStorage 线程安全的缓存
cacheStorage ThreadSafeStore
// keyFunc 用来得到存储对象的 key 和检索对象
keyFunc KeyFunc
}
|
在上面使用的 KeyFunc 为 DeletionHandlingMetaNamespaceKeyFunc,里面会调用 MetaNamespaceKeyFunc,即:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
// tools/cache/store.go
// 默认的 KeyFunc,为 API 的对象生成 key,格式为:<namespace>/<name>
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
if key, ok := obj.(ExplicitKey); ok {
return string(key), nil
}
meta, err := meta.Accessor(obj)
if err != nil {
return "", fmt.Errorf("object has no meta: %v", err)
}
if len(meta.GetNamespace()) > 0 {
return meta.GetNamespace() + "/" + meta.GetName(), nil
}
return meta.GetName(), nil
}
|
除了上面的 MetaNamespaceIndexFunc 索引函数外,Kubernetes 中还有 indexByPodNodeName,索引函数是根据 pods 所在 node 的名称:
1
2
3
4
5
6
7
8
9
10
11
12
|
// kubernetes 源码下:pkg/controller/daemon/daemon_controller.go
func indexByPodNodeName(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return []string{}, nil
}
// We are only interested in active pods with nodeName set
if len(pod.Spec.NodeName) == 0 || pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
return []string{}, nil
}
return []string{pod.Spec.NodeName}, nil
}
|
以上结构体的关系,可以参考以下的图示结构:
添加事件处理
main 函数中调用了 podInformer.AddEventHandler 来添加 pods 的事件处理。
1
2
3
4
5
6
7
8
|
podInformer.AddEventHandler(&cache.ResourceEventHandlerFuncs{
// 添加 pods 时打印日志,并加入到 channel 中
AddFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
fmt.Printf("pod added: %s/%s", pod.Namespace, pod.Name)
pods <- pod
},
})
|
AddEventHandler 逻辑为:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
// tools/cache/shared_informer.go
// 增加了一个同步周期参数
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error) {
return s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) {
s.startedLock.Lock()
defer s.startedLock.Unlock()
if s.stopped {
return nil, fmt.Errorf("handler %v was not added to shared informer because it has stopped already", handler)
}
if resyncPeriod > 0 {
... // 同步周期参数校验和处理
}
// 创建 listener,用于传递事件通知并进行处理
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
if !s.started { // 没有启动,直接加入到 processor 中即可
return s.processor.addListener(listener), nil
}
// 已经启动了,需要安全地加入
// 1. stop sending add/update/delete notifications
// 2. do a list against the store
// 3. send synthetic "Add" events to the new handler
// 4. unblock
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
handle := s.processor.addListener(listener)
for _, item := range s.indexer.List() {
listener.add(addNotification{newObj: item})
}
return handle, nil
}
|
processorListener
processorListener 用于传递事件通知并进行处理,其定义和创建逻辑为:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
// tools/cache/shared_informer.go
// processorListener 通过一个 sharedProcessor 来传递通知给一个 ResourceEventHandler
// 使用了两个协程,两个无缓冲的 channel,一个无限的循环缓存
type processorListener struct {
nextCh chan interface{}
addCh chan interface{}
handler ResourceEventHandler
// pendingNotifications 无限的循环缓存,保存了所有还未分发的通知, 每个 listener 都有一个,失败或者停滞的 listener 将会有个无限的通知,直到 OOM
pendingNotifications buffer.RingGrowing
// requestedResyncPeriod 是 listener 想要从共享 Informer 完全同步的周期
requestedResyncPeriod time.Duration
// resyncPeriod 是在 listener 逻辑中使用的阈值,这个值除了在 sharedIndexInformer 不进行同步时跟 requestedResyncPeriod 不一样,其他时候都一致。
resyncPeriod time.Duration
// nextResync 应该进行完全同步的最早时间
nextResync time.Time
resyncLock sync.Mutex
}
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
ret := &processorListener{
nextCh: make(chan interface{}),
addCh: make(chan interface{}),
handler: handler,
pendingNotifications: *buffer.NewRingGrowing(bufferSize),
requestedResyncPeriod: requestedResyncPeriod,
resyncPeriod: resyncPeriod,
}
ret.determineNextResync(now)
return ret
}
|
sharedProcessor
sharedProcessr 结构用于分发 deltaFIFO 的对象,回调用户配置的 EventHandler 方法,其定义和加入 listener 逻辑:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
// tools/cache/shared_informer.go
// sharedProcessor 有一个 processorListener 集合,可以分发通知对象给它的 listener
type sharedProcessor struct {
listenersStarted bool
listenersLock sync.RWMutex
// 保存每个 listener 当前是否在同步中
listeners map[*processorListener]bool
clock clock.Clock
wg wait.Group
}
func (p *sharedProcessor) addListener(listener *processorListener) ResourceEventHandlerRegistration {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
if p.listeners == nil {
p.listeners = make(map[*processorListener]bool)
}
p.listeners[listener] = true
if p.listenersStarted {
// run 实际处理通知的地方,会调用 hander 的函数进行处理
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
return listener
}
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification: // 获取通知并发生到 nextCh
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil { // 没有通知可以分发了
notification = notificationToAdd
nextCh = p.nextCh
} else { // 有一个通知等待被分发
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
func (p *processorListener) run() {
// 这个调用会一直阻塞到 channel 关闭
stopCh := make(chan struct{})
wait.Until(func() {
for next := range p.nextCh { // 从 nextCh 中获取进行实际的操作
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
close(stopCh)
}, 1*time.Second, stopCh)
}
|
processor 是在 sharedIndexInformer 中使用的,可以看到主要用在以下几个函数中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
// tools/cache/shared_informer.go
// 分发添加事件
func (s *sharedIndexInformer) OnAdd(obj interface{}) {
s.cacheMutationDetector.AddObject(obj)
s.processor.distribute(addNotification{newObj: obj}, false)
}
// 分发更新事件
func (s *sharedIndexInformer) OnUpdate(old, new interface{}) {
isSync := false
// 设置 isSync
if accessor, err := meta.Accessor(new); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
// Events that didn't change resourceVersion are treated as resync events
// and only propagated to listeners that requested resync
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
s.cacheMutationDetector.AddObject(new)
s.processor.distribute(updateNotification{oldObj: old, newObj: new}, isSync)
}
// 分发删除事件
func (s *sharedIndexInformer) OnDelete(old interface{}) {
s.processor.distribute(deleteNotification{oldObj: old}, false)
}
// 分发事件,添加到 listener 中
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for listener, isSyncing := range p.listeners {
switch {
case !sync:
// 非同步消息,分发给所有 listener
listener.add(obj)
case isSyncing:
// 同步消息分发给所有的同步 listener
listener.add(obj)
default:
// 跳过(同步消息不分发给非同步 listener)
}
}
}
// 添加到 addCh 通道中,会在 pop 中消费
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}
|
测试代码中调用 informers.Start(ctx.Done()) 来启动 sharedInformerFactory 里的所有 informer,启动代码为:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
// informers/factory.go
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
if f.shuttingDown {
return
}
// 遍历所有的 informers
for informerType, informer := range f.informers {
// informer 没有启动则启动
if !f.startedInformers[informerType] {
f.wg.Add(1)
informer := informer
go func() {
defer f.wg.Done()
informer.Run(stopCh)
}()
f.startedInformers[informerType] = true
}
}
}
|
这里调用了 informer 的 run 函数,进行实际的监听和事件处理操作,代码为:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
// tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
if s.HasStarted() {
klog.Warningf("The sharedIndexInformer has started, run more than once is not allowed")
return
}
// 创建 Queue 用来处理对象的改变
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
})
// 用于创建底层 controller 的配置
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
// 创建 Controller
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
// 启动 monitor
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
// 启动 processor
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
// 启动 controller
s.controller.Run(stopCh)
}
|
DeltaFIFO
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
// tools/cache/delta_fifo.go
// 跟 FIFO 类似,但是有两点不同。
// 一是与对象 key 关联的累积器不是一个对象,而是一个 Deltas,是一个此对象的 Delta 值的切片
// 二是一个对象有两种额外的方式添加到累加器中:Replaced 和 Sync
type DeltaFIFO struct {
// lock/cond protects access to 'items' and 'queue'.
lock sync.RWMutex
cond sync.Cond
// 映射 key 到 Deltas.
items map[string]Deltas
// 维持 Pop() 中消费的 key 的 FIFO 顺序
queue []string
// 如果由 Replace 函数插入了 items,则为 true
populated bool
// 第一次调用 Replace 函数插入的 items 数量, 在 Pop 中减小
initialPopulationCount int
// 用来生成 item 的 key
keyFunc KeyFunc
// 实际是 Indexer
knownObjects KeyListerGetter
closed bool
emitDeltaTypeReplaced bool
}
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
if opts.KeyFunction == nil {
opts.KeyFunction = MetaNamespaceKeyFunc
}
f := &DeltaFIFO{
items: map[string]Deltas{},
queue: []string{},
keyFunc: opts.KeyFunction,
knownObjects: opts.KnownObjects,
emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
}
f.cond.L = &f.lock
return f
}
|
cacheMutationDetector
cacheMutationDetector会比较对象,判断对象是否改变。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
// tools/cache/mutation_detector.go
// 提供一种检测缓存对象是否突变的方式
type defaultCacheMutationDetector struct {
name string
period time.Duration
// compareLock ensures only a single call to CompareObjects runs at a time
compareObjectsLock sync.Mutex
// addLock guards addedObjs between AddObject and CompareObjects
addedObjsLock sync.Mutex
addedObjs []cacheObj
cachedObjs []cacheObj
retainDuration time.Duration
lastRotated time.Time
retainedCachedObjs []cacheObj
failureFunc func(message string)
}
func (d *defaultCacheMutationDetector) Run(stopCh <-chan struct{}) {
for {
if d.lastRotated.IsZero() {
d.lastRotated = time.Now()
} else if time.Since(d.lastRotated) > d.retainDuration {
d.retainedCachedObjs = d.cachedObjs
d.cachedObjs = nil
d.lastRotated = time.Now()
}
// 比较对象是否改变
d.CompareObjects()
select {
case <-stopCh:
return
case <-time.After(d.period):
}
}
}
|
sharedProcessor 的 run 函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
// 启动所有 listener 的 run 函数和 pop 函数
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
// 处理关闭后操作
<-stopCh
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
for listener := range p.listeners {
close(listener.addCh) //关闭 listener
}
// 清除 listener 列表
p.listeners = nil
p.listenersStarted = false
p.wg.Wait()
}
|
controller 定义和 Run 函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
// tools/cache/controller.go
type controller struct {
config Config
reflector *Reflector
reflectorMutex sync.RWMutex
clock clock.Clock
}
// 处理 items,直到 stopCh 中被发送了一个值
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
// 关闭 Queue
c.config.Queue.Close()
}()
// 创建 Reflector,用于保证给定存储中的对象和服务器内容是一致的
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize
r.clock = c.clock
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
}
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
// 启动 Reflector
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
|
Reflector 定义和 Run 函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
// tools/cache/reflector.go
// 监视指定的资源,使所有的改变映射到给定的存储
type Reflector struct {
name string
// 期望放置到存储中的类型名称
expectedTypeName string
// 期望放置到存储的类型示例
expectedType reflect.Type
// 期望放置到存储中的 GVK 对象
expectedGVK *schema.GroupVersionKind
// watch 资源后同步的目标存储
store Store
// 用来执行 lists 和 watches
listerWatcher ListerWatcher
...
}
// 不停调用 ListAndWatch 来获取所有的对象和后续的改变
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}
|
controller 的 Run 函数还调用了 processLoop 函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
// 消耗任务 queue
func (c *controller) processLoop() {
for {
// 这个 Queue 是在 sharedIndexInformer.Run 中调用 NewDeltaFIFOWithOptions 生成的 DeltaFIFO
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
|
会执行 DeltaFIFO 的 Pop 函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
// tools/cache/delta_fifo.go
// 会阻塞直到 queue 中有数据时,如果同时有多个数据,会根据入队时间来处理
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
// 为空则一直阻塞
for len(f.queue) == 0 {
if f.closed {
return nil, ErrFIFOClosed
}
f.cond.Wait()
}
id := f.queue[0]
f.queue = f.queue[1:]
depth := len(f.queue)
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
// 取出队列头对应的对象
item, ok := f.items[id]
if !ok {
// This should never happen
klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
continue
}
delete(f.items, id)
// 队列长度大于 10 才打印日志
if depth > 10 {
trace := utiltrace.New("DeltaFIFO Pop Process",
utiltrace.Field{Key: "ID", Value: id},
utiltrace.Field{Key: "Depth", Value: depth},
utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
defer trace.LogIfLong(100 * time.Millisecond)
}
// 调用传入的处理函数,controller.config.Process, 实际是 sharedIndexInformer.HandleDeltas 函数,里面调用 processDeltas 处理
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
return item, err
}
}
|
上面调用的 processDeltas 函数逻辑:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
// tools/cache/controller.go
// 更新一个 Deltas 列表到 Store 中, 通知给定的 handler 更新、新增、删除事件
func processDeltas(
handler ResourceEventHandler,
clientState Store,
transformer TransformFunc,
deltas Deltas,
) error {
// from oldest to newest
for _, d := range deltas {
obj := d.Object
if transformer != nil {
var err error
obj, err = transformer(obj)
if err != nil {
return err
}
}
switch d.Type {
case Sync, Replaced, Added, Updated:
// 获取老数据,更新 Store,然后调用 handler 处理更新
if old, exists, err := clientState.Get(obj); err == nil && exists {
if err := clientState.Update(obj); err != nil {
return err
}
// 调用 sharedIndexInformer 的 OnUpdate 函数,前面有贴
handler.OnUpdate(old, obj)
} else {
if err := clientState.Add(obj); err != nil {
return err
}
handler.OnAdd(obj)
}
case Deleted:
if err := clientState.Delete(obj); err != nil {
return err
}
handler.OnDelete(obj)
}
}
return nil
}
|
参考