Informer pt.1

Pokračujeme v sérii o programovaní pre Kubernetes. Dnes o Informeroch – mechanizme, na ktorom stojí veľká časť Kubernetes kódu. Čo to ten Informer je? Ako funguje? Ako si Informer naprogramovať?

Informer je mechanizmus na efektívne sledovanie Kubernetes objektov a reagovanie na ich zmeny. Namiesto neustáleho dotazovania API servera (polling) využíva kombináciu LIST (na počiatočné načítanie objektov) a WATCH (na získavanie iba zmien). Tieto objekty si ukladá do lokálnej cache, čím šetrí zdroje. Informer tiež umožňuje registráciu event handlerov, ktoré sa spustia pri vytváraní, aktualizácii alebo odstránení objektov. Týmto spôsobom aplikácia vždy pracuje s aktuálnym stavom objektov a môže dynamicky reagovať na zmeny.

Informer využíva na sledovanie a listovanie objektov Clientset. Informer je teda spôsob, ako optimalizovať prístup cez Clientset.

Z pohľadu architektúry, Informer pozostáva z niekoľkých komponentov, ktoré medzi sebou interagujú. Sú to:

Na začiatku Reflector načíta (LIST) všetky objekty a následne sleduje ich zmeny (WATCH). K tomu používa konkrétnu implementáciu ListWatch. Zmeny potom tlačí do tzv. Delta FIFO fronty. Ako jej názov napovedá, zmeny sú uchované vo forme delta udalostí - pridanie, aktualizácia, odstránenie.

img

Z fronty potom vyberá udalosti ďalšia komponenta - Controller. Ten volá príslušné tzv. event handlery (AddFunc, UpdateFunc, DeleteFunc). Ako už ich názov napovedá, ide o registrované funkcie, ktorých kód by mal reagovať na zmeny.

Aktuálny stav objektu sa potom synchronizuje v Store, odkiaľ môžu handlery pristupovať k aktuálnym dátam. Ide o akúsi lokálnu cache. Takto sa predíde zbytočnému zaťažovaniu Kubernetes API volaniami.

Aby sa zamedzilo nekonzistentnosti objektov v prípade, že sa nejaká udalosť stratí, Reflector raz za čas vykoná tzv. re-list. To znamená, že namiesto WATCH sa opäť použije LIST. Tento časový úsek sa nazýva resync period.

Teória je teda jasná. Ako sa ale taký informer naprogramuje? Podobne, ako v minulom článku, aj teraz si Informer naprogramujem ručne. To mi dá lepšiu predstavu o tom, ako skutočne Informer vyzerá.

Môj prvý Informer

Ako základ použijem kód z predchádzajúceho článku. Ide o Kubernetes CRD a Clientset starwars.okontajneroch.sk. Ak si to chceš vyskúšať, môžeš použiť toto repo.

Povedzme, že chcem mať program, ktorý bude sledovať moje starfighter objekty a reagovať na ich zmeny. Pre tento typ si vytvorím informer informers/starwars/v1/starfighter.go.

Ako prvé potrebujem ListWatch a implementovať si ListFunc a WatchFunc. Ide o implementácie, ktoré budú volané Reflectorom. Tieto funkcie budú pristupovať k objektom cez príslušný Clientset:

lw := &cache.ListWatch{
   // ziska zoznam vsetkych `starfighter` objektov v konkretnom `namespace`
   ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {  
      return client.StarwarsV1().Starfighters(namespace).List(context.TODO(), options)  
   },  
       
   // sleduje vsetky zmeny `starfighter` objektov v konkretnom `namespace`
   WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {  
      return client.StarwarsV1().Starfighters(namespace).Watch(context.TODO(), options)  
   },  
}

Moja implementácia je veľmi jednoduchá a bude sledovať všetky starfighter objekty v konkrétnom namespace. Ak by som chcel sledovať nejake konkrétnejšie objekty, tak tu môžem použiť napríklad LabelSelector.

Druhým krokom bude inicializácia veľmi jednoduchého Informera pomocou cache.NewInformerWithOptions(). Tu mu podhodím implementáciu ListWatch, pomocou ObjectType mu napoviem, aké objekty budem sledovať, no a nakoniec zaregistrujem 3 funkcie – AddFunc, UpdateFunc a DeleteFunc. Ide o spomínané event handlery, ktoré budú volané vždy, keď nad sledovaným objektom nastane zmena. Takto získam Controller a Store môjho Informera.

sfi.store, sfi.controller = cache.NewInformerWithOptions(cache.InformerOptions{  
   ListerWatcher: lw,  
   ResyncPeriod:  30 * time.Second,  
   ObjectType:    &starwarsv1.Starfighter{},  
   Handler: cache.ResourceEventHandlerFuncs{  
      AddFunc:    onStarfighterAdded,  
      UpdateFunc: onStarfighterUpdated,  
      DeleteFunc: onStarfighterDeleted,  
   },  
})

Informer je teda inicializovaný a môžem ho spustiť. Tu prichádza Go konkurenčné programovanie. Spustiť Informer znamená, že sa spustí Controller ako Go rutina. Tá je potom ukončená v momente, keď obdrží signál cez stop kanál.

func (sfi *StarfighterInformer) Start(stopCh <-chan struct{}) {  
    go sfi.controller.Run(stopCh)  
    cache.WaitForCacheSync(stopCh, sfi.controller.HasSynced)  
}

Nakoniec ostáva už len samotná implementácia event handlerov, ktorá nespraví nič iné, len vypíše informáciu na výstup. Čo stojí za pozornosť, je potreba pretypovania any na mnou sledovaný *starwarsv1.Starfighter.

func onStarfighterAdded(obj any) {  
    sf := obj.(*starwarsv1.Starfighter)  
    fmt.Printf("Starfighter added: %s\n", sf.Name)  
}  
  
func onStarfighterUpdated(oldObj, newObj any) {  
    sf := newObj.(*starwarsv1.Starfighter)  
    fmt.Printf("Starfighter updated: %s\n", sf.Name)  
}  
  
func onStarfighterDeleted(obj any) {  
    sf := obj.(*starwarsv1.Starfighter)  
    fmt.Printf("Starfighter deleted: %s\n", sf.Name)  
}

Informer je hotový. Jeho celý kód je dostupný tu. Teraz ho treba použiť. Vytvorím si main.go:

package main  
  
import (  
    "fmt"  
    "github.com/okontajneroch/starwars/clientset"  
    swinformer "github.com/okontajneroch/starwars/informers/starwars/v1"  
    "k8s.io/client-go/tools/clientcmd"  
    "os"  
    "os/signal"  
    "syscall"  
)  
  
func main() {  
    // vytvorím si stop kanál, ktorý bude uzavretý unixovským SIGINT alebo SIGTERM signálom  
    stopCh := make(chan struct{})  
    sigCh := make(chan os.Signal, 1)  
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)  
    go func() {  
       <-sigCh  
       close(stopCh)  
    }()  
  
    // vytvorím si klienta na komunikáciu s kube-apiserverom  
    config, _ := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(  
       clientcmd.NewDefaultClientConfigLoadingRules(),  
       nil,  
    ).ClientConfig()  
    client, _ := clientset.NewForConfig(config)  
  
    // vytvorím si Informer pre "default" namespace a spustím ho  
    informer := swinformer.NewStarfighterInformer(client, "default")  
    informer.Start(stopCh)  
    fmt.Println("Starfighter Informer is running. Press Ctrl+C to exit.")  
  
    // čakám na ukončenie programu  
    <-stopCh  
    fmt.Println("Shutting down Informer.")  
}

Program je jednoduchý. Na začiatku je trochu mágie, ktorá preklopí signál zo sigCh do stopCh. Takto dôjde k ukončeniu po stlačení Ctrl + C. Ďalej si vytvorím klienta a môj konkrétny Informer. Kým program čaká na ukončenie, Informer beží v samostatnej Go rutine a priebežne reaguje na zmeny.

Informer v akcii

Teraz budem potrebovať testovací Kubernetes cluster. Použijem môj obľúbený kind a aplikujem moje CRD:

$ kind create cluster
$ kubectl apply -f ./k8s/starwars.okontajneroch.sk_starfighters.yaml 

Spustím si môj program:

$ go run main.go
Starfighter informers is running. Press Ctrl+C to exit.

Ten naštartoval informer, ktorý začal sledovať objekty. Nakoľko v clustri ešte žiadne objekty starfighter nemám, tak výstup je veľmi jednoduchý.

Program ponechám bežať a v druhom termináli vytvorím objekt:

$ kubectl apply -f - << EOF
apiVersion: starwars.okontajneroch.sk/v1
kind: Starfighter
metadata:
  name: x-wing-1
  namespace: default
spec:
  faction: rebellion
  pilot: Luke Skywalker
  type: X-Wing
EOF

Na výstupe sa objaví nasledovný text:

Starfighter Informer is running. Press Ctrl+C to exit.
Starfighter added: x-wing-1

Informer teda funguje. Prostredníctvom Reflectora a mojej implementácie ListWatch prijíma udalosti, ktoré vkladá do Delta FIFO. Odtiaľ ich spracúva Controller, ktorý v tomto prípade vloží objekt do internej cache a následne vyvolá onStarfighterAdded().

Ak nechám program bežať dlhšie, na výstupe sa v intervale 30 sekúnd bude objavovať aj informácia o aktualizácii objektu:

Starfighter Informer is running. Press Ctrl+C to exit.
Starfighter added: x-wing-1
Starfighter updated: x-wing-1
Starfighter updated: x-wing-1
Starfighter updated: x-wing-1

Toto správanie je spôsobené re-listovaním. Informer v pravidelných intervaloch obnovuje stav internej cache tak, že Reflector použije ListFunc namiesto WatchFunc. Tento mechanizmus slúži na prevenciu nekonzistentnosti – ak by sa niektorá udalosť nepodarilo prijať (napríklad kvôli sieťovým problémom), cache sa opätovne zosynchronizuje so stavom v Kubernetes API.

Treba si uvedomiť jednu dôležitú vec: event handlery nepracujú s udalosťami v klastri, ale s udalosťami v lokálnej cache.

Teda volania ...Added(), ...Updated() alebo ...Deleted() neznamenajú automaticky, že objekt bol skutočne pridaný, aktualizovaný alebo zmazaný v Kubernetese. Znamenajú iba to, že objekt bol pridaný, aktualizovaný alebo odstránený v lokálnej pamäti - Store.

Dôkazom toho je, že ak môj program s Informerom ukončím a opäť spustím, na výstupe dostanem:

$ go run main.go
Starfighter added: x-wing-1
Starfighter Informer is running. Press Ctrl+C to exit.

Z výpisu je vidieť, že došlo k opätovnému vyvolaniu onStarfighterAdded(), hoci som do klastra žiadny nový objekt nepridal.

Prečo sa to stalo? Reflector na začiatku spustenia vykonal ListFunc a všetky sledované objekty pridal do lokálnej cache, čo následne vyvolalo volanie event handlera onStarfighterAdded().

Tento fakt je mimoriadne dôležité zohľadniť v implementácii event handlerov, aby sa predišlo nesprávnemu spracovaniu udalostí.

Záver

V tomto článku sme si ukázali, ako funguje Informer v Kubernetes a ako ho môžeme implementovať na sledovanie vlastných objektov. Videli sme, že Informer využíva Reflector, ListWatch, Delta FIFO a Controller, pričom jeho event handlery pracujú nad lokálnou cache, nie priamo s klastrom.

V ďalšej časti sa pozrieme na pokročilejšie koncepty, ako sú Indexer, Lister a Shared Informer, ktoré umožňujú efektívnejšiu prácu s údajmi a zdieľanie Informerov medzi viacerými komponentmi.

<