Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions cmd/deployment-tracker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
"time"

"github.com/github/deployment-tracker/internal/controller"
"k8s.io/client-go/metadata"
"github.com/github/deployment-tracker/internal/metadata"
k8smetadata "k8s.io/client-go/metadata"

"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -114,13 +115,16 @@ func main() {
}

// Create metadata client
metadataClient, err := metadata.NewForConfig(k8sCfg)
metadataClient, err := k8smetadata.NewForConfig(k8sCfg)
if err != nil {
slog.Error("Error creating Kubernetes metadata client",
"error", err)
os.Exit(1)
}

// Create metadata aggregator
metadataAggregator := metadata.NewAggregator(metadataClient)

// Start the metrics server
var promSrv = &http.Server{
Addr: ":" + metricsPort,
Expand Down Expand Up @@ -160,7 +164,7 @@ func main() {
cancel()
}()

cntrl, err := controller.New(clientset, metadataClient, namespace, excludeNamespaces, &cntrlCfg)
cntrl, err := controller.New(clientset, metadataAggregator, namespace, excludeNamespaces, &cntrlCfg)
if err != nil {
slog.Error("Failed to create controller",
"error", err)
Expand Down
210 changes: 18 additions & 192 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,12 @@ import (
"slices"
"strings"
"time"
"unicode/utf8"

"github.com/github/deployment-tracker/internal/metadata"
"github.com/github/deployment-tracker/pkg/deploymentrecord"
"github.com/github/deployment-tracker/pkg/dtmetrics"
"github.com/github/deployment-tracker/pkg/ociutil"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
amcache "k8s.io/apimachinery/pkg/util/cache"
"k8s.io/client-go/metadata"

corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -34,14 +31,6 @@ const (
EventCreated = "CREATED"
// EventDeleted indicates that a pod has been deleted.
EventDeleted = "DELETED"
// MetadataAnnotationPrefix is the annotation key prefix for deployment record metadata like runtime risk and tags.
MetadataAnnotationPrefix = "metadata.github.com/"
// RuntimeRisksAnnotationKey is the tag key for runtime risks. Comes after MetadataAnnotationPrefix.
RuntimeRisksAnnotationKey = "runtime-risks"
// MaxCustomTags is the maximum number of custom tags per deployment record.
MaxCustomTags = 5
// MaxCustomTagLength is the maximum length for a custom tag key or value.
MaxCustomTagLength = 100
)

type ttlCache interface {
Expand All @@ -50,35 +39,33 @@ type ttlCache interface {
Delete(k any)
}

// podMetadataAggregator is the dependency the Controller uses to obtain the
// aggregate metadata for a pod. The Controller stores the value handed to New
// and calls it while processing pod events that are not decommissions.
type podMetadataAggregator interface {
// BuildAggregatePodMetadata returns the aggregate metadata built for obj.
// The Controller passes the partial metadata of the pod being processed.
BuildAggregatePodMetadata(ctx context.Context, obj *metav1.PartialObjectMetadata) *metadata.AggregatePodMetadata
}

// PodEvent represents a pod event to be processed.
// It is the item type carried by the Controller's rate-limited workqueue.
type PodEvent struct {
// Key identifies the pod the event refers to.
// NOTE(review): presumably an informer-style "namespace/name" key — confirm
// against the code that enqueues events.
Key string
// EventType is the kind of event, e.g. EventCreated or EventDeleted.
EventType string
DeletedPod *corev1.Pod // Only populated for delete events
}

// AggregatePodMetadata represents combined metadata for a pod and its ownership hierarchy.
type AggregatePodMetadata struct {
// RuntimeRisks is used as a set: each non-empty result of
// deploymentrecord.ValidateRuntimeRisk is stored with the value true.
RuntimeRisks map[deploymentrecord.RuntimeRisk]bool
// Tags maps a custom tag key (the annotation key with
// MetadataAnnotationPrefix removed) to that annotation's value.
Tags map[string]string
}

// Controller is the Kubernetes controller for tracking deployments.
type Controller struct {
clientset kubernetes.Interface
metadataClient metadata.Interface
podInformer cache.SharedIndexInformer
workqueue workqueue.TypedRateLimitingInterface[PodEvent]
apiClient *deploymentrecord.Client
cfg *Config
clientset kubernetes.Interface
metadataAggregator podMetadataAggregator
podInformer cache.SharedIndexInformer
workqueue workqueue.TypedRateLimitingInterface[PodEvent]
apiClient *deploymentrecord.Client
cfg *Config
// best effort cache to avoid redundant posts
// post requests are idempotent, so if this cache fails due to
// restarts or other events, nothing will break.
observedDeployments ttlCache
}

// New creates a new deployment tracker controller.
func New(clientset kubernetes.Interface, metadataClient metadata.Interface, namespace string, excludeNamespaces string, cfg *Config) (*Controller, error) {
func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregator, namespace string, excludeNamespaces string, cfg *Config) (*Controller, error) {
// Create informer factory
factory := createInformerFactory(clientset, namespace, excludeNamespaces)

Expand Down Expand Up @@ -111,7 +98,7 @@ func New(clientset kubernetes.Interface, metadataClient metadata.Interface, name

cntrl := &Controller{
clientset: clientset,
metadataClient: metadataClient,
metadataAggregator: metadataAggregator,
podInformer: podInformer,
workqueue: queue,
apiClient: apiClient,
Expand Down Expand Up @@ -363,9 +350,9 @@ func (c *Controller) processEvent(ctx context.Context, event PodEvent) error {
var lastErr error

// Gather aggregate metadata for adds/updates
var aggPodMetadata *AggregatePodMetadata
var aggPodMetadata *metadata.AggregatePodMetadata
if status != deploymentrecord.StatusDecommissioned {
aggPodMetadata = c.aggregateMetadata(ctx, podToPartialMetadata(pod))
aggPodMetadata = c.metadataAggregator.BuildAggregatePodMetadata(ctx, podToPartialMetadata(pod))
}

// Record info for each container in the pod
Expand Down Expand Up @@ -405,7 +392,7 @@ func (c *Controller) deploymentExists(ctx context.Context, namespace, name strin
}

// recordContainer records a single container's deployment info.
func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, container corev1.Container, status, eventType string, aggPodMetadata *AggregatePodMetadata) error {
func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, container corev1.Container, status, eventType string, aggPodMetadata *metadata.AggregatePodMetadata) error {
var cacheKey string

dn := getARDeploymentName(pod, container, c.cfg.Template)
Expand Down Expand Up @@ -521,7 +508,7 @@ func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, conta
case deploymentrecord.StatusDecommissioned:
cacheKey = getCacheKey(EventDeleted, dn, digest)
c.observedDeployments.Set(cacheKey, true, 2*time.Minute)
// If there was a previous created event, remove that
// If there was a previous create event, remove that
cacheKey = getCacheKey(EventCreated, dn, digest)
c.observedDeployments.Delete(cacheKey)
default:
Expand All @@ -531,95 +518,6 @@ func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, conta
return nil
}

// aggregateMetadata walks obj and the owners reachable from it in
// breadth-first order and merges the metadata found on each object into a
// single AggregatePodMetadata. Objects are tracked by UID so that an object
// reached more than once is only processed the first time.
func (c *Controller) aggregateMetadata(ctx context.Context, obj *metav1.PartialObjectMetadata) *AggregatePodMetadata {
	result := &AggregatePodMetadata{
		RuntimeRisks: make(map[deploymentrecord.RuntimeRisk]bool),
		Tags:         make(map[string]string),
	}

	seen := make(map[types.UID]bool)
	// pending is a worklist that grows while it is being walked:
	// addOwnersToQueue appends the owners of the node just processed.
	pending := []*metav1.PartialObjectMetadata{obj}

	for next := 0; next < len(pending); next++ {
		node := pending[next]

		if seen[node.GetUID()] {
			slog.Warn("Already visited object, skipping to avoid cycles",
				"UID", node.GetUID(),
				"name", node.GetName(),
			)
			continue
		}
		seen[node.GetUID()] = true

		extractMetadataFromObject(node, result)
		c.addOwnersToQueue(ctx, node, &pending)
	}

	return result
}

// addOwnersToQueue resolves every owner reference of current and appends the
// owners that could be fetched to queue, so their metadata is collected too.
// A failed lookup is logged as a warning and skipped; owners for which
// getOwnerMetadata returns no object are skipped silently.
func (c *Controller) addOwnersToQueue(ctx context.Context, current *metav1.PartialObjectMetadata, queue *[]*metav1.PartialObjectMetadata) {
	for _, ref := range current.GetOwnerReferences() {
		resolved, err := c.getOwnerMetadata(ctx, current.GetNamespace(), ref)

		switch {
		case err != nil:
			slog.Warn("Failed to get owner object for metadata collection",
				"namespace", current.GetNamespace(),
				"owner_kind", ref.Kind,
				"owner_name", ref.Name,
				"error", err,
			)
		case resolved != nil:
			*queue = append(*queue, resolved)
		}
	}
}

// getOwnerMetadata fetches the partial object metadata of the object that an
// owner reference points to, looking it up in the given namespace.
//
// Only the kinds "ReplicaSet" and "Deployment" (apps/v1) are looked up. For
// any other kind, and when the owner no longer exists, it returns (nil, nil).
// Every other lookup failure is returned as the error.
func (c *Controller) getOwnerMetadata(ctx context.Context, namespace string, owner metav1.OwnerReference) (*metav1.PartialObjectMetadata, error) {
	var resource string
	switch owner.Kind {
	case "ReplicaSet":
		resource = "replicasets"
	case "Deployment":
		resource = "deployments"
	default:
		slog.Debug("Unsupported owner kind for runtime risk collection",
			"kind", owner.Kind,
			"name", owner.Name,
		)
		return nil, nil
	}

	gvr := schema.GroupVersionResource{
		Group:    "apps",
		Version:  "v1",
		Resource: resource,
	}

	found, err := c.metadataClient.Resource(gvr).Namespace(namespace).Get(ctx, owner.Name, metav1.GetOptions{})
	if err == nil {
		return found, nil
	}
	if !k8serrors.IsNotFound(err) {
		return nil, err
	}

	// A missing owner is not treated as an error.
	slog.Debug("Owner object not found for metadata collection",
		"namespace", namespace,
		"owner_kind", owner.Kind,
		"owner_name", owner.Name,
	)
	return nil, nil
}

// getCacheKey builds the key used in the observed-deployments cache by
// joining the event type, deployment name and digest with "||".
func getCacheKey(ev, dn, digest string) string {
	const sep = "||"

	key := ev + sep + dn
	key += sep + digest
	return key
}
Expand Down Expand Up @@ -676,7 +574,7 @@ func createInformerFactory(clientset kubernetes.Interface, namespace string, exc

// getARDeploymentName converts the pod's metadata into the correct format
// for the deployment name for the artifact registry (this is not the same
// as the K8s deployment's name!
// as the K8s deployment's name!)
// The deployment name must be unique within the logical environment, the
// physical environment and the cluster.
func getARDeploymentName(p *corev1.Pod, c corev1.Container, tmpl string) string {
Expand Down Expand Up @@ -728,78 +626,6 @@ func getDeploymentName(pod *corev1.Pod) string {
return ""
}

// extractMetadataFromObject reads the MetadataAnnotationPrefix annotations of
// obj and merges them into aggPodMetadata.
//
// The runtime-risks annotation is split on commas and every entry that
// deploymentrecord.ValidateRuntimeRisk returns as non-empty is added to
// RuntimeRisks. Every other prefixed annotation is a custom tag candidate;
// candidates are visited in sorted key order and added to Tags until
// MaxCustomTags is reached. Candidates with an over-long key or value, an
// empty key, or a key that is already present are skipped and logged.
func extractMetadataFromObject(obj *metav1.PartialObjectMetadata, aggPodMetadata *AggregatePodMetadata) {
	annotations := obj.GetAnnotations()

	// Runtime risks come from a single comma-separated annotation.
	if raw, ok := annotations[MetadataAnnotationPrefix+RuntimeRisksAnnotationKey]; ok {
		for _, candidate := range strings.Split(raw, ",") {
			if valid := deploymentrecord.ValidateRuntimeRisk(candidate); valid != "" {
				aggPodMetadata.RuntimeRisks[valid] = true
			}
		}
	}

	// Only prefixed annotations can become tags. They are sorted so that,
	// when the tag limit is hit, the same tags are dropped on every run.
	tagAnnotations := make([]string, 0, len(annotations))
	for name := range annotations {
		if strings.HasPrefix(name, MetadataAnnotationPrefix) {
			tagAnnotations = append(tagAnnotations, name)
		}
	}
	slices.Sort(tagAnnotations)

	for _, name := range tagAnnotations {
		if len(aggPodMetadata.Tags) >= MaxCustomTags {
			return
		}

		tagKey := strings.TrimPrefix(name, MetadataAnnotationPrefix)
		tagValue := annotations[name]
		keyLen := utf8.RuneCountInString(tagKey)
		valueLen := utf8.RuneCountInString(tagValue)
		existingValue, duplicate := aggPodMetadata.Tags[tagKey]

		switch {
		case tagKey == RuntimeRisksAnnotationKey:
			// ignore runtime risks for custom tags
		case keyLen > MaxCustomTagLength || valueLen > MaxCustomTagLength:
			slog.Warn("Tag key or value exceeds max length, skipping",
				"object_name", obj.GetName(),
				"kind", obj.Kind,
				"tag_key", tagKey,
				"tag_value", tagValue,
				"key_length", keyLen,
				"value_length", valueLen,
				"max_length", MaxCustomTagLength,
			)
		case tagKey == "":
			slog.Warn("Tag key is empty, skipping",
				"object_name", obj.GetName(),
				"kind", obj.Kind,
				"annotation", name,
				"tag_key", tagKey,
				"tag_value", tagValue,
			)
		case duplicate:
			slog.Debug("Duplicate tag key found, skipping",
				"object_name", obj.GetName(),
				"kind", obj.Kind,
				"tag_key", tagKey,
				"existing_value", existingValue,
				"new_value", tagValue,
			)
		default:
			aggPodMetadata.Tags[tagKey] = tagValue
		}
	}
}

func podToPartialMetadata(pod *corev1.Pod) *metav1.PartialObjectMetadata {
return &metav1.PartialObjectMetadata{
TypeMeta: metav1.TypeMeta{
Expand Down
Loading