Skip to content

Commit

Permalink
fix: delete scan jobs if their pods have already been deleted and we …
Browse files (browse the repository at this point in the history)
…cannot collect logs (aquasecurity#956)

Resolves: aquasecurity#873

Signed-off-by: Joel Whittaker-Smith <jdws.dev@gmail.com>
  • Loading branch information
jw-s authored Feb 17, 2022
1 parent 7dc61da commit 5c79162
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 4 deletions.
5 changes: 2 additions & 3 deletions itest/starboard-operator/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,8 @@ var _ = BeforeSuite(func() {
AssertTimeout: 3 * time.Minute,
PrimaryNamespace: corev1.NamespaceDefault,
PrimaryWorkloadPrefix: "wordpress",

Client: kubeClient,
Helper: helper.NewHelper(kubeClient),
Client: kubeClient,
Helper: helper.NewHelper(kubeClient),
}

startCtx, stopFunc = context.WithCancel(context.Background())
Expand Down
9 changes: 8 additions & 1 deletion pkg/kube/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kube

import (
"context"
"errors"
"fmt"
"io"

Expand All @@ -11,6 +12,8 @@ import (
"k8s.io/client-go/kubernetes"
)

// podControlledByJobNotFoundErr is the sentinel error wrapped into the error
// returned when no pod controlled by a given job can be found. Callers should
// detect it with IsPodControlledByJobNotFound rather than by matching the
// error message.
var podControlledByJobNotFoundErr = errors.New("pod for job not found")

type LogsReader interface {
GetLogsByJobAndContainerName(ctx context.Context, job *batchv1.Job, containerName string) (io.ReadCloser, error)
GetTerminatedContainersStatusesByJob(ctx context.Context, job *batchv1.Job) (map[string]*corev1.ContainerStateTerminated, error)
Expand All @@ -32,7 +35,7 @@ func (r *logsReader) GetLogsByJobAndContainerName(ctx context.Context, job *batc
return nil, fmt.Errorf("getting pod controlled by job: %q: %w", job.Namespace+"/"+job.Name, err)
}
if pod == nil {
return nil, fmt.Errorf("getting pod controlled by job: %q: pod not found", job.Namespace+"/"+job.Name)
return nil, fmt.Errorf("getting pod controlled by job: %q: %w", job.Namespace+"/"+job.Name, podControlledByJobNotFoundErr)
}

return r.clientset.CoreV1().Pods(pod.Namespace).
Expand Down Expand Up @@ -87,3 +90,7 @@ func GetTerminatedContainersStatusesByPod(pod *corev1.Pod) map[string]*corev1.Co
}
return states
}

// IsPodControlledByJobNotFound reports whether err, or any error it wraps,
// is the sentinel signalling that the pod controlled by a job was not found.
// A nil err yields false.
func IsPodControlledByJobNotFound(err error) bool {
	podMissing := errors.Is(err, podControlledByJobNotFoundErr)
	return podMissing
}
16 changes: 16 additions & 0 deletions pkg/operator/controller/ciskubebenchreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,14 @@ func (r *CISKubeBenchReportReconciler) processCompleteScanJob(ctx context.Contex

logsStream, err := r.LogsReader.GetLogsByJobAndContainerName(ctx, job, r.Plugin.GetContainerName())
if err != nil {
if errors.IsNotFound(err) {
log.V(1).Info("Cached job must have been deleted")
return nil
}
if kube.IsPodControlledByJobNotFound(err) {
log.V(1).Info("Pod must have been deleted")
return r.deleteJob(ctx, job)
}
return fmt.Errorf("getting logs: %w", err)
}

Expand Down Expand Up @@ -321,6 +329,14 @@ func (r *CISKubeBenchReportReconciler) processFailedScanJob(ctx context.Context,

statuses, err := r.LogsReader.GetTerminatedContainersStatusesByJob(ctx, job)
if err != nil {
if errors.IsNotFound(err) {
log.V(1).Info("Cached job must have been deleted")
return nil
}
if kube.IsPodControlledByJobNotFound(err) {
log.V(1).Info("Pod must have been deleted")
return r.deleteJob(ctx, job)
}
return err
}
for container, status := range statuses {
Expand Down
16 changes: 16 additions & 0 deletions pkg/operator/controller/configauditreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,14 @@ func (r *ConfigAuditReportReconciler) processCompleteScanJob(ctx context.Context

logsStream, err := r.LogsReader.GetLogsByJobAndContainerName(ctx, job, r.Plugin.GetContainerName())
if err != nil {
if errors.IsNotFound(err) {
log.V(1).Info("Cached job must have been deleted")
return nil
}
if kube.IsPodControlledByJobNotFound(err) {
log.V(1).Info("Pod must have been deleted")
return r.deleteJob(ctx, job)
}
return fmt.Errorf("getting logs: %w", err)
}

Expand Down Expand Up @@ -439,6 +447,14 @@ func (r *ConfigAuditReportReconciler) processFailedScanJob(ctx context.Context,

statuses, err := r.LogsReader.GetTerminatedContainersStatusesByJob(ctx, scanJob)
if err != nil {
if errors.IsNotFound(err) {
log.V(1).Info("Cached job must have been deleted")
return nil
}
if kube.IsPodControlledByJobNotFound(err) {
log.V(1).Info("Pod must have been deleted")
return r.deleteJob(ctx, scanJob)
}
return err
}
for container, status := range statuses {
Expand Down
16 changes: 16 additions & 0 deletions pkg/operator/controller/vulnerabilityreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,14 @@ func (r *VulnerabilityReportReconciler) processCompleteScanJob(ctx context.Conte
for containerName, containerImage := range containerImages {
logsStream, err := r.LogsReader.GetLogsByJobAndContainerName(ctx, job, containerName)
if err != nil {
if k8sapierror.IsNotFound(err) {
log.V(1).Info("Cached job must have been deleted")
return nil
}
if kube.IsPodControlledByJobNotFound(err) {
log.V(1).Info("Pod must have been deleted")
return r.deleteJob(ctx, job)
}
return fmt.Errorf("getting logs for pod %q: %w", job.Namespace+"/"+job.Name, err)
}
reportData, err := r.Plugin.ParseVulnerabilityReportData(r.PluginContext, containerImage, logsStream)
Expand Down Expand Up @@ -411,6 +419,14 @@ func (r *VulnerabilityReportReconciler) processFailedScanJob(ctx context.Context

statuses, err := r.GetTerminatedContainersStatusesByJob(ctx, scanJob)
if err != nil {
if k8sapierror.IsNotFound(err) {
log.V(1).Info("Cached job must have been deleted")
return nil
}
if kube.IsPodControlledByJobNotFound(err) {
log.V(1).Info("Pod must have been deleted")
return r.deleteJob(ctx, scanJob)
}
return err
}
for container, status := range statuses {
Expand Down

0 comments on commit 5c79162

Please sign in to comment.