Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a metadata file to KubeHound dumps #247

Merged
merged 16 commits into from
Sep 10, 2024
4 changes: 2 additions & 2 deletions pkg/cmd/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,6 @@ func InitRemoteIngestCmd(cmd *cobra.Command, standalone bool) {

func InitLocalIngestCmd(cmd *cobra.Command) {
cmd.PersistentFlags().String("cluster", "", "Cluster name to ingest (e.g.: my-cluster-1)")
viper.BindPFlag(config.IngestorClusterName, cmd.Flags().Lookup("cluster")) //nolint: errcheck
cmd.MarkFlagRequired("cluster") //nolint: errcheck
viper.BindPFlag(config.IngestorClusterName, cmd.PersistentFlags().Lookup("cluster")) //nolint: errcheck
cmd.MarkFlagRequired("cluster") //nolint: errcheck
}
27 changes: 23 additions & 4 deletions pkg/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package collector
import (
"context"
"fmt"
"time"

"github.com/DataDog/KubeHound/pkg/config"
"github.com/DataDog/KubeHound/pkg/globals/types"
Expand Down Expand Up @@ -77,15 +78,20 @@ type EndpointIngestor interface {
Complete(context.Context) error
}

// MetadataIngestor defines the interface to allow an ingestor to computed metrics and metadata from a collector.
type MetadataIngestor interface {
DumpMetadata(context.Context, Metadata) error
}

//go:generate mockery --name CollectorClient --output mockcollector --case underscore --filename collector_client.go --with-expecter
type CollectorClient interface { //nolint: interfacebloat
services.Dependency

// ClusterInfo returns the target cluster information for the current run.
ClusterInfo(ctx context.Context) (*config.ClusterInfo, error)

// Tags return the tags for the current run.
Tags(ctx context.Context) []string
// Compute the metrics and gather all the metadata and dump it through the ingestor.DumpMetadata
ComputeMetadata(ctx context.Context, ingestor MetadataIngestor) error

// StreamNodes will iterate through all NodeType objects collected by the collector and invoke the ingestor.IngestNode method on each.
// Once all the NodeType objects have been exhausted the ingestor.Complete method will be invoked to signal the end of the stream.
Expand Down Expand Up @@ -142,8 +148,8 @@ type collectorTags struct {
baseTags []string
}

func newCollectorTags() *collectorTags {
return &collectorTags{
func newCollectorTags() collectorTags {
return collectorTags{
pod: tag.GetBaseTagsWith(tag.Collector(FileCollectorName), tag.Entity(tag.EntityPods)),
role: tag.GetBaseTagsWith(tag.Collector(FileCollectorName), tag.Entity(tag.EntityRoles)),
rolebinding: tag.GetBaseTagsWith(tag.Collector(FileCollectorName), tag.Entity(tag.EntityRolebindings)),
Expand All @@ -154,3 +160,16 @@ func newCollectorTags() *collectorTags {
baseTags: tag.GetBaseTags(),
}
}

type Metrics struct {
DumpTime time.Time `json:"dump_time"`
RunDuration time.Duration `json:"run_duration"`
TotalWaitTime time.Duration `json:"total_wait_time"`
ThrottlingPercentage float64 `json:"throttling_percentage"`
}

type Metadata struct {
RunID string `json:"run_id"`
ClusterName string `json:"cluster"`
Metrics Metrics `json:"metrics"`
}
7 changes: 4 additions & 3 deletions pkg/collector/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const (
PodPath = "pods.json"
RolesPath = "roles.rbac.authorization.k8s.io.json"
RoleBindingsPath = "rolebindings.rbac.authorization.k8s.io.json"
MetadataPath = "metadata.json"
)

const (
Expand All @@ -55,7 +56,7 @@ const (
type FileCollector struct {
cfg *config.FileCollectorConfig
log *log.KubehoundLogger
tags *collectorTags
tags collectorTags
}

// NewFileCollector creates a new instance of the file collector from the provided application config.
Expand All @@ -78,8 +79,8 @@ func NewFileCollector(ctx context.Context, cfg *config.KubehoundConfig) (Collect
}, nil
}

// TODO: remove this after all PR
func (c *FileCollector) Tags(ctx context.Context) []string {
// This function has no meaning in the file collector as it should already have all the metadata gathered in the dumped files.
func (c *FileCollector) ComputeMetadata(ctx context.Context, ingestor MetadataIngestor) error {
return nil
}

Expand Down
57 changes: 39 additions & 18 deletions pkg/collector/k8s_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type k8sAPICollector struct {
startTime time.Time
mu *sync.Mutex
isStreaming bool
clusterName string
runID string
}

const (
Expand Down Expand Up @@ -114,19 +116,36 @@ func NewK8sAPICollector(ctx context.Context, cfg *config.KubehoundConfig) (Colle
}

return &k8sAPICollector{
cfg: cfg.Collector.Live,
clientset: clientset,
log: l,
rl: ratelimit.New(cfg.Collector.Live.RateLimitPerSecond), // per second
tags: *newCollectorTags(),
waitTime: map[string]time.Duration{},
startTime: time.Now(),
mu: &sync.Mutex{},
cfg: cfg.Collector.Live,
clientset: clientset,
log: l,
rl: ratelimit.New(cfg.Collector.Live.RateLimitPerSecond), // per second
tags: newCollectorTags(),
waitTime: map[string]time.Duration{},
startTime: time.Now(),
mu: &sync.Mutex{},
clusterName: clusterName,
runID: cfg.Dynamic.RunID.String(),
}, nil
}

// TODO: remove this after all PR
func (c *k8sAPICollector) Tags(ctx context.Context) []string {
func (c *k8sAPICollector) ComputeMetadata(ctx context.Context, ingestor MetadataIngestor) error {
metrics, err := c.computeMetrics(ctx)
if err != nil {
return fmt.Errorf("error computing metrics: %w", err)
}

metadata := Metadata{
ClusterName: c.clusterName,
RunID: c.runID,
Metrics: metrics,
}

err = ingestor.DumpMetadata(ctx, metadata)
if err != nil {
return fmt.Errorf("ingesting metadata: %w", err)
}

return nil
}

Expand Down Expand Up @@ -186,7 +205,7 @@ func (c *k8sAPICollector) ClusterInfo(ctx context.Context) (*config.ClusterInfo,
}

// Generate metrics for k8sAPI collector
func (c *k8sAPICollector) computeMetrics(_ context.Context) error {
func (c *k8sAPICollector) computeMetrics(_ context.Context) (Metrics, error) {
var errMetric error
var runTotalWaitTime time.Duration
for _, wait := range c.waitTime {
Expand All @@ -213,16 +232,18 @@ func (c *k8sAPICollector) computeMetrics(_ context.Context) error {
}
c.log.Infof("Stats for the run time duration: %s / wait: %s / throttling: %f%%", runDuration, runTotalWaitTime, 100*runThrottlingPercentage) //nolint:gomnd

return errMetric
// SaveMetadata
metadata := Metrics{
DumpTime: time.Now(),
RunDuration: runDuration,
TotalWaitTime: runTotalWaitTime,
ThrottlingPercentage: runThrottlingPercentage,
}

return metadata, errMetric
}

func (c *k8sAPICollector) Close(ctx context.Context) error {
err := c.computeMetrics(ctx)
if err != nil {
// We don't want to return an error here as it is just metrics and won't affect the collection of data
c.log.Errorf("Error computing metrics: %s", err)
}

return nil
}

Expand Down
87 changes: 43 additions & 44 deletions pkg/collector/mockcollector/collector_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 20 additions & 1 deletion pkg/dump/ingestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package dump

import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"

"github.com/DataDog/KubeHound/pkg/collector"
"github.com/DataDog/KubeHound/pkg/config"
Expand Down Expand Up @@ -50,6 +53,22 @@ func getClusterName(ctx context.Context, collector collector.CollectorClient) (s
return cluster.Name, nil
}

func (d *DumpIngestor) Metadata() (collector.Metadata, error) {
path := filepath.Join(d.writer.OutputPath(), collector.MetadataPath)
data, err := os.ReadFile(path)
if err != nil {
return collector.Metadata{}, err
}

md := collector.Metadata{}
err = json.Unmarshal(data, &md)
if err != nil {
return collector.Metadata{}, err
}

return md, nil
}

func (d *DumpIngestor) OutputPath() string {
return d.writer.OutputPath()
}
Expand All @@ -71,7 +90,7 @@ func (d *DumpIngestor) DumpK8sObjects(ctx context.Context) error {
return fmt.Errorf("run pipeline ingestor: %w", err)
}

return pipeline.Wait(ctx)
return pipeline.WaitAndClose(ctx)
}

// Close() is invoked by the collector to close all handlers used to dump k8s objects.
Expand Down
28 changes: 28 additions & 0 deletions pkg/dump/pipeline/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package pipeline

import (
"context"

"github.com/DataDog/KubeHound/pkg/collector"
"github.com/DataDog/KubeHound/pkg/dump/writer"
)

type MetadataIngestor struct {
buffer map[string]collector.Metadata
writer writer.DumperWriter
}

func NewMetadataIngestor(ctx context.Context, dumpWriter writer.DumperWriter) *MetadataIngestor {
return &MetadataIngestor{
buffer: make(map[string]collector.Metadata),
writer: dumpWriter,
}
}

func (d *MetadataIngestor) DumpMetadata(ctx context.Context, metadata collector.Metadata) error {
data := make(map[string]*collector.Metadata)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it expected that this is a pointer to the collector metadata ? 😅

The buffer above is make(map[string]collector.Metadata),

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no it is not 😅

data[collector.MetadataPath] = &metadata

return dumpObj[*collector.Metadata](ctx, data, d.writer)

}
Loading
Loading