Pokračujeme v sérii o programovaní pre Kubernetes. Dnes o Informeroch – mechanizme, na ktorom stojí veľká časť Kubernetes kódu. Čo to ten Informer je? Ako funguje? Ako si Informer naprogramovať?
Informer je mechanizmus na efektívne sledovanie Kubernetes objektov a reagovanie
na ich zmeny. Namiesto neustáleho dotazovania API servera (polling
)
využíva kombináciu LIST
(na počiatočné načítanie objektov) a WATCH
(na získavanie iba zmien). Tieto objekty si ukladá do lokálnej cache, čím
šetrí zdroje. Informer tiež umožňuje registráciu event handlerov, ktoré sa
spustia pri vytváraní, aktualizácii alebo odstránení objektov. Týmto spôsobom
aplikácia vždy pracuje s aktuálnym stavom objektov a môže dynamicky reagovať
na zmeny.
Informer využíva na sledovanie a listovanie objektov Clientset. Informer je teda spôsob, ako optimalizovať prístup cez Clientset.
Z pohľadu architektúry, Informer pozostáva z niekoľkých komponentov, ktoré medzi sebou interagujú. Sú to:
- Reflector a ListWatch
- Delta FIFO
- Controller
- Store
- Event Handlers
Na začiatku Reflector načíta (LIST
) všetky objekty a následne sleduje
ich zmeny (WATCH
). K tomu používa konkrétnu implementáciu ListWatch.
Zmeny potom tlačí do tzv. Delta FIFO fronty. Ako jej názov napovedá,
zmeny sú uchované vo forme delta udalostí - pridanie, aktualizácia, odstránenie.

Z fronty potom vyberá udalosti ďalšia komponenta - Controller. Ten volá
príslušné tzv. event handlery (AddFunc
, UpdateFunc
, DeleteFunc
). Ako už
ich názov napovedá, ide o registrované funkcie, ktorých kód by mal reagovať na
zmeny.
Aktuálny stav objektu sa potom synchronizuje v Store, odkiaľ môžu handlery pristupovať k aktuálnym dátam. Ide o akúsi lokálnu cache. Takto sa predíde zbytočnému zaťažovaniu Kubernetes API volaniami.
Aby sa zamedzilo nekonzistentnosti objektov v prípade, že sa nejaká udalosť
stratí, Reflector raz za čas vykoná tzv. re-list. To znamená, že
namiesto WATCH
sa opäť použije LIST
. Tento časový úsek sa nazýva
resync period.
Teória je teda jasná. Ako sa ale taký informer naprogramuje? Podobne, ako v minulom článku, aj teraz si Informer naprogramujem ručne. To mi dá lepšiu predstavu o tom, ako skutočne Informer vyzerá.
Môj prvý Informer
Ako základ použijem kód z predchádzajúceho článku. Ide o Kubernetes CRD a Clientset starwars.okontajneroch.sk. Ak si to chceš vyskúšať, môžeš použiť toto repo.
Povedzme, že chcem mať program, ktorý bude sledovať moje starfighter
objekty
a reagovať na ich zmeny. Pre tento typ si vytvorím informer
informers/starwars/v1/starfighter.go
.
Ako prvé potrebujem ListWatch a implementovať si ListFunc
a WatchFunc
.
Ide o implementácie, ktoré budú volané Reflectorom. Tieto funkcie budú
pristupovať k objektom cez príslušný Clientset:
lw := &cache.ListWatch{
// ziska zoznam vsetkych `starfighter` objektov v konkretnom `namespace`
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.StarwarsV1().Starfighters(namespace).List(context.TODO(), options)
},
// sleduje vsetky zmeny `starfighter` objektov v konkretnom `namespace`
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.StarwarsV1().Starfighters(namespace).Watch(context.TODO(), options)
},
}
Moja implementácia je veľmi jednoduchá a bude sledovať všetky starfighter
objekty v konkrétnom namespace
. Ak by som chcel sledovať nejake konkrétnejšie
objekty, tak tu môžem použiť napríklad LabelSelector
.
Druhým krokom bude inicializácia veľmi jednoduchého Informera pomocou
cache.NewInformerWithOptions()
. Tu mu podhodím implementáciu ListWatch
,
pomocou ObjectType
mu napoviem, aké objekty budem sledovať, no a nakoniec
zaregistrujem 3 funkcie – AddFunc
, UpdateFunc
a DeleteFunc
. Ide o
spomínané event handlery, ktoré budú volané vždy, keď nad sledovaným objektom
nastane zmena. Takto získam Controller a Store môjho Informera.
sfi.store, sfi.controller = cache.NewInformerWithOptions(cache.InformerOptions{
ListerWatcher: lw,
ResyncPeriod: 30 * time.Second,
ObjectType: &starwarsv1.Starfighter{},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: onStarfighterAdded,
UpdateFunc: onStarfighterUpdated,
DeleteFunc: onStarfighterDeleted,
},
})
Informer je teda inicializovaný a môžem ho spustiť. Tu prichádza Go konkurenčné programovanie. Spustiť Informer znamená, že sa spustí Controller ako Go rutina. Tá je potom ukončená v momente, keď obdrží signál cez stop kanál.
func (sfi *StarfighterInformer) Start(stopCh <-chan struct{}) {
go sfi.controller.Run(stopCh)
cache.WaitForCacheSync(stopCh, sfi.controller.HasSynced)
}
Nakoniec ostáva už len samotná implementácia event handlerov, ktorá nespraví
nič iné, len vypíše informáciu na výstup. Čo stojí za pozornosť, je potreba
pretypovania any na mnou sledovaný *starwarsv1.Starfighter
.
func onStarfighterAdded(obj any) {
sf := obj.(*starwarsv1.Starfighter)
fmt.Printf("Starfighter added: %s\n", sf.Name)
}
func onStarfighterUpdated(oldObj, newObj any) {
sf := newObj.(*starwarsv1.Starfighter)
fmt.Printf("Starfighter updated: %s\n", sf.Name)
}
func onStarfighterDeleted(obj any) {
sf := obj.(*starwarsv1.Starfighter)
fmt.Printf("Starfighter deleted: %s\n", sf.Name)
}
Informer je hotový. Jeho celý kód je dostupný tu. Teraz ho treba použiť. Vytvorím si main.go
:
package main
import (
"fmt"
"github.com/okontajneroch/starwars/clientset"
swinformer "github.com/okontajneroch/starwars/informers/starwars/v1"
"k8s.io/client-go/tools/clientcmd"
"os"
"os/signal"
"syscall"
)
func main() {
// vytvorím si stop kanál, ktorý bude uzavretý unixovským SIGINT alebo SIGTERM signálom
stopCh := make(chan struct{})
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigCh
close(stopCh)
}()
// vytvorím si klienta na komunikáciu s kube-apiserverom
config, _ := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
clientcmd.NewDefaultClientConfigLoadingRules(),
nil,
).ClientConfig()
client, _ := clientset.NewForConfig(config)
// vytvorím si Informer pre "default" namespace a spustím ho
informer := swinformer.NewStarfighterInformer(client, "default")
informer.Start(stopCh)
fmt.Println("Starfighter Informer is running. Press Ctrl+C to exit.")
// čakám na ukončenie programu
<-stopCh
fmt.Println("Shutting down Informer.")
}
Program je jednoduchý. Na začiatku je trochu mágie, ktorá preklopí signál zo sigCh do stopCh. Takto dôjde k ukončeniu po stlačení Ctrl + C. Ďalej si vytvorím klienta a môj konkrétny Informer. Kým program čaká na ukončenie, Informer beží v samostatnej Go rutine a priebežne reaguje na zmeny.
Informer v akcii
Teraz budem potrebovať testovací Kubernetes cluster. Použijem môj obľúbený kind a aplikujem moje CRD:
$ kind create cluster
$ kubectl apply -f ./k8s/starwars.okontajneroch.sk_starfighters.yaml
Spustím si môj program:
$ go run main.go
Starfighter informers is running. Press Ctrl+C to exit.
Ten naštartoval informer, ktorý začal sledovať objekty. Nakoľko v clustri
ešte žiadne objekty starfighter
nemám, tak výstup je veľmi jednoduchý.
Program ponechám bežať a v druhom termináli vytvorím objekt:
$ kubectl apply -f - << EOF
apiVersion: starwars.okontajneroch.sk/v1
kind: Starfighter
metadata:
name: x-wing-1
namespace: default
spec:
faction: rebellion
pilot: Luke Skywalker
type: X-Wing
EOF
Na výstupe sa objaví nasledovný text:
Starfighter Informer is running. Press Ctrl+C to exit.
Starfighter added: x-wing-1
Informer teda funguje. Prostredníctvom Reflectora a mojej implementácie
ListWatch
prijíma udalosti, ktoré vkladá do Delta FIFO. Odtiaľ ich spracúva
Controller, ktorý v tomto prípade vloží objekt do internej cache a následne
vyvolá onStarfighterAdded()
.
Ak nechám program bežať dlhšie, na výstupe sa v intervale 30 sekúnd bude objavovať aj informácia o aktualizácii objektu:
Starfighter Informer is running. Press Ctrl+C to exit.
Starfighter added: x-wing-1
Starfighter updated: x-wing-1
Starfighter updated: x-wing-1
Starfighter updated: x-wing-1
Toto správanie je spôsobené re-listovaním. Informer v pravidelných intervaloch
obnovuje stav internej cache tak, že Reflector použije ListFunc
namiesto
WatchFunc
. Tento mechanizmus slúži na prevenciu nekonzistentnosti – ak by sa
niektorá udalosť nepodarilo prijať (napríklad kvôli sieťovým problémom),
cache sa opätovne zosynchronizuje so stavom v Kubernetes API.
Treba si uvedomiť jednu dôležitú vec: event handlery nepracujú s udalosťami v klastri, ale s udalosťami v lokálnej cache.
Teda volania ...Added()
, ...Updated()
alebo ...Deleted()
neznamenajú
automaticky, že objekt bol skutočne pridaný, aktualizovaný alebo zmazaný v
Kubernetese. Znamenajú iba to, že objekt bol pridaný, aktualizovaný alebo
odstránený v lokálnej pamäti - Store.
Dôkazom toho je, že ak môj program s Informerom ukončím a opäť spustím, na výstupe dostanem:
$ go run main.go
Starfighter added: x-wing-1
Starfighter Informer is running. Press Ctrl+C to exit.
Z výpisu je vidieť, že došlo k opätovnému vyvolaniu onStarfighterAdded()
,
hoci som do klastra žiadny nový objekt nepridal.
Prečo sa to stalo? Reflector na začiatku spustenia vykonal ListFunc a všetky
sledované objekty pridal do lokálnej cache, čo následne vyvolalo volanie event
handlera onStarfighterAdded()
.
Tento fakt je mimoriadne dôležité zohľadniť v implementácii event handlerov, aby sa predišlo nesprávnemu spracovaniu udalostí.
Záver
V tomto článku sme si ukázali, ako funguje Informer v Kubernetes a ako ho môžeme implementovať na sledovanie vlastných objektov. Videli sme, že Informer využíva Reflector, ListWatch, Delta FIFO a Controller, pričom jeho event handlery pracujú nad lokálnou cache, nie priamo s klastrom.
V ďalšej časti sa pozrieme na pokročilejšie koncepty, ako sú Indexer, Lister a Shared Informer, ktoré umožňujú efektívnejšiu prácu s údajmi a zdieľanie Informerov medzi viacerými komponentmi.