Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
danielvegamyhre committed Nov 22, 2023
1 parent ddac87b commit 2fe62ec
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 26 deletions.
7 changes: 4 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func main() {
ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

kubeConfig := ctrl.GetConfigOrDie()
// TODO(#338): set QPS and Burst via config flags
kubeConfig.QPS = 500
kubeConfig.Burst = 500

Expand Down Expand Up @@ -113,7 +114,7 @@ func main() {
setupLog.Error(err, "unable to setup jobset reconciler indexes")
os.Exit(1)
}
if err := controllers.SetupPodReconcilerIndexes(ctx, mgr.GetFieldIndexer()); err != nil {
if err := controllers.SetupPodIndexes(ctx, mgr.GetFieldIndexer()); err != nil {
setupLog.Error(err, "unable to setup pod reconciler indexes")
os.Exit(1)
}
Expand Down Expand Up @@ -155,14 +156,14 @@ func setupControllers(mgr ctrl.Manager, certsReady chan struct{}) {

// Set up JobSet validating/defaulting webhook.
if err := (&jobset.JobSet{}).SetupWebhookWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create validation/defaulting webhook", "webhook", "JobSet")
setupLog.Error(err, "unable to create webhook", "webhook", "JobSet")
os.Exit(1)
}

// Set up pod mutating and admission webhook.
podWebhook := webhooks.NewPodWebhook(mgr)
if err := podWebhook.SetupWebhookWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create mutating webhook", "webhook", "Pod")
setupLog.Error(err, "unable to create webhook", "webhook", "Pod")
os.Exit(1)
}
//+kubebuilder:scaffold:builder
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/jobset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ func TestConstructJobsFromTemplate(t *testing.T) {
ns: ns,
replicas: 1,
jobIdx: 0,
topology: "cloud.google.com/gke-nodepool"}).
topology: "topology"}).
Suspend(false).
Subdomain(jobSetName).
NodeSelector(map[string]string{
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func SetupPodReconcilerIndexes(ctx context.Context, indexer client.FieldIndexer) error {
func SetupPodIndexes(ctx context.Context, indexer client.FieldIndexer) error {
// Build index where key is the hash of the namespaced job name of the job that owns this pod,
// and value is the pod itself.
if err := indexer.IndexField(ctx, &corev1.Pod{}, podJobKey, func(obj client.Object) []string {
Expand Down
45 changes: 26 additions & 19 deletions pkg/webhooks/pod_admission_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,23 @@ import (

//+kubebuilder:webhook:path=/validate--v1-pod,mutating=false,failurePolicy=fail,sideEffects=None,groups="",resources=pods,verbs=create,versions=v1,name=vpod.kb.io,sideEffects=None,admissionReviewVersions=v1

// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
// ValidateCreate validates that follower pods (job completion index != 0) part of a JobSet using exclusive
// placement are only admitted after the leader pod (job completion index == 0) has been scheduled.
func (p *podWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {
pod, ok := obj.(*corev1.Pod)
if !ok {
return nil, fmt.Errorf("expected a Pod but got a %T", obj)
}

// If this pod is not part of a JobSet, we don't need to validate anything.
// We can check the existence of the JobSetName annotation to determine this.
if _, isJobSetPod := pod.Annotations[jobset.JobSetNameKey]; !isJobSetPod {
return nil, nil
}

// If pod is part of a JobSet that is using the node selector exclusive placement strategy,
// we don't need to validate anything.
_, usingNodeSelectorStrategy := pod.Annotations[jobset.NodeSelectorStrategyKey]
if usingNodeSelectorStrategy {
if _, usingNodeSelectorStrategy := pod.Annotations[jobset.NodeSelectorStrategyKey]; usingNodeSelectorStrategy {
return nil, nil
}

Expand All @@ -39,22 +45,23 @@ func (p *podWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) (ad
}

// Do not validate anything else for leader pods, proceed with creation immediately.
if !shared.IsLeaderPod(pod) {
// If a follower pod node selector has not been set, reject the creation.
if pod.Spec.NodeSelector == nil {
return nil, fmt.Errorf("follower pod node selector not set")
}
if _, exists := pod.Spec.NodeSelector[topologyKey]; !exists {
return nil, fmt.Errorf("follower pod node selector not set")
}
// For follower pods, validate leader pod exists and is scheduled.
leaderScheduled, err := p.leaderPodScheduled(ctx, pod)
if err != nil {
return nil, err
}
if !leaderScheduled {
return nil, fmt.Errorf("leader pod not yet scheduled, not creating follower pod %q", pod.Name)
}
if shared.IsLeaderPod(pod) {
return nil, nil
}
// If a follower pod node selector has not been set, reject the creation.
if pod.Spec.NodeSelector == nil {
return nil, fmt.Errorf("follower pod node selector not set")
}
if _, exists := pod.Spec.NodeSelector[topologyKey]; !exists {
return nil, fmt.Errorf("follower pod node selector not set")
}
// For follower pods, validate leader pod exists and is scheduled.
leaderScheduled, err := p.leaderPodScheduled(ctx, pod)
if err != nil {
return nil, err
}
if !leaderScheduled {
return nil, fmt.Errorf("leader pod not yet scheduled, not creating follower pod %q", pod.Name)
}
return nil, nil
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/webhooks/pod_mutating_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ func (p *podWebhook) InjectDecoder(d *admission.Decoder) error {
return nil
}

// Default will mutate pods being created in the following ways:
// 1. For leader pods (job completion index 0), pod affinities/anti-affinities for
// exclusive placement per topology are injected.
// 2. For follower pods (job completion index != 0), nodeSelectors for the same topology
// as their leader pod are injected.
func (p *podWebhook) Default(ctx context.Context, obj runtime.Object) error {
pod, ok := obj.(*corev1.Pod)
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion test/integration/controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ var _ = BeforeSuite(func() {
// Set up pod reconciler and indexes.
podReconciler := controllers.NewPodReconciler(k8sManager.GetClient(), k8sManager.GetScheme(), k8sManager.GetEventRecorderFor("pod"))

err = controllers.SetupPodReconcilerIndexes(ctx, k8sManager.GetFieldIndexer())
err = controllers.SetupPodIndexes(ctx, k8sManager.GetFieldIndexer())
Expect(err).ToNot(HaveOccurred())

err = podReconciler.SetupWithManager(k8sManager)
Expand Down
2 changes: 1 addition & 1 deletion test/integration/webhook/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ var _ = BeforeSuite(func() {

// Set up pod webhook and indexes.
podWebhook := webhooks.NewPodWebhook(mgr)
err = controllers.SetupPodReconcilerIndexes(ctx, mgr.GetFieldIndexer())
err = controllers.SetupPodIndexes(ctx, mgr.GetFieldIndexer())
Expect(err).NotTo(HaveOccurred())

err = podWebhook.SetupWebhookWithManager(mgr)
Expand Down

0 comments on commit 2fe62ec

Please sign in to comment.