diff --git a/agent/kubviz/k8smetrics_agent.go b/agent/kubviz/k8smetrics_agent.go index 7f28d0a3..3a6f8479 100644 --- a/agent/kubviz/k8smetrics_agent.go +++ b/agent/kubviz/k8smetrics_agent.go @@ -6,7 +6,6 @@ import ( "os" "strconv" "strings" - "sync" "time" "github.com/go-co-op/gocron" @@ -57,11 +56,21 @@ var ( schedulingIntervalStr string = os.Getenv("SCHEDULING_INTERVAL") ) -func runTrivyScans(config *rest.Config, js nats.JetStreamContext, wg *sync.WaitGroup, trivyImagescanChan, trivySbomcanChan, trivyK8sMetricsChan chan error) { - RunTrivyK8sClusterScan(js, trivyK8sMetricsChan) - RunTrivyImageScans(config, js, trivyImagescanChan) - RunTrivySbomScan(config, js, trivySbomcanChan) - wg.Done() +func runTrivyScans(config *rest.Config, js nats.JetStreamContext) error { + err := RunTrivyK8sClusterScan(js) + if err != nil { + return err + } + err = RunTrivyImageScans(config, js) + if err != nil { + return err + } + err = RunTrivySbomScan(config, js) + if err != nil { + return err + } + return nil + } func main() { @@ -69,7 +78,6 @@ func main() { env := Production clusterMetricsChan := make(chan error, 1) var ( - wg sync.WaitGroup config *rest.Config clientset *kubernetes.Clientset ) @@ -100,90 +108,22 @@ func main() { // starting the endless go routine to monitor the cluster go publishMetrics(clientset, js, clusterMetricsChan) - // starting all the go routines collectAndPublishMetrics := func() { - // error channels declared for the go routines - outdatedErrChan := make(chan error, 1) - kubePreUpgradeChan := make(chan error, 1) - getAllResourceChan := make(chan error, 1) - trivyK8sMetricsChan := make(chan error, 1) - kubescoreMetricsChan := make(chan error, 1) - trivyImagescanChan := make(chan error, 1) - trivySbomcanChan := make(chan error, 1) - RakeesErrChan := make(chan error, 1) - // Start a goroutine to handle errors - doneChan := make(chan bool) - go func() { - // for loop will wait for the error channels - // logs if any error occurs - for { - select { - case err := <-outdatedErrChan: - if err != nil { - log.Println(err) - } - case err := <-kubePreUpgradeChan: - if err != nil { - log.Println(err) - } - case err := <-getAllResourceChan: - if err != nil { - log.Println(err) - } - case err := <-clusterMetricsChan: - if err != nil { - log.Println(err) - } - case err := <-kubescoreMetricsChan: - if err != nil { - log.Println(err) - } - case err := <-trivyImagescanChan: - if err != nil { - log.Println(err) - } - case err := <-trivySbomcanChan: - if err != nil { - log.Println(err) - } - case err := <-trivyK8sMetricsChan: - if err != nil { - log.Println(err) - } - case err := <-RakeesErrChan: - if err != nil { - log.Println(err) - } - case <-doneChan: - return // All other goroutines have finished, so exit the goroutine - } - } - }() - wg.Add(7) // Initialize the WaitGroup for the seven goroutines - // ... start other goroutines ... - go outDatedImages(config, js, &wg, outdatedErrChan) - go KubePreUpgradeDetector(config, js, &wg, kubePreUpgradeChan) - go GetAllResources(config, js, &wg, getAllResourceChan) - go RakeesOutput(config, js, &wg, RakeesErrChan) - go getK8sEvents(clientset) - // Run these functions sequentially within a single goroutine using the wrapper function - go runTrivyScans(config, js, &wg, trivyImagescanChan, trivySbomcanChan, trivyK8sMetricsChan) - go RunKubeScore(clientset, js, &wg, kubescoreMetricsChan) - wg.Wait() - // once the go routines completes we will close the error channels - close(outdatedErrChan) - close(kubePreUpgradeChan) - close(getAllResourceChan) - // close(clusterMetricsChan) - close(kubescoreMetricsChan) - close(trivyImagescanChan) - close(trivySbomcanChan) - close(trivyK8sMetricsChan) - close(RakeesErrChan) - // Signal that all other goroutines have finished - doneChan <- true - close(doneChan) + err := outDatedImages(config, js) + LogErr(err) + err = KubePreUpgradeDetector(config, js) + LogErr(err) + err = GetAllResources(config, js) + LogErr(err) + err = RakeesOutput(config, js) + LogErr(err) + getK8sEvents(clientset) + err = runTrivyScans(config, js) + LogErr(err) + err = RunKubeScore(clientset, js) + LogErr(err) } + collectAndPublishMetrics() if schedulingIntervalStr == "" { schedulingIntervalStr = "20m" // Default value, e.g., 20 minutes @@ -201,7 +141,6 @@ func main() { // with subject "METRICS.created" func publishMetrics(clientset *kubernetes.Clientset, js nats.JetStreamContext, errCh chan error) { watchK8sEvents(clientset, js) - errCh <- nil } @@ -294,7 +233,11 @@ func checkErr(err error) { log.Fatal(err) } } - +func LogErr(err error) { + if err != nil { + log.Println(err) + } +} func watchK8sEvents(clientset *kubernetes.Clientset, js nats.JetStreamContext) { watchlist := cache.NewListWatchFromClient( clientset.CoreV1().RESTClient(), diff --git a/agent/kubviz/ketall.go b/agent/kubviz/ketall.go index f1423564..ce6d125f 100644 --- a/agent/kubviz/ketall.go +++ b/agent/kubviz/ketall.go @@ -3,10 +3,10 @@ package main import ( "context" "encoding/json" - "github.com/intelops/kubviz/constants" - "sync" "time" + "github.com/intelops/kubviz/constants" + "github.com/intelops/kubviz/model" "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" @@ -28,8 +28,7 @@ func PublishAllResources(result model.Resource, js nats.JetStreamContext) error return nil } -func GetAllResources(config *rest.Config, js nats.JetStreamContext, wg *sync.WaitGroup, errCh chan error) { - defer wg.Done() +func GetAllResources(config *rest.Config, js nats.JetStreamContext) error { // TODO: upto this uncomment for production // Create a new discovery client to discover all resources in the cluster dc := discovery.NewDiscoveryClientForConfigOrDie(config) @@ -37,19 +36,16 @@ func GetAllResources(config *rest.Config, js nats.JetStreamContext, wg *sync.Wai // Create a new dynamic client to list resources in the cluster dynamicClient, err := dynamic.NewForConfig(config) if err != nil { - log.Error(err) - errCh <- err + return err } // Get a list of all available API groups and versions in the cluster resourceLists, err := dc.ServerPreferredResources() if err != nil { - log.Error(err) - errCh <- err + return err } gvrs, err := discovery.GroupVersionResources(resourceLists) if err != nil { - panic(err) - errCh <- err + return err } // Iterate over all available API groups and versions and list all resources in each group for gvr := range gvrs { @@ -81,9 +77,9 @@ func GetAllResources(config *rest.Config, js nats.JetStreamContext, wg *sync.Wai } err := PublishAllResources(resource, js) if err != nil { - errCh <- err + return err } } } - errCh <- nil + return nil } diff --git a/agent/kubviz/kubePreUpgrade.go b/agent/kubviz/kubePreUpgrade.go index 9c993a59..2c49997e 100644 --- a/agent/kubviz/kubePreUpgrade.go +++ b/agent/kubviz/kubePreUpgrade.go @@ -4,12 +4,12 @@ import ( "context" "encoding/json" "fmt" - "github.com/intelops/kubviz/constants" "io" "net/http" "os" "strings" - "sync" + + "github.com/intelops/kubviz/constants" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -79,29 +79,26 @@ func publishK8sDepricated_Deleted_Api(result *model.Result, js nats.JetStreamCon return nil } -func KubePreUpgradeDetector(config *rest.Config, js nats.JetStreamContext, wg *sync.WaitGroup, errCh chan error) { - defer wg.Done() +func KubePreUpgradeDetector(config *rest.Config, js nats.JetStreamContext) error { swaggerdir, err := os.MkdirTemp("", "kubepug") if err != nil { - errCh <- err + return err } filename := fmt.Sprintf("%s/swagger-%s.json", swaggerdir, k8sVersion) url := fmt.Sprintf("%s/%s/%s", baseURL, k8sVersion, fileURL) err = downloadFile(filename, url) if err != nil { - errCh <- err + return err } defer os.RemoveAll(filename) swaggerfile := filename kubernetesAPIs, err := PopulateKubeAPIMap(swaggerfile) if err != nil { - errCh <- err + return err } result = getResults(config, kubernetesAPIs) err = publishK8sDepricated_Deleted_Api(result, js) - errCh <- err - // b, _ := json.MarshalIndent(result, "", " ") - // fmt.Printf("%s", string(b)) + return err } func PopulateKubeAPIMap(swagfile string) (model.KubernetesAPIs, error) { diff --git a/agent/kubviz/kube_score.go b/agent/kubviz/kube_score.go index f7a2b838..437a0850 100644 --- a/agent/kubviz/kube_score.go +++ b/agent/kubviz/kube_score.go @@ -3,48 +3,47 @@ package main import ( "context" "encoding/json" + "log" + exec "os/exec" + "github.com/google/uuid" "github.com/intelops/kubviz/constants" "github.com/intelops/kubviz/model" "github.com/nats-io/nats.go" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" - "log" - exec "os/exec" - "sync" ) -func RunKubeScore(clientset *kubernetes.Clientset, js nats.JetStreamContext, wg *sync.WaitGroup, errCh chan error) { - defer wg.Done() - +func RunKubeScore(clientset *kubernetes.Clientset, js nats.JetStreamContext) error { nsList, err := clientset.CoreV1(). Namespaces(). List(context.Background(), metav1.ListOptions{}) if err != nil { log.Println("Error occurred while getting client set for kube-score: ", err) - return + return err } log.Printf("Namespace size: %d", len(nsList.Items)) for _, n := range nsList.Items { log.Printf("Publishing kube-score recommendations for namespace: %s\n", n.Name) - publish(n.Name, js, errCh) + publish(n.Name, js) } + return nil } -func publish(ns string, js nats.JetStreamContext, errCh chan error) { +func publish(ns string, js nats.JetStreamContext) error { cmd := "kubectl api-resources --verbs=list --namespaced -o name | xargs -n1 -I{} sh -c \"kubectl get {} -n " + ns + " -oyaml && echo ---\" | kube-score score - " log.Printf("Command: %#v,", cmd) out, err := executeCommand(cmd) if err != nil { log.Println("Error occurred while running kube-score: ", err) - errCh <- err + return err } err = publishKubescoreMetrics(uuid.New().String(), ns, out, js) if err != nil { - errCh <- err + return err } - errCh <- nil + return nil } func publishKubescoreMetrics(id string, ns string, recommendations string, js nats.JetStreamContext) error { diff --git a/agent/kubviz/outdated.go b/agent/kubviz/outdated.go index 68ec985e..4470abe8 100644 --- a/agent/kubviz/outdated.go +++ b/agent/kubviz/outdated.go @@ -66,12 +66,11 @@ func PublishOutdatedImages(out model.CheckResultfinal, js nats.JetStreamContext) return nil } -func outDatedImages(config *rest.Config, js nats.JetStreamContext, wg *sync.WaitGroup, errCh chan error) { - defer wg.Done() +func outDatedImages(config *rest.Config, js nats.JetStreamContext) error { images, err := ListImages(config) if err != nil { log.Println("unable to list images") - errCh <- err + return err } for _, image := range images { namespace := image.Namespace @@ -92,7 +91,7 @@ func outDatedImages(config *rest.Config, js nats.JetStreamContext, wg *sync.Wait final.Pod = pod err := PublishOutdatedImages(final, js) if err != nil { - errCh <- err + return err } } else { if checkResult != nil { @@ -108,7 +107,7 @@ func outDatedImages(config *rest.Config, js nats.JetStreamContext, wg *sync.Wait final.Pod = pod err := PublishOutdatedImages(final, js) if err != nil { - errCh <- err + return err } } else { tagtrunk := truncateTagName(tag) @@ -125,12 +124,13 @@ func outDatedImages(config *rest.Config, js nats.JetStreamContext, wg *sync.Wait final.Pod = pod err := PublishOutdatedImages(final, js) if err != nil { - errCh <- err + return err } } } } } + return nil } func ParseImageName(imageName string) (string, string, string, error) { diff --git a/agent/kubviz/rakees_agent.go b/agent/kubviz/rakees_agent.go index 54e610fb..0b8426c1 100644 --- a/agent/kubviz/rakees_agent.go +++ b/agent/kubviz/rakees_agent.go @@ -4,13 +4,13 @@ import ( "context" "encoding/json" "fmt" - "github.com/intelops/kubviz/constants" "log" "os" "os/signal" - "sync" "syscall" + "github.com/intelops/kubviz/constants" + "github.com/intelops/kubviz/agent/kubviz/rakkess" "github.com/intelops/kubviz/model" "github.com/nats-io/nats.go" @@ -33,17 +33,17 @@ func accessToOutcome(access rakkess.Access) (rakkess.Outcome, error) { } } -func RakeesOutput(config *rest.Config, js nats.JetStreamContext, wg *sync.WaitGroup, errCh chan error) { +func RakeesOutput(config *rest.Config, js nats.JetStreamContext) error { // Create a new Kubernetes client client, err := kubernetes.NewForConfig(config) if err != nil { - errCh <- err + return err } // Retrieve all available resource types resourceList, err := client.Discovery().ServerPreferredResources() if err != nil { - errCh <- err + return err } var opts = rakkess.NewRakkessOptions() opts.Verbs = []string{"list", "create", "update", "delete"} @@ -56,25 +56,25 @@ func RakeesOutput(config *rest.Config, js nats.JetStreamContext, wg *sync.WaitGr res, err := rakkess.Resource(ctx, opts) if err != nil { fmt.Println("Error") - errCh <- err + return err } fmt.Println("Result..") for resourceType, access := range res { createOutcome, err := accessToOutcome(access["create"]) if err != nil { - errCh <- err + return err } deleteOutcome, err := accessToOutcome(access["delete"]) if err != nil { - errCh <- err + return err } listOutcome, err := accessToOutcome(access["list"]) if err != nil { - errCh <- err + return err } updateOutcome, err := accessToOutcome(access["update"]) if err != nil { - errCh <- err + return err } metrics := model.RakeesMetrics{ ClusterName: ClusterName, @@ -87,12 +87,11 @@ func RakeesOutput(config *rest.Config, js nats.JetStreamContext, wg *sync.WaitGr metricsJson, _ := json.Marshal(metrics) _, err = js.Publish(constants.EventSubject_rakees, metricsJson) if err != nil { - errCh <- err + return err } log.Printf("Metrics with resource %s has been published", resourceType) } - // t := res.Table(opts.Verbs) - // t.Render(opts.Streams.Out, opts.OutputFormat) + return nil } diff --git a/agent/kubviz/trivy.go b/agent/kubviz/trivy.go index 5951e54f..654cf481 100644 --- a/agent/kubviz/trivy.go +++ b/agent/kubviz/trivy.go @@ -12,15 +12,14 @@ import ( "github.com/nats-io/nats.go" ) -func RunTrivyK8sClusterScan(js nats.JetStreamContext, errCh chan error) { +func RunTrivyK8sClusterScan(js nats.JetStreamContext) error { var report report.ConsolidatedReport out, err := executeCommand("trivy k8s --report summary cluster --timeout 60m -f json -q --cache-dir /tmp/.cache") log.Println("Commnd for k8s cluster scan: trivy k8s --report summary cluster --timeout 60m -f json -q --cache-dir /tmp/.cache") parts := strings.SplitN(out, "{", 2) if len(parts) <= 1 { log.Println("No output from k8s cluster scan command", err) - errCh <- err - return + return err } log.Println("Command logs for k8s cluster scan", parts[0]) jsonPart := "{" + parts[1] @@ -29,12 +28,16 @@ func RunTrivyK8sClusterScan(js nats.JetStreamContext, errCh chan error) { err = json.Unmarshal([]byte(jsonPart), &report) if err != nil { log.Printf("Error occurred while Unmarshalling json for k8s cluster scan: %v", err) - errCh <- err + return err } - publishTrivyK8sReport(report, js, errCh) + err = publishTrivyK8sReport(report, js) + if err != nil { + return err + } + return nil } -func publishTrivyK8sReport(report report.ConsolidatedReport, js nats.JetStreamContext, errCh chan error) { +func publishTrivyK8sReport(report report.ConsolidatedReport, js nats.JetStreamContext) error { metrics := model.Trivy{ ID: uuid.New().String(), ClusterName: ClusterName, @@ -43,8 +46,8 @@ func publishTrivyK8sReport(report report.ConsolidatedReport, js nats.JetStreamCo metricsJson, _ := json.Marshal(metrics) _, err := js.Publish(constants.TRIVY_K8S_SUBJECT, metricsJson) if err != nil { - errCh <- err + return err } log.Printf("Trivy k8s cluster report with ID:%s has been published\n", metrics.ID) - errCh <- nil + return nil } diff --git a/agent/kubviz/trivy_image.go b/agent/kubviz/trivy_image.go index 6e2af2fe..4eaac417 100644 --- a/agent/kubviz/trivy_image.go +++ b/agent/kubviz/trivy_image.go @@ -13,7 +13,7 @@ import ( "k8s.io/client-go/rest" ) -func RunTrivyImageScans(config *rest.Config, js nats.JetStreamContext, errCh chan error) { +func RunTrivyImageScans(config *rest.Config, js nats.JetStreamContext) error { images, err := ListImages(config) if err != nil { log.Fatal(err) @@ -43,13 +43,15 @@ func RunTrivyImageScans(config *rest.Config, js nats.JetStreamContext, errCh cha log.Printf("Error occurred while Unmarshalling json for image: %v", err) continue // Move on to the next image in case of an error } - publishImageScanReports(report, js, errCh) - // If you want to publish the report or perform any other action with it, you can do it here - + err = publishImageScanReports(report, js) + if err != nil { + return err + } } + return nil } -func publishImageScanReports(report types.Report, js nats.JetStreamContext, errCh chan error) { +func publishImageScanReports(report types.Report, js nats.JetStreamContext) error { metrics := model.TrivyImage{ ID: uuid.New().String(), ClusterName: ClusterName, @@ -58,8 +60,8 @@ func publishImageScanReports(report types.Report, js nats.JetStreamContext, errC metricsJson, _ := json.Marshal(metrics) _, err := js.Publish(constants.TRIVY_IMAGE_SUBJECT, metricsJson) if err != nil { - errCh <- err + return err } log.Printf("Trivy image report with ID:%s has been published\n", metrics.ID) - errCh <- nil + return nil } diff --git a/agent/kubviz/trivy_sbom.go b/agent/kubviz/trivy_sbom.go index 34ca7712..d33d2709 100644 --- a/agent/kubviz/trivy_sbom.go +++ b/agent/kubviz/trivy_sbom.go @@ -14,7 +14,7 @@ import ( "k8s.io/client-go/rest" ) -func publishTrivySbomReport(report model.Sbom, js nats.JetStreamContext, errCh chan error) { +func publishTrivySbomReport(report model.Sbom, js nats.JetStreamContext) error { metrics := model.Reports{ ID: uuid.New().String(), Report: report, @@ -22,11 +22,11 @@ func publishTrivySbomReport(report model.Sbom, js nats.JetStreamContext, errCh c metricsJson, _ := json.Marshal(metrics) _, err := js.Publish(constants.TRIVY_SBOM_SUBJECT, metricsJson) if err != nil { - errCh <- err + return err } log.Printf("Trivy report with BomFormat:%v has been published\n", metrics.Report.BomFormat) - errCh <- nil + return nil } func executeCommandSbom(command string) ([]byte, error) { @@ -44,7 +44,7 @@ func executeCommandSbom(command string) ([]byte, error) { return outc.Bytes(), err } -func RunTrivySbomScan(config *rest.Config, js nats.JetStreamContext, errCh chan error) { +func RunTrivySbomScan(config *rest.Config, js nats.JetStreamContext) error { log.Println("trivy sbom run started") images, err := ListImages(config) @@ -77,6 +77,7 @@ func RunTrivySbomScan(config *rest.Config, js nats.JetStreamContext, errCh chan log.Println("report", report) // Publish the report using the given function - publishTrivySbomReport(report, js, errCh) + publishTrivySbomReport(report, js) } + return nil }