Skip to content

Commit

Permalink
refactor: Use kube.LogsReader to parse logs and termination statuses (#…
Browse files Browse the repository at this point in the history
…353)

Signed-off-by: Daniel Pacak <pacak.daniel@gmail.com>
  • Loading branch information
danielpacak authored Jan 20, 2021
1 parent 83e9a56 commit 27a2885
Show file tree
Hide file tree
Showing 15 changed files with 205 additions and 248 deletions.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ itests-starboard-operator: check-env get-ginkgo
github.com/aquasecurity/starboard/pkg/operator/controller,\
github.com/aquasecurity/starboard/pkg/operator/controller/job,\
github.com/aquasecurity/starboard/pkg/operator/controller/pod,\
github.com/aquasecurity/starboard/pkg/operator/logs,\
github.com/aquasecurity/starboard/pkg/trivy,\
github.com/aquasecurity/starboard/pkg/vulnerabilityreport \
./itest/starboard-operator
Expand Down
1 change: 1 addition & 0 deletions docs/images/starboard-icon-white.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ nav:
theme:
name: material
language: 'en'
logo: images/starboard-icon-white.svg

markdown_extensions:
- pymdownx.highlight
Expand Down
19 changes: 10 additions & 9 deletions pkg/configauditreport/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ import (
)

type Scanner struct {
scheme *runtime.Scheme
clientset kubernetes.Interface
opts kube.ScannerOpts
pods *pod.Manager
plugin Plugin
scheme *runtime.Scheme
clientset kubernetes.Interface
opts kube.ScannerOpts
pods *pod.Manager
logsReader kube.LogsReader
plugin Plugin
ext.IDGenerator
}

Expand All @@ -41,6 +42,7 @@ func NewScanner(
opts: opts,
plugin: plugin,
pods: pod.NewPodManager(clientset),
logsReader: kube.NewLogsReader(clientset),
IDGenerator: ext.NewGoogleUUIDGenerator(),
}
}
Expand All @@ -61,7 +63,6 @@ func (s *Scanner) Scan(ctx context.Context, workload kube.Object, gvk schema.Gro

err = runner.New().Run(ctx, kube.NewRunnableJob(s.scheme, s.clientset, job))
if err != nil {
s.pods.LogRunnerErrors(ctx, job)
return v1alpha1.ConfigAuditReport{}, fmt.Errorf("running scan job: %w", err)
}

Expand All @@ -81,14 +82,14 @@ func (s *Scanner) Scan(ctx context.Context, workload kube.Object, gvk schema.Gro

klog.V(3).Infof("Getting logs for %s container in job: %s/%s", containerName,
job.Namespace, job.Name)
logsReader, err := s.pods.GetContainerLogsByJob(ctx, job, containerName)
logsStream, err := s.logsReader.GetLogsByJobAndContainerName(ctx, job, containerName)
if err != nil {
return v1alpha1.ConfigAuditReport{}, fmt.Errorf("getting logs: %w", err)
}

result, err := s.plugin.ParseConfigAuditResult(logsReader)
result, err := s.plugin.ParseConfigAuditResult(logsStream)
defer func() {
_ = logsReader.Close()
_ = logsStream.Close()
}()

return NewBuilder(s.scheme).
Expand Down
86 changes: 86 additions & 0 deletions pkg/kube/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package kube

import (
"context"
"fmt"
"io"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

type LogsReader interface {
GetLogsByJobAndContainerName(ctx context.Context, job *batchv1.Job, containerName string) (io.ReadCloser, error)
GetTerminatedContainersStatusesByJob(ctx context.Context, job *batchv1.Job) (map[string]*corev1.ContainerStateTerminated, error)
}

type logsReader struct {
clientset kubernetes.Interface
}

func NewLogsReader(clientset kubernetes.Interface) LogsReader {
return &logsReader{
clientset: clientset,
}
}

func (r *logsReader) GetLogsByJobAndContainerName(ctx context.Context, job *batchv1.Job, containerName string) (io.ReadCloser, error) {
pod, err := r.getPodByJob(ctx, job)
if err != nil {
return nil, fmt.Errorf("getting pod controllered by job: %q: %w", job.Namespace+"/"+job.Name, err)
}

return r.clientset.CoreV1().Pods(pod.Namespace).
GetLogs(pod.Name, &corev1.PodLogOptions{
Follow: true,
Container: containerName,
}).Stream(ctx)
}

func (r *logsReader) GetTerminatedContainersStatusesByJob(ctx context.Context, job *batchv1.Job) (map[string]*corev1.ContainerStateTerminated, error) {
pod, err := r.getPodByJob(ctx, job)
if err != nil {
return nil, err
}
statuses := GetTerminatedContainersStatusesByPod(pod)
return statuses, nil
}

func (r *logsReader) getPodByJob(ctx context.Context, job *batchv1.Job) (*corev1.Pod, error) {
refreshedJob, err := r.clientset.BatchV1().Jobs(job.Namespace).Get(ctx, job.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
selector := fmt.Sprintf("controller-uid=%s", refreshedJob.Spec.Selector.MatchLabels["controller-uid"])
podList, err := r.clientset.CoreV1().Pods(job.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: selector})
if err != nil {
return nil, err
}
if podList != nil && len(podList.Items) > 0 {
return &podList.Items[0], nil
}
return nil, nil
}

func GetTerminatedContainersStatusesByPod(pod *corev1.Pod) map[string]*corev1.ContainerStateTerminated {
states := make(map[string]*corev1.ContainerStateTerminated)
if pod == nil {
return states
}
for _, status := range pod.Status.InitContainerStatuses {
if status.State.Terminated == nil {
continue
}
states[status.Name] = status.State.Terminated
}
for _, status := range pod.Status.ContainerStatuses {
if status.State.Terminated == nil {
continue
}
states[status.Name] = status.State.Terminated
}
return states
}
28 changes: 3 additions & 25 deletions pkg/kube/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@ import (
"strings"

"k8s.io/apimachinery/pkg/api/meta"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"

"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

// Object is a simplified representation of a Kubernetes object.
Expand Down Expand Up @@ -88,33 +87,12 @@ func (ci ContainerImages) FromJSON(value string) error {
return json.Unmarshal([]byte(value), &ci)
}

func GVKForObject(obj runtime.Object, scheme *runtime.Scheme) (schema.GroupVersionKind, error) {
gvks, isUnversioned, err := scheme.ObjectKinds(obj)
if err != nil {
return schema.GroupVersionKind{}, err
}
if isUnversioned {
return schema.GroupVersionKind{}, fmt.Errorf("cannot create group-version-kind for unversioned type %T", obj)
}

if len(gvks) < 1 {
return schema.GroupVersionKind{}, fmt.Errorf("no group-version-kinds associated with type %T", obj)
}
if len(gvks) > 1 {
// this should only trigger for things like metav1.XYZ --
// normal versioned types should be fine
return schema.GroupVersionKind{}, fmt.Errorf(
"multiple group-version-kinds associated with type %T, refusing to guess at one", obj)
}
return gvks[0], nil
}

func KindForObject(object metav1.Object, scheme *runtime.Scheme) (string, error) {
ro, ok := object.(runtime.Object)
if !ok {
return "", fmt.Errorf("%T is not a runtime.Object", object)
}
gvk, err := GVKForObject(ro, scheme)
gvk, err := apiutil.GVKForObject(ro, scheme)
if err != nil {
return "", err
}
Expand Down
Loading

0 comments on commit 27a2885

Please sign in to comment.