feat: cancel jobs that do not start in time (#11)
lucaspin authored Jan 24, 2024
1 parent 36ce18d commit 46e2e12
Showing 7 changed files with 405 additions and 34 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -23,3 +23,4 @@ A Kubernetes controller that runs Semaphore jobs in Kubernetes.
| SEMAPHORE_AGENT_STARTUP_PARAMETERS | Any additional [Semaphore agent configuration parameters](https://docs.semaphoreci.com/ci-cd-environment/configure-self-hosted-agent/) to pass to the agents being created. |
| KEEP_FAILED_JOBS_FOR | A [duration string](https://pkg.go.dev/time#ParseDuration) indicating how long to keep failed Kubernetes jobs. For example, `5m`. Default is 0. |
| KEEP_SUCCESSFUL_JOBS_FOR | A [duration string](https://pkg.go.dev/time#ParseDuration) indicating how long to keep successful Kubernetes jobs. For example, `5m`. Default is 0. |
| JOB_START_TIMEOUT | A [duration string](https://pkg.go.dev/time#ParseDuration) indicating how long to wait for a created Kubernetes job to start; after the timeout has passed, the Kubernetes job is deleted. Default is `5m`. |
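
The cancellation path itself lives in the scheduler files further down in this commit (not expanded here). As a rough sketch of the idea only — `deleteIfNotStarted` and its `isRunning` callback are illustrative names, not code from this commit — the check could look like this, assuming the job's creation timestamp is the reference point:

package scheduler

import (
	"context"
	"time"

	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// deleteIfNotStarted deletes a job that is still not running once the
// configured start timeout has elapsed since its creation.
// Hypothetical sketch; the real logic is in the collapsed scheduler files.
func deleteIfNotStarted(clientset kubernetes.Interface, job *batchv1.Job, timeout time.Duration, isRunning func(*batchv1.Job) bool) error {
	if isRunning(job) {
		return nil
	}

	if time.Since(job.CreationTimestamp.Time) < timeout {
		// Still within the allowed startup window.
		return nil
	}

	// Foreground propagation also removes the job's pods.
	propagation := metav1.DeletePropagationForeground
	return clientset.BatchV1().
		Jobs(job.Namespace).
		Delete(context.Background(), job.Name, metav1.DeleteOptions{PropagationPolicy: &propagation})
}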
74 changes: 74 additions & 0 deletions pkg/checks/checks.go
@@ -0,0 +1,74 @@
package checks

import (
"context"
"fmt"

"github.com/go-logr/logr"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

func IsJobRunning(clientset kubernetes.Interface, logger logr.Logger, job *batchv1.Job, versionFn func() (int, int)) bool {
//
// status.ready is behind a feature gate 'JobReadyPods',
// which is present since 1.23, and enabled by default since Kubernetes 1.24.
// See: https://kubernetes.io/docs/reference/command-line-tools-reference/feature-gates/
//
major, minor := versionFn()
if major > 1 || (major == 1 && minor >= 24) {
return job.Status.Ready != nil && *job.Status.Ready > 0
}

//
// For Kubernetes versions <= 1.23,
// we need to check the pod directly.
//
ok, err := RunningPodExists(clientset, logger, job.Namespace, job.Name)
if err != nil {
logger.Error(err, "error checking if pod exists")
return false
}

return ok
}

func RunningPodExists(clientset kubernetes.Interface, logger logr.Logger, namespace, jobName string) (bool, error) {
//
// The built-in Kubernetes job controller adds the 'job-name'
// label to the pods it creates for its jobs, so we use it here,
// to find the one we are interested in.
//
pods, err := clientset.CoreV1().
Pods(namespace).
List(context.Background(), metav1.ListOptions{
LabelSelector: fmt.Sprintf("job-name=%s", jobName),
})

if err != nil {
return false, fmt.Errorf("error listing pods: %v", err)
}

// pod does not exist
if len(pods.Items) == 0 {
logger.Info("Pod does not exist")
return false, nil
}

return IsPodRunning(logger, pods.Items[0]), nil
}

func IsPodRunning(logger logr.Logger, pod corev1.Pod) bool {
// if one of the pod's containers isn't ready, the pod is not running.
for _, container := range pod.Status.ContainerStatuses {
if !container.Ready {
logger.Info("Container is not ready", "container", container)
return false
}
}

logger.Info("Pod status", "status", pod.Status.Phase)
return pod.Status.Phase == corev1.PodRunning
}
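
This file does not show how callers build the `versionFn` pair. A plausible helper, assuming the discovery client is used — `versionFnFor` is an illustrative name, not part of this commit — that accounts for managed clusters reporting minor versions such as "24+":

package checks

import (
	"strconv"
	"strings"

	"k8s.io/client-go/kubernetes"
)

// versionFnFor builds the (major, minor) provider that IsJobRunning expects.
// Version strings like "24+" are normalized before parsing.
func versionFnFor(clientset kubernetes.Interface) (func() (int, int), error) {
	info, err := clientset.Discovery().ServerVersion()
	if err != nil {
		return nil, err
	}

	major, err := strconv.Atoi(strings.TrimSuffix(info.Major, "+"))
	if err != nil {
		return nil, err
	}

	minor, err := strconv.Atoi(strings.TrimSuffix(info.Minor, "+"))
	if err != nil {
		return nil, err
	}

	return func() (int, int) { return major, minor }, nil
}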
143 changes: 143 additions & 0 deletions pkg/checks/checks_test.go
@@ -0,0 +1,143 @@
package checks

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/klog/v2"
)

func Test__IsPodRunning(t *testing.T) {
t.Run("container not ready => false", func(t *testing.T) {
require.False(t, IsPodRunning(klog.Background(), corev1.Pod{
Status: corev1.PodStatus{
ContainerStatuses: []corev1.ContainerStatus{
{State: corev1.ContainerState{Waiting: &corev1.ContainerStateWaiting{}}},
},
},
}))
})

t.Run("pod pending => false", func(t *testing.T) {
require.False(t, IsPodRunning(klog.Background(), corev1.Pod{
Status: corev1.PodStatus{
Phase: corev1.PodPending,
ContainerStatuses: []corev1.ContainerStatus{
{State: corev1.ContainerState{Running: &corev1.ContainerStateRunning{}}},
},
},
}))
})

t.Run("containers ready and pod running => false", func(t *testing.T) {
require.False(t, IsPodRunning(klog.Background(), corev1.Pod{
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
ContainerStatuses: []corev1.ContainerStatus{
{State: corev1.ContainerState{Running: &corev1.ContainerStateRunning{}}},
},
},
}))
})
}

func Test__IsJobRunning(t *testing.T) {
// test for versions with JobReadyPods feature gate
versions := [][2]int{{1, 24}, {1, 25}, {1, 26}, {1, 27}, {1, 28}, {1, 29}, {1, 30}, {2, 0}}

for _, v := range versions {
versionFn := func() (int, int) { return v[0], v[1] }
t.Run(fmt.Sprintf("v%d.%d, ready flag unset => false", v[0], v[1]), func(t *testing.T) {
clientset := fake.NewSimpleClientset()
ready := int32(0)
j := &batchv1.Job{
Status: batchv1.JobStatus{
Ready: &ready,
},
}

require.False(t, IsJobRunning(clientset, klog.Background(), j, versionFn))
})

t.Run(fmt.Sprintf("v%d.%d, ready flag set => true", v[0], v[1]), func(t *testing.T) {
clientset := fake.NewSimpleClientset()
ready := int32(1)
j := &batchv1.Job{
Status: batchv1.JobStatus{
Ready: &ready,
},
}

require.True(t, IsJobRunning(clientset, klog.Background(), j, versionFn))
})
}

// test for versions without JobReadyPods feature gate
versions = [][2]int{{1, 18}, {1, 19}, {1, 20}, {1, 21}, {1, 22}, {1, 23}}

for _, v := range versions {
versionFn := func() (int, int) { return v[0], v[1] }

t.Run(fmt.Sprintf("v%d.%d, pod does not exist => false", v[0], v[1]), func(t *testing.T) {
clientset := fake.NewSimpleClientset()
ready := int32(1)
j := &batchv1.Job{
Status: batchv1.JobStatus{
Ready: &ready,
},
}

require.False(t, IsJobRunning(clientset, klog.Background(), j, versionFn))
})

t.Run(fmt.Sprintf("v%d.%d, pending pod exists => false", v[0], v[1]), func(t *testing.T) {
jobName := "job1"
clientset := fake.NewSimpleClientset([]runtime.Object{
&corev1.Pod{
Status: corev1.PodStatus{Phase: corev1.PodPending},
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{"job-name": jobName},
},
},
}...)

ready := int32(1)
j := &batchv1.Job{
ObjectMeta: v1.ObjectMeta{Name: jobName},
Status: batchv1.JobStatus{
Ready: &ready,
},
}

require.False(t, IsJobRunning(clientset, klog.Background(), j, versionFn))
})

t.Run(fmt.Sprintf("v%d.%d, running pod exists => true", v[0], v[1]), func(t *testing.T) {
jobName := "job1"
clientset := fake.NewSimpleClientset([]runtime.Object{
&corev1.Pod{
Status: corev1.PodStatus{Phase: corev1.PodRunning},
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{"job-name": jobName},
},
},
}...)

ready := int32(1)
j := &batchv1.Job{
ObjectMeta: v1.ObjectMeta{Name: jobName},
Status: batchv1.JobStatus{
Ready: &ready,
},
}

require.True(t, IsJobRunning(clientset, klog.Background(), j, versionFn))
})
}
}
16 changes: 16 additions & 0 deletions pkg/config/config.go
@@ -19,6 +19,10 @@ const (
SemaphoreJobResourceType = "semaphore-job"
)

var (
defaultJobStartTimeout = 5 * time.Minute
)

type Config struct {
Namespace string
ServiceAccountName string
@@ -29,6 +33,7 @@ type Config struct {
SemaphoreEndpoint string
KeepFailedJobsFor time.Duration
KeepSuccessfulJobsFor time.Duration
JobStartTimeout time.Duration
}

func NewConfigFromEnv(endpoint string) (*Config, error) {
@@ -76,6 +81,7 @@ func NewConfigFromEnv(endpoint string) (*Config, error) {
Labels: labels,
KeepFailedJobsFor: keepFailedJobsFor(),
KeepSuccessfulJobsFor: keepSuccessfulJobsFor(),
JobStartTimeout: jobStartTimeout(),
}, nil
}

@@ -115,3 +121,13 @@ func keepSuccessfulJobsFor() time.Duration {

return keepFor
}

func jobStartTimeout() time.Duration {
timeout, err := time.ParseDuration(os.Getenv("JOB_START_TIMEOUT"))
if err != nil {
klog.Infof("No JOB_START_TIMEOUT set, using %v", defaultJobStartTimeout)
return defaultJobStartTimeout
}

return timeout
}
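
A standalone sketch of the fallback behavior above: unset or malformed values yield the five-minute default, while any valid duration string is used as-is.

package main

import (
	"fmt"
	"os"
	"time"
)

func main() {
	for _, v := range []string{"", "not-a-duration", "90s"} {
		os.Setenv("JOB_START_TIMEOUT", v)

		// Mirrors jobStartTimeout(): fall back to the default on any parse error.
		timeout, err := time.ParseDuration(os.Getenv("JOB_START_TIMEOUT"))
		if err != nil {
			timeout = 5 * time.Minute
		}

		fmt.Printf("JOB_START_TIMEOUT=%q => %v\n", v, timeout)
		// "" => 5m0s, "not-a-duration" => 5m0s, "90s" => 1m30s
	}
}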
6 changes: 5 additions & 1 deletion pkg/controller/controller.go
@@ -39,7 +39,11 @@ func New(
return nil, err
}

jobScheduler := NewJobScheduler(clientset, config)
jobScheduler, err := NewJobScheduler(clientset, config)
if err != nil {
return nil, err
}

if err := jobScheduler.RegisterInformer(informerFactory); err != nil {
return nil, err
}