@@ -22,22 +22,11 @@ import (
2222 "sync"
2323 "time"
2424
25- log "github.com/sirupsen/logrus"
26- v1 "k8s.io/api/core/v1"
27- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28- "k8s.io/apimachinery/pkg/fields"
29- "k8s.io/apimachinery/pkg/runtime"
3025 "k8s.io/client-go/kubernetes"
3126 "k8s.io/client-go/rest"
32- "k8s.io/client-go/tools/cache"
33-
34- "px.dev/pixie/src/vizier/services/metadata/storepb"
3527)
3628
3729const (
38- // refreshUpdateDuration is how frequently we request all k8s updates to
39- // refresh the ones in our DataStore.
40- refreshUpdateDuration = 12 * time .Hour
4130 // resourceUpdateTTL is how long the k8s update live in the DataStore.
4231 resourceUpdateTTL = 24 * time .Hour
4332)
@@ -53,14 +42,7 @@ type Controller struct {
5342
5443// watcher watches a k8s resource type and forwards the updates to the given update channel.
5544type watcher interface {
56- Sync (storedUpdates []* storepb.K8SResource ) error
57- StartWatcher (chan struct {}, * sync.WaitGroup )
58- }
59-
60- func listObject (resource string , clientset * kubernetes.Clientset ) (runtime.Object , error ) {
61- watcher := cache .NewListWatchFromClient (clientset .CoreV1 ().RESTClient (), resource , v1 .NamespaceAll , fields .Everything ())
62- opts := metav1.ListOptions {}
63- return watcher .List (opts )
45+ StartWatcher (chan struct {})
6446}
6547
6648// NewController creates a new Controller.
@@ -84,103 +66,20 @@ func NewController(mds Store, updateCh chan *K8sResourceMessage) (*Controller, e
8466 // for example, nodes and namespaces must be synced before pods, since nodes/namespaces
8567 // contain pods.
8668 watchers := []watcher {
87- NewNodeWatcher ("nodes" , updateCh , clientset ),
88- NewNamespaceWatcher ("namespaces" , updateCh , clientset ),
89- NewPodWatcher ("pods" , updateCh , clientset ),
90- NewEndpointsWatcher ("endpoints" , updateCh , clientset ),
91- NewServiceWatcher ("services" , updateCh , clientset ),
69+ nodeWatcher ("nodes" , updateCh , clientset ),
70+ namespaceWatcher ("namespaces" , updateCh , clientset ),
71+ podWatcher ("pods" , updateCh , clientset ),
72+ endpointsWatcher ("endpoints" , updateCh , clientset ),
73+ serviceWatcher ("services" , updateCh , clientset ),
9274 }
9375
9476 mc := & Controller {quitCh : quitCh , updateCh : updateCh , watchers : watchers }
9577
96- go mc .Start (mds )
97-
98- return mc , nil
99- }
100-
101- // Start starts the k8s watcher. Every 12h, it will resync such that the updates from the
102- // last 24h will always contain updates from currently running resources.
103- func (mc * Controller ) Start (mds Store ) {
104- // Loop the sync/watch so that if it ever fails because of an intermittent error, it
105- // will continue trying to collect data.
106- for {
107- select {
108- case <- mc .quitCh :
109- return // Quit signaled, so exit the goroutine.
110- default :
111- err := mc .runSyncWatchLoop (mds )
112- if err == nil {
113- return // Quit signaled, so exit the goroutine.
114- }
115- log .WithError (err ).Info ("Failed K8s metadata sync/watch... Restarting." )
116- // Wait 5 minutes before retrying, however if stop is called, just return.
117- select {
118- case <- mc .quitCh :
119- return
120- case <- time .After (5 * time .Minute ):
121- continue
122- }
123- }
124- }
125- }
126-
127- func (mc * Controller ) runSyncWatchLoop (mds Store ) error {
128- // Run initial sync and watch.
129- watcherQuitCh := make (chan struct {})
130- var wg sync.WaitGroup
131-
132- err := mc .syncAndWatch (mds , watcherQuitCh , & wg )
133- if err != nil {
134- return err
135- }
136-
137- ticker := time .NewTicker (refreshUpdateDuration )
138-
139- defer func () {
140- close (watcherQuitCh )
141- ticker .Stop ()
142- }()
143-
144- for {
145- select {
146- case <- ticker .C :
147- close (watcherQuitCh ) // Stop previous watcher, resync to send updates for all currently running resources.
148- watcherQuitCh = make (chan struct {})
149- wg .Wait ()
150- err = mc .syncAndWatch (mds , watcherQuitCh , & wg )
151- if err != nil {
152- return err
153- }
154- case <- mc .quitCh :
155- return nil
156- }
157- }
158- }
159-
160- // Start syncs the state stored in the datastore with what is currently running in k8s.
161- func (mc * Controller ) syncAndWatch (mds Store , quitCh chan struct {}, wg * sync.WaitGroup ) error {
162- lastUpdate , err := mds .GetUpdateVersion (KelvinUpdateTopic )
163- if err != nil {
164- log .WithError (err ).Info ("Failed to get latest update version" )
165- return err
166- }
167- storedUpdates , err := mds .FetchFullResourceUpdates (0 , lastUpdate )
168- if err != nil {
169- log .WithError (err ).Info ("Failed to fetch resource updates" )
170- return err
171- }
172-
17378 for _ , w := range mc .watchers {
174- err := w .Sync (storedUpdates )
175- if err != nil {
176- log .WithError (err ).Info ("Failed to sync metadata" )
177- return err
178- }
179- wg .Add (1 )
180- go w .StartWatcher (quitCh , wg )
79+ go w .StartWatcher (quitCh )
18180 }
18281
183- return nil
82+ return mc , nil
18483}
18584
18685// Stop stops all K8s watchers.
0 commit comments