Skip to content

Commit

Permalink
wait for collect and publish function to finish and the start watchin…
Browse files Browse the repository at this point in the history
…g for events, this is done as our resources are very less
  • Loading branch information
vijeyash1 committed Sep 5, 2023
1 parent 322157a commit bd8a64c
Showing 1 changed file with 26 additions and 21 deletions.
47 changes: 26 additions & 21 deletions agent/kubviz/k8smetrics_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,8 @@ func main() {
}
clientset = getK8sClient(config)
}

// starting the endless go routine to monitor the cluster
go publishMetrics(clientset, js, clusterMetricsChan)
controlChan := make(chan bool)
go publishMetrics(clientset, js, clusterMetricsChan, controlChan)

collectAndPublishMetrics := func() {
err := outDatedImages(config, js)
Expand All @@ -123,23 +122,29 @@ func main() {
LogErr(err)
}

controlChan <- true
collectAndPublishMetrics()
controlChan <- true
if schedulingIntervalStr == "" {
schedulingIntervalStr = "20m" // Default value, e.g., 20 minutes
schedulingIntervalStr = "20m"
}
schedulingInterval, err := time.ParseDuration(schedulingIntervalStr)
if err != nil {
log.Fatalf("Failed to parse SCHEDULING_INTERVAL: %v", err)
}
s := gocron.NewScheduler(time.UTC)
s.Every(schedulingInterval).Do(collectAndPublishMetrics) // Run immediately and then at the scheduled interval
s.StartBlocking() // Blocks the main function
s.Every(schedulingInterval).Do(func() {
controlChan <- true
collectAndPublishMetrics()
controlChan <- true
})
s.StartBlocking()
}

// publishMetrics publishes stream of events
// with subject "METRICS.created"
func publishMetrics(clientset *kubernetes.Clientset, js nats.JetStreamContext, errCh chan error) {
watchK8sEvents(clientset, js)
func publishMetrics(clientset *kubernetes.Clientset, js nats.JetStreamContext, errCh chan error, controlChan <-chan bool) {
watchK8sEvents(clientset, js, controlChan)
errCh <- nil
}

Expand Down Expand Up @@ -237,44 +242,44 @@ func LogErr(err error) {
log.Println(err)
}
}
func watchK8sEvents(clientset *kubernetes.Clientset, js nats.JetStreamContext) {
func watchK8sEvents(clientset *kubernetes.Clientset, js nats.JetStreamContext, controlChan <-chan bool) {
watchlist := cache.NewListWatchFromClient(
clientset.CoreV1().RESTClient(),
"events",
v1.NamespaceAll,
fields.Everything(),
)
_, controller := cache.NewInformer( // also take a look at NewSharedIndexInformer
_, controller := cache.NewInformer(
watchlist,
&v1.Event{},
0, //Duration is int64
0, // Duration is int64
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
event := obj.(*v1.Event)
// fmt.Printf("Event namespace: %s \n", event.GetNamespace())
// y, err := yaml.Marshal(event)
// if err != nil {
// fmt.Printf("err: %v\n", err)
// }
// fmt.Printf("Add event: %s \n", y)
publishK8sMetrics(string(event.ObjectMeta.UID), "ADD", event, js)
},
DeleteFunc: func(obj interface{}) {
event := obj.(*v1.Event)
// fmt.Printf("Delete event: %s \n", obj)
publishK8sMetrics(string(event.ObjectMeta.UID), "DELETE", event, js)
},
UpdateFunc: func(oldObj, newObj interface{}) {
event := newObj.(*v1.Event)
// fmt.Printf("Change event \n")
publishK8sMetrics(string(event.ObjectMeta.UID), "UPDATE", event, js)
},
},
)
stop := make(chan struct{})
defer close(stop)
go controller.Run(stop)

for {
time.Sleep(time.Second)
select {
case <-controlChan:
close(stop)
<-controlChan
stop = make(chan struct{})
go controller.Run(stop)
default:
time.Sleep(time.Second)
}
}
}

0 comments on commit bd8a64c

Please sign in to comment.