简介
informer 是 client-go 提供的一个工具,主要是用来在 api-server
和程序之间同步指定的资源,并作为本地缓存,比如 Pod
, Deployment
等等。
我们都知道,kubernetes 中有很多个 controller 在运行,来保证它们关注的资源处于符合期望的状态。比如 ReplicasSet
,会保证该 ReplicaSet
有期望的副本数一直运行。这是通过一个不会终止的循环,不断监控当前集群的状态,然后调整的期望的状态。如下代码所示:
for {
current := getCurrentState()
desired := getDesiredState()
reconcile(current, desired)
}
因为这样的需求,所以 controller 需要不停的获取集群中一些资源的状态,然后调整到期望的状态。如果我们是通过网络不停的查询集群状态,将是一个性能很差的方案。为了性能,可以使用缓存,来将指定的资源保存在本地,只要我们及时的更新缓存,就不需要通过网络向集群查询了。
这就是 informer 出现的原因。它通过 List&Watch
来实时同步 api-server 中的资源,然后将资源分成三种事件来触发不同的处理。这三种事件是: Add
, Update
和 Delete
。同时它还提供了一个抽象的 Store
来提供本地缓存的查询。
最后,它还可以配合 workqueue
来实现本地的重试等等。informer 是一个非常强大的工具,在我们做 kubernetes 上 controller 的开发时必不可少,但是因为 controller 的编写本身就是一件比较复杂的工作,我们必须要对 informer 本身,以及其周边的工具有清晰的理解,才能写出质量更好的代码。
工作流程
这里先放上一张图来做参考。一般我们在使用 informer 时,会使用如下的代码:
// filterd
lw := cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (object runtime.Object, err error) {
return k8sCli.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (w watch.Interface, err error) {
return k8sCli.CoreV1().Pods(metav1.NamespaceAll).Watch(context.TODO(), options)
},
}
// indexerInformer, shareInformer
store, ctrl := cache.NewInformer(&lw, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: handleAddPod,
UpdateFunc: handleUpdatePod,
DeleteFunc: handleDeletePod,
})
stopChan := signals.SetupSignalHandler()
go ctrl.Run(stopChan)
if sync := cache.WaitForCacheSync(stopChan, ctrl.HasSynced); !sync {
log.Println("not sync")
}
log.Println("synchronized finish")
首先,我们定义了 ListWatch 的方法,informer 会用 List 方法来获取所有的 Pod 资源,然后使用 Watch 来监听之后 Pod 资源的更新。
之后我们实例化了一个 informer。第二个参数是资源类型。第三个参数是重新同步的周期,0为不同步,否则会在每个周期开始时重新 List 所有的资源。第四个参数是 ResourceEventHandlerFuncs
,这里的 AddFunc
, UpdateFunc
,DeleteFunc
是本地缓存在增加,更新和删除时触发的事件。
NewInformer
返回了 store 和 ctrl 两个值,store 就是 pod 的本地缓存,我们可以通过查询 store 来代替直接向 api-server 查询。这个返回的 store 实现了如下的接口:
type Store interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
// Replace will delete the contents of the store, using instead the
// given list. Store takes ownership of the list, you should not reference
// it after calling this function.
Replace([]interface{}, string) error
Resync() error
}
在 controller 中,我们一般使用 List*
, Get*
方法,可以用来查询本地的缓存。同时不要使用其他的方法,这会导致一些不可预知的问题。
另外一个返回值 ctrl,实现了如下的接口:
type Controller interface {
Run(stopCh <-chan struct{})
HasSynced() bool
LastSyncResourceVersion() string
}
这个接口很简单,Run
用来开始启动同步,stopCh
用来随时停止同步,HasSynced
用来判断同步是否完成。LastSyncResourceVersion
用来获取最新同步的资源 version。
cache.WaitForCacheSync
用来等待同步完成。
总结
关于 informer 的基本使用就先介绍这么多。后面会对 informer 中涉及的代码进行详细的分析,包括 List&Watch
的机制、DeltaFIFO
的实现、本地缓存(Store) 的实现等等。