Skip to content

Commit

Permalink
Merge pull request #205 from intelops/sync
Browse files Browse the repository at this point in the history
parallel to sync
  • Loading branch information
vijeyash1 authored Sep 5, 2023
2 parents bc7d6b1 + b1afc3d commit c28e031
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 164 deletions.
125 changes: 34 additions & 91 deletions agent/kubviz/k8smetrics_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/go-co-op/gocron"
Expand Down Expand Up @@ -57,19 +56,28 @@ var (
schedulingIntervalStr string = os.Getenv("SCHEDULING_INTERVAL")
)

func runTrivyScans(config *rest.Config, js nats.JetStreamContext, wg *sync.WaitGroup, trivyImagescanChan, trivySbomcanChan, trivyK8sMetricsChan chan error) {
RunTrivyK8sClusterScan(js, trivyK8sMetricsChan)
RunTrivyImageScans(config, js, trivyImagescanChan)
RunTrivySbomScan(config, js, trivySbomcanChan)
wg.Done()
func runTrivyScans(config *rest.Config, js nats.JetStreamContext) error {
err := RunTrivyK8sClusterScan(js)
if err != nil {
return err
}
err = RunTrivyImageScans(config, js)
if err != nil {
return err
}
err = RunTrivySbomScan(config, js)
if err != nil {
return err
}
return nil

}

func main() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
env := Production
clusterMetricsChan := make(chan error, 1)
var (
wg sync.WaitGroup
config *rest.Config
clientset *kubernetes.Clientset
)
Expand Down Expand Up @@ -100,90 +108,22 @@ func main() {
// starting the endless go routine to monitor the cluster
go publishMetrics(clientset, js, clusterMetricsChan)

// starting all the go routines
collectAndPublishMetrics := func() {
// error channels declared for the go routines
outdatedErrChan := make(chan error, 1)
kubePreUpgradeChan := make(chan error, 1)
getAllResourceChan := make(chan error, 1)
trivyK8sMetricsChan := make(chan error, 1)
kubescoreMetricsChan := make(chan error, 1)
trivyImagescanChan := make(chan error, 1)
trivySbomcanChan := make(chan error, 1)
RakeesErrChan := make(chan error, 1)
// Start a goroutine to handle errors
doneChan := make(chan bool)
go func() {
// for loop will wait for the error channels
// logs if any error occurs
for {
select {
case err := <-outdatedErrChan:
if err != nil {
log.Println(err)
}
case err := <-kubePreUpgradeChan:
if err != nil {
log.Println(err)
}
case err := <-getAllResourceChan:
if err != nil {
log.Println(err)
}
case err := <-clusterMetricsChan:
if err != nil {
log.Println(err)
}
case err := <-kubescoreMetricsChan:
if err != nil {
log.Println(err)
}
case err := <-trivyImagescanChan:
if err != nil {
log.Println(err)
}
case err := <-trivySbomcanChan:
if err != nil {
log.Println(err)
}
case err := <-trivyK8sMetricsChan:
if err != nil {
log.Println(err)
}
case err := <-RakeesErrChan:
if err != nil {
log.Println(err)
}
case <-doneChan:
return // All other goroutines have finished, so exit the goroutine
}
}
}()
wg.Add(7) // Initialize the WaitGroup for the seven goroutines
// ... start other goroutines ...
go outDatedImages(config, js, &wg, outdatedErrChan)
go KubePreUpgradeDetector(config, js, &wg, kubePreUpgradeChan)
go GetAllResources(config, js, &wg, getAllResourceChan)
go RakeesOutput(config, js, &wg, RakeesErrChan)
go getK8sEvents(clientset)
// Run these functions sequentially within a single goroutine using the wrapper function
go runTrivyScans(config, js, &wg, trivyImagescanChan, trivySbomcanChan, trivyK8sMetricsChan)
go RunKubeScore(clientset, js, &wg, kubescoreMetricsChan)
wg.Wait()
// once the go routines completes we will close the error channels
close(outdatedErrChan)
close(kubePreUpgradeChan)
close(getAllResourceChan)
// close(clusterMetricsChan)
close(kubescoreMetricsChan)
close(trivyImagescanChan)
close(trivySbomcanChan)
close(trivyK8sMetricsChan)
close(RakeesErrChan)
// Signal that all other goroutines have finished
doneChan <- true
close(doneChan)
err := outDatedImages(config, js)
LogErr(err)
err = KubePreUpgradeDetector(config, js)
LogErr(err)
err = GetAllResources(config, js)
LogErr(err)
err = RakeesOutput(config, js)
LogErr(err)
getK8sEvents(clientset)
err = runTrivyScans(config, js)
LogErr(err)
err = RunKubeScore(clientset, js)
LogErr(err)
}

collectAndPublishMetrics()
if schedulingIntervalStr == "" {
schedulingIntervalStr = "20m" // Default value, e.g., 20 minutes
Expand All @@ -201,7 +141,6 @@ func main() {
// with subject "METRICS.created"
func publishMetrics(clientset *kubernetes.Clientset, js nats.JetStreamContext, errCh chan error) {
watchK8sEvents(clientset, js)

errCh <- nil
}

Expand Down Expand Up @@ -294,7 +233,11 @@ func checkErr(err error) {
log.Fatal(err)
}
}

func LogErr(err error) {
if err != nil {
log.Println(err)
}
}
func watchK8sEvents(clientset *kubernetes.Clientset, js nats.JetStreamContext) {
watchlist := cache.NewListWatchFromClient(
clientset.CoreV1().RESTClient(),
Expand Down
20 changes: 8 additions & 12 deletions agent/kubviz/ketall.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package main
import (
"context"
"encoding/json"
"github.com/intelops/kubviz/constants"
"sync"
"time"

"github.com/intelops/kubviz/constants"

"github.com/intelops/kubviz/model"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
Expand All @@ -28,28 +28,24 @@ func PublishAllResources(result model.Resource, js nats.JetStreamContext) error
return nil
}

func GetAllResources(config *rest.Config, js nats.JetStreamContext, wg *sync.WaitGroup, errCh chan error) {
defer wg.Done()
func GetAllResources(config *rest.Config, js nats.JetStreamContext) error {
// TODO: upto this uncomment for production
// Create a new discovery client to discover all resources in the cluster
dc := discovery.NewDiscoveryClientForConfigOrDie(config)

// Create a new dynamic client to list resources in the cluster
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
log.Error(err)
errCh <- err
return err
}
// Get a list of all available API groups and versions in the cluster
resourceLists, err := dc.ServerPreferredResources()
if err != nil {
log.Error(err)
errCh <- err
return err
}
gvrs, err := discovery.GroupVersionResources(resourceLists)
if err != nil {
panic(err)
errCh <- err
return err
}
// Iterate over all available API groups and versions and list all resources in each group
for gvr := range gvrs {
Expand Down Expand Up @@ -81,9 +77,9 @@ func GetAllResources(config *rest.Config, js nats.JetStreamContext, wg *sync.Wai
}
err := PublishAllResources(resource, js)
if err != nil {
errCh <- err
return err
}
}
}
errCh <- nil
return nil
}
17 changes: 7 additions & 10 deletions agent/kubviz/kubePreUpgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/intelops/kubviz/constants"
"io"
"net/http"
"os"
"strings"
"sync"

"github.com/intelops/kubviz/constants"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -79,29 +79,26 @@ func publishK8sDepricated_Deleted_Api(result *model.Result, js nats.JetStreamCon
return nil
}

func KubePreUpgradeDetector(config *rest.Config, js nats.JetStreamContext, wg *sync.WaitGroup, errCh chan error) {
defer wg.Done()
func KubePreUpgradeDetector(config *rest.Config, js nats.JetStreamContext) error {
swaggerdir, err := os.MkdirTemp("", "kubepug")
if err != nil {
errCh <- err
return err
}
filename := fmt.Sprintf("%s/swagger-%s.json", swaggerdir, k8sVersion)
url := fmt.Sprintf("%s/%s/%s", baseURL, k8sVersion, fileURL)
err = downloadFile(filename, url)
if err != nil {
errCh <- err
return err
}
defer os.RemoveAll(filename)
swaggerfile := filename
kubernetesAPIs, err := PopulateKubeAPIMap(swaggerfile)
if err != nil {
errCh <- err
return err
}
result = getResults(config, kubernetesAPIs)
err = publishK8sDepricated_Deleted_Api(result, js)
errCh <- err
// b, _ := json.MarshalIndent(result, "", " ")
// fmt.Printf("%s", string(b))
return err
}

func PopulateKubeAPIMap(swagfile string) (model.KubernetesAPIs, error) {
Expand Down
23 changes: 11 additions & 12 deletions agent/kubviz/kube_score.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,47 @@ package main
import (
"context"
"encoding/json"
"log"
exec "os/exec"

"github.com/google/uuid"
"github.com/intelops/kubviz/constants"
"github.com/intelops/kubviz/model"
"github.com/nats-io/nats.go"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"log"
exec "os/exec"
"sync"
)

func RunKubeScore(clientset *kubernetes.Clientset, js nats.JetStreamContext, wg *sync.WaitGroup, errCh chan error) {
defer wg.Done()

func RunKubeScore(clientset *kubernetes.Clientset, js nats.JetStreamContext) error {
nsList, err := clientset.CoreV1().
Namespaces().
List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Println("Error occurred while getting client set for kube-score: ", err)
return
return err
}

log.Printf("Namespace size: %d", len(nsList.Items))
for _, n := range nsList.Items {
log.Printf("Publishing kube-score recommendations for namespace: %s\n", n.Name)
publish(n.Name, js, errCh)
publish(n.Name, js)
}
return nil
}

func publish(ns string, js nats.JetStreamContext, errCh chan error) {
func publish(ns string, js nats.JetStreamContext) error {
cmd := "kubectl api-resources --verbs=list --namespaced -o name | xargs -n1 -I{} sh -c \"kubectl get {} -n " + ns + " -oyaml && echo ---\" | kube-score score - "
log.Printf("Command: %#v,", cmd)
out, err := executeCommand(cmd)
if err != nil {
log.Println("Error occurred while running kube-score: ", err)
errCh <- err
return err
}
err = publishKubescoreMetrics(uuid.New().String(), ns, out, js)
if err != nil {
errCh <- err
return err
}
errCh <- nil
return nil
}

func publishKubescoreMetrics(id string, ns string, recommendations string, js nats.JetStreamContext) error {
Expand Down
12 changes: 6 additions & 6 deletions agent/kubviz/outdated.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,11 @@ func PublishOutdatedImages(out model.CheckResultfinal, js nats.JetStreamContext)
return nil
}

func outDatedImages(config *rest.Config, js nats.JetStreamContext, wg *sync.WaitGroup, errCh chan error) {
defer wg.Done()
func outDatedImages(config *rest.Config, js nats.JetStreamContext) error {
images, err := ListImages(config)
if err != nil {
log.Println("unable to list images")
errCh <- err
return err
}
for _, image := range images {
namespace := image.Namespace
Expand All @@ -92,7 +91,7 @@ func outDatedImages(config *rest.Config, js nats.JetStreamContext, wg *sync.Wait
final.Pod = pod
err := PublishOutdatedImages(final, js)
if err != nil {
errCh <- err
return err
}
} else {
if checkResult != nil {
Expand All @@ -108,7 +107,7 @@ func outDatedImages(config *rest.Config, js nats.JetStreamContext, wg *sync.Wait
final.Pod = pod
err := PublishOutdatedImages(final, js)
if err != nil {
errCh <- err
return err
}
} else {
tagtrunk := truncateTagName(tag)
Expand All @@ -125,12 +124,13 @@ func outDatedImages(config *rest.Config, js nats.JetStreamContext, wg *sync.Wait
final.Pod = pod
err := PublishOutdatedImages(final, js)
if err != nil {
errCh <- err
return err
}
}
}
}
}
return nil
}

func ParseImageName(imageName string) (string, string, string, error) {
Expand Down
Loading

0 comments on commit c28e031

Please sign in to comment.