Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove kubernetes 1.27 from supported versions #3247

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions pkg/controller/jobs/job/job_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"sigs.k8s.io/kueue/pkg/controller/jobframework/webhook"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/queue"
"sigs.k8s.io/kueue/pkg/util/kubeversion"
)

var (
Expand All @@ -47,7 +46,6 @@ var (

type JobWebhook struct {
manageJobsWithoutQueueName bool
kubeServerVersion *kubeversion.ServerVersionFetcher
queues *queue.Manager
cache *cache.Cache
}
Expand All @@ -57,7 +55,6 @@ func SetupWebhook(mgr ctrl.Manager, opts ...jobframework.Option) error {
options := jobframework.ProcessOptions(opts...)
wh := &JobWebhook{
manageJobsWithoutQueueName: options.ManageJobsWithoutQueueName,
kubeServerVersion: options.KubeServerVersion,
queues: options.Queues,
cache: options.Cache,
}
Expand Down Expand Up @@ -152,12 +149,6 @@ func (w *JobWebhook) validateSyncCompletionCreate(job *Job) field.ErrorList {
if job.Spec.CompletionMode == nil || *job.Spec.CompletionMode == batchv1.NonIndexedCompletion {
allErrs = append(allErrs, field.Invalid(syncCompletionAnnotationsPath, job.Annotations[JobCompletionsEqualParallelismAnnotation], "should not be enabled for NonIndexed jobs"))
}
if w.kubeServerVersion != nil {
version := w.kubeServerVersion.GetServerVersion()
if version.String() == "" || version.LessThan(kubeversion.KubeVersion1_27) {
allErrs = append(allErrs, field.Invalid(syncCompletionAnnotationsPath, job.Annotations[JobCompletionsEqualParallelismAnnotation], "only supported in Kubernetes 1.27 or newer"))
}
}
if ptr.Deref(job.Spec.Parallelism, 1) != ptr.Deref(job.Spec.Completions, 1) {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec", "completions"), job.Spec.Completions, fmt.Sprintf("should be equal to parallelism when %s is annotation is true", JobCompletionsEqualParallelismAnnotation)))
}
Expand Down
52 changes: 5 additions & 47 deletions pkg/controller/jobs/job/job_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,13 @@ import (
apivalidation "k8s.io/apimachinery/pkg/api/validation"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apimachinery/pkg/version"
fakediscovery "k8s.io/client-go/discovery/fake"
fakeclient "k8s.io/client-go/kubernetes/fake"
"k8s.io/utils/ptr"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/queue"
"sigs.k8s.io/kueue/pkg/util/kubeversion"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingutil "sigs.k8s.io/kueue/pkg/util/testingjobs/job"

Expand All @@ -60,10 +56,9 @@ var (

func TestValidateCreate(t *testing.T) {
testcases := []struct {
name string
job *batchv1.Job
wantErr field.ErrorList
serverVersion string
name string
job *batchv1.Job
wantErr field.ErrorList
}{
{
name: "simple",
Expand Down Expand Up @@ -134,7 +129,6 @@ func TestValidateCreate(t *testing.T) {
wantErr: field.ErrorList{
field.Invalid(field.NewPath("spec", "completions"), ptr.To[int32](6), fmt.Sprintf("should be equal to parallelism when %s is annotation is true", JobCompletionsEqualParallelismAnnotation)),
},
serverVersion: "1.27.0",
},
{
name: "valid sync completions annotation, wrong job completions type (default)",
Expand All @@ -146,7 +140,6 @@ func TestValidateCreate(t *testing.T) {
wantErr: field.ErrorList{
field.Invalid(syncCompletionAnnotationsPath, "true", "should not be enabled for NonIndexed jobs"),
},
serverVersion: "1.27.0",
},
{
name: "valid sync completions annotation, wrong job completions type",
Expand All @@ -159,32 +152,6 @@ func TestValidateCreate(t *testing.T) {
wantErr: field.ErrorList{
field.Invalid(syncCompletionAnnotationsPath, "true", "should not be enabled for NonIndexed jobs"),
},
serverVersion: "1.27.0",
},
{
name: "valid sync completions annotation, server version less then 1.27",
job: testingutil.MakeJob("job", "default").
Parallelism(4).
Completions(4).
SetAnnotation(JobCompletionsEqualParallelismAnnotation, "true").
Indexed(true).
Obj(),
wantErr: field.ErrorList{
field.Invalid(syncCompletionAnnotationsPath, "true", "only supported in Kubernetes 1.27 or newer"),
},
serverVersion: "1.26.3",
},
{
name: "valid sync completions annotation, server version wasn't specified",
job: testingutil.MakeJob("job", "default").
Parallelism(4).
Completions(4).
SetAnnotation(JobCompletionsEqualParallelismAnnotation, "true").
Indexed(true).
Obj(),
wantErr: field.ErrorList{
field.Invalid(syncCompletionAnnotationsPath, "true", "only supported in Kubernetes 1.27 or newer"),
},
},
{
name: "valid sync completions annotation",
Expand All @@ -194,8 +161,7 @@ func TestValidateCreate(t *testing.T) {
SetAnnotation(JobCompletionsEqualParallelismAnnotation, "true").
Indexed(true).
Obj(),
wantErr: nil,
serverVersion: "1.27.0",
wantErr: nil,
},
{
name: "invalid prebuilt workload",
Expand All @@ -208,7 +174,6 @@ func TestValidateCreate(t *testing.T) {
wantErr: field.ErrorList{
field.Invalid(prebuiltWlNameLabelPath, "workload name", invalidRFC1123Message),
},
serverVersion: "1.27.0",
},
{
name: "valid prebuilt workload",
Expand All @@ -218,20 +183,13 @@ func TestValidateCreate(t *testing.T) {
Label(constants.PrebuiltWorkloadLabel, "workload-name").
Indexed(true).
Obj(),
wantErr: nil,
serverVersion: "1.27.0",
wantErr: nil,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
jw := &JobWebhook{}
fakeDiscoveryClient, _ := fakeclient.NewSimpleClientset().Discovery().(*fakediscovery.FakeDiscovery)
fakeDiscoveryClient.FakedServerVersion = &version.Info{GitVersion: tc.serverVersion}
jw.kubeServerVersion = kubeversion.NewServerVersionFetcher(fakeDiscoveryClient)
if err := jw.kubeServerVersion.FetchServerVersion(); err != nil && tc.serverVersion != "" {
t.Fatalf("Failed fetching server version: %v", err)
}

gotErr := jw.validateCreate((*Job)(tc.job))

Expand Down
10 changes: 1 addition & 9 deletions pkg/controller/jobs/pod/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ import (
"sigs.k8s.io/kueue/pkg/podset"
"sigs.k8s.io/kueue/pkg/util/admissioncheck"
clientutil "sigs.k8s.io/kueue/pkg/util/client"
"sigs.k8s.io/kueue/pkg/util/kubeversion"
"sigs.k8s.io/kueue/pkg/util/maps"
"sigs.k8s.io/kueue/pkg/util/parallelize"
utilpod "sigs.k8s.io/kueue/pkg/util/pod"
Expand Down Expand Up @@ -88,7 +87,6 @@ var (
gvk = corev1.SchemeGroupVersion.WithKind("Pod")
errIncorrectReconcileRequest = errors.New("event handler error: got a single pod reconcile request for a pod group")
errPendingOps = jobframework.UnretryableError("waiting to observe previous operations on pods")
errPodNoSupportKubeVersion = errors.New("pod integration only supported in Kubernetes 1.27 or newer")
errPodGroupLabelsMismatch = errors.New("constructing workload: pods have different label values")
)

Expand Down Expand Up @@ -495,13 +493,7 @@ func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error {
return nil
}

func CanSupportIntegration(opts ...jobframework.Option) (bool, error) {
options := jobframework.ProcessOptions(opts...)

v := options.KubeServerVersion.GetServerVersion()
if v.String() == "" || v.LessThan(kubeversion.KubeVersion1_27) {
return false, fmt.Errorf("kubernetesVersion %q: %w", v.String(), errPodNoSupportKubeVersion)
}
func CanSupportIntegration(_ ...jobframework.Option) (bool, error) {
Copy link
Contributor

@mbobrovskyi mbobrovskyi Oct 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove this function as well? It seems to be optional.

return true, nil
}

Expand Down
4 changes: 0 additions & 4 deletions pkg/util/kubeversion/kubeversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ import (

const fetchServerVersionInterval = time.Minute * 10

var (
KubeVersion1_27 = versionutil.MustParseSemantic("1.27.0")
)

type ServerVersionFetcher struct {
dc discovery.DiscoveryInterface
ticker *time.Ticker
Expand Down
17 changes: 0 additions & 17 deletions test/e2e/singlecluster/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/client-go/discovery"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/jobs/pod"
"sigs.k8s.io/kueue/pkg/util/kubeversion"
"sigs.k8s.io/kueue/pkg/util/testing"
podtesting "sigs.k8s.io/kueue/pkg/util/testingjobs/pod"
"sigs.k8s.io/kueue/test/util"
Expand All @@ -50,9 +46,6 @@ var _ = ginkgo.Describe("Pod groups", func() {
)

ginkgo.BeforeEach(func() {
if kubeVersion().LessThan(kubeversion.KubeVersion1_27) {
vladikkuzn marked this conversation as resolved.
Show resolved Hide resolved
ginkgo.Skip("Unsupported in versions older than 1.27")
}
ns = &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "pod-e2e-",
Expand Down Expand Up @@ -590,13 +583,3 @@ func expectWorkloadFinalized(ctx context.Context, k8sClient client.Client, wlKey
return fmt.Errorf("workload %s is not finalized yet", wlKey)
}, util.Timeout, util.Interval).Should(gomega.Succeed())
}

func kubeVersion() *version.Version {
cfg, err := config.GetConfigWithContext("")
gomega.Expect(err).NotTo(gomega.HaveOccurred())
discoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
v, err := kubeversion.FetchServerVersion(discoveryClient)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
return v
}
13 changes: 0 additions & 13 deletions test/integration/webhook/jobs/job_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,19 +99,6 @@ var _ = ginkgo.Describe("Job Webhook With manageJobsWithoutQueueName enabled", g
createdJob.Spec.Suspend = ptr.To(false)
gomega.Expect(k8sClient.Update(ctx, createdJob)).ShouldNot(gomega.Succeed())
})

ginkgo.It("Should not succeed Job when kubernetes less than 1.27 and sync completions annotation is enabled for indexed jobs", func() {
if v := serverVersionFetcher.GetServerVersion(); v.AtLeast(kubeversion.KubeVersion1_27) {
ginkgo.Skip("Kubernetes version is not less then 1.27. Skip test...")
}
j := testingjob.MakeJob("job-without-queue-name", ns.Name).
Parallelism(5).
Completions(5).
SetAnnotation(job.JobCompletionsEqualParallelismAnnotation, "true").
Indexed(true).
Obj()
gomega.Expect(apierrors.IsForbidden(k8sClient.Create(ctx, j))).Should(gomega.BeTrue())
})
})

var _ = ginkgo.Describe("Job Webhook with manageJobsWithoutQueueName disabled", ginkgo.Ordered, func() {
Expand Down