Add Events #299

Merged (10 commits) on Jan 4, 2025
api/leaderworkerset/v1/leaderworkerset_types.go: 2 changes (1 addition, 1 deletion)
@@ -321,7 +321,7 @@ const (
// LeaderWorkerSetUpgradeInProgress means lws is performing a rolling update. UpgradeInProgress
// is true when the lws is in upgrade process after the (leader/worker) template is updated. If only replicas is modified, it will
// not be considered as UpgradeInProgress.
LeaderWorkerSetUpgradeInProgress LeaderWorkerSetConditionType = "UpgradeInProgress"
LeaderWorkerSetUpdateInProgress LeaderWorkerSetConditionType = "UpdateInProgress"
)

// +genclient
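The renamed condition is consumed like any other metav1.Condition. As a minimal sketch that is not part of this PR, the snippet below shows how a client might check the new type with the standard apimachinery helper; the "UpdateInProgress" literal mirrors the new constant's value above, and the flat []metav1.Condition shape is assumed from the usual LWS status layout.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// hasUpdateInProgress is a hypothetical helper (not from the PR) showing how a
// consumer keys off the renamed condition type after this change.
func hasUpdateInProgress(conditions []metav1.Condition) bool {
	// meta.IsStatusConditionTrue looks up a condition by its Type string and
	// reports whether its Status is ConditionTrue.
	return meta.IsStatusConditionTrue(conditions, "UpdateInProgress")
}

func main() {
	conds := []metav1.Condition{{Type: "UpdateInProgress", Status: metav1.ConditionTrue}}
	fmt.Println(hasUpdateInProgress(conds)) // prints: true
}
```

Anything still comparing against the old "UpgradeInProgress" string has to move in lockstep, which is why the test helpers in validators.go are updated in the same PR.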
cmd/main.go: 2 changes (1 addition, 1 deletion)
@@ -175,7 +175,7 @@ func setupControllers(mgr ctrl.Manager, certsReady chan struct{}) {
os.Exit(1)
}
// Set up pod reconciler.
podController := controllers.NewPodReconciler(mgr.GetClient(), mgr.GetScheme())
podController := controllers.NewPodReconciler(mgr.GetClient(), mgr.GetScheme(), mgr.GetEventRecorderFor("leaderworkerset"))
if err := podController.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Pod")
os.Exit(1)
pkg/controllers/leaderworkerset_controller.go: 36 changes (27 additions, 9 deletions)
@@ -67,7 +67,9 @@ const (
const (
// FailedCreate Event reason used when a resource creation fails.
// The event uses the error(s) as the reason.
FailedCreate = "FailedCreate"
FailedCreate = "FailedCreate"
GroupsAreProgressing = "GroupsAreProgressing"
GroupsAreUpdating = "GroupsAreUpdating"
)

func NewLeaderWorkerSetReconciler(client client.Client, scheme *runtime.Scheme, record record.EventRecorder) *LeaderWorkerSetReconciler {
@@ -135,7 +137,21 @@ func (r *LeaderWorkerSetReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{}, err
}

if !lwsUpdated {
// An event is logged to track update progress.
if leaderSts != nil && partition != *leaderSts.Spec.UpdateStrategy.RollingUpdate.Partition {
r.Record.Eventf(lws, corev1.EventTypeNormal, GroupsAreUpdating, fmt.Sprintf("Updating replicas %d to %d", *leaderSts.Spec.UpdateStrategy.RollingUpdate.Partition, partition))
}
}

if leaderSts == nil {
r.Record.Eventf(lws, corev1.EventTypeNormal, GroupsAreProgressing, fmt.Sprintf("Creating leader statefulset %s", lws.Name))
}

if err := r.SSAWithStatefulset(ctx, lws, partition, replicas, revisionutils.GetRevisionKey(revision)); err != nil {
if leaderSts == nil {
r.Record.Eventf(lws, corev1.EventTypeWarning, FailedCreate, fmt.Sprintf("Failed to create leader statefulset %s", lws.Name))
}
return ctrl.Result{}, err
}

@@ -257,7 +273,9 @@ func (r *LeaderWorkerSetReconciler) rollingUpdateParameters(ctx context.Context,
// When we have n unready replicas and n bursted replicas, we should
// start to release the burst replica gradually for the accommodation of
// the unready ones.
return lwsReplicas + utils.NonZeroValue(int32(unreadyReplicas)-1)
finalReplicas := lwsReplicas + utils.NonZeroValue(int32(unreadyReplicas)-1)
r.Record.Eventf(lws, corev1.EventTypeNormal, GroupsAreProgressing, fmt.Sprintf("deleting surge replica %s-%d", lws.Name, finalReplicas))
return finalReplicas
}
return burstReplicas
}
@@ -418,8 +436,8 @@ func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *l
if updatedNonBurstWorkerCount < currentNonBurstWorkerCount {
// upgradeInProgress is true when the upgrade replicas is smaller than the expected
// number of total replicas not including the burst replicas
conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetUpdateInProgress))
conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetProgressing))
conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetUpgradeInProgress))
} else if updatedAndReadyCount == int(*lws.Spec.Replicas) {
conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetAvailable))
updateDone = true
@@ -677,13 +695,13 @@ func makeCondition(conditionType leaderworkerset.LeaderWorkerSetConditionType) m
condtype = string(leaderworkerset.LeaderWorkerSetAvailable)
reason = "AllGroupsReady"
message = "All replicas are ready"
case leaderworkerset.LeaderWorkerSetUpgradeInProgress:
condtype = string(leaderworkerset.LeaderWorkerSetUpgradeInProgress)
reason = "GroupsAreUpgrading"
case leaderworkerset.LeaderWorkerSetUpdateInProgress:
condtype = string(leaderworkerset.LeaderWorkerSetUpdateInProgress)
reason = GroupsAreUpdating
message = "Rolling Upgrade is in progress"
default:
condtype = string(leaderworkerset.LeaderWorkerSetProgressing)
reason = "GroupsAreProgressing"
reason = GroupsAreProgressing
message = "Replicas are progressing"
}

@@ -747,8 +765,8 @@ func exclusiveConditionTypes(condition1 metav1.Condition, condition2 metav1.Cond
return true
}

if (condition1.Type == string(leaderworkerset.LeaderWorkerSetAvailable) && condition2.Type == string(leaderworkerset.LeaderWorkerSetUpgradeInProgress)) ||
(condition1.Type == string(leaderworkerset.LeaderWorkerSetUpgradeInProgress) && condition2.Type == string(leaderworkerset.LeaderWorkerSetAvailable)) {
if (condition1.Type == string(leaderworkerset.LeaderWorkerSetAvailable) && condition2.Type == string(leaderworkerset.LeaderWorkerSetUpdateInProgress)) ||
(condition1.Type == string(leaderworkerset.LeaderWorkerSetUpdateInProgress) && condition2.Type == string(leaderworkerset.LeaderWorkerSetAvailable)) {
return true
}

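All of the new Eventf calls in this controller follow the standard controller-runtime recorder pattern: the recorder is injected via the constructor and events are attached to the LWS object with a machine-readable reason and a human-readable note. Below is a minimal, self-contained sketch of that pattern with hypothetical names that are not part of the lws codebase; note that Eventf formats the message itself, so arguments can be passed directly instead of pre-building the string with fmt.Sprintf.

```go
package controllers

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/record"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// ExampleReconciler is a hypothetical type used only to illustrate the
// event-recording pattern added in this PR; it is not part of the lws codebase.
type ExampleReconciler struct {
	client.Client
	Record record.EventRecorder // typically mgr.GetEventRecorderFor("<controller-name>")
}

// emitProgress records a Normal event against obj; Eventf expands the format
// string with the given args.
func (r *ExampleReconciler) emitProgress(obj client.Object, ready, total int) {
	r.Record.Eventf(obj, corev1.EventTypeNormal, "GroupsAreProgressing",
		"Replicas are progressing, with %d groups ready of total %d groups", ready, total)
}

// emitFailedCreate records a Warning event carrying the underlying error.
func (r *ExampleReconciler) emitFailedCreate(obj client.Object, err error) {
	r.Record.Eventf(obj, corev1.EventTypeWarning, "FailedCreate",
		"Failed to create dependent object: %v", err)
}
```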
pkg/controllers/pod_controller.go: 14 changes (12 additions, 2 deletions)
@@ -24,13 +24,15 @@ import (

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
appsapplyv1 "k8s.io/client-go/applyconfigurations/apps/v1"
coreapplyv1 "k8s.io/client-go/applyconfigurations/core/v1"
metaapplyv1 "k8s.io/client-go/applyconfigurations/meta/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -49,12 +51,14 @@ import (
type PodReconciler struct {
client.Client
Scheme *runtime.Scheme
Record record.EventRecorder
}

func NewPodReconciler(client client.Client, schema *runtime.Scheme) *PodReconciler {
return &PodReconciler{Client: client, Scheme: schema}
func NewPodReconciler(client client.Client, schema *runtime.Scheme, record record.EventRecorder) *PodReconciler {
return &PodReconciler{Client: client, Scheme: schema, Record: record}
}

//+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch
//+kubebuilder:rbac:groups=core,resources=pods,verbs=create;delete;get;list;patch;update;watch
//+kubebuilder:rbac:groups=core,resources=pods/finalizers,verbs=update
//+kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch;update;patch
@@ -161,8 +165,13 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
return ctrl.Result{}, err
}
if err = r.Create(ctx, workerStatefulSet); err != nil {
r.Record.Eventf(&leaderWorkerSet, corev1.EventTypeWarning, FailedCreate, fmt.Sprintf("Failed to create worker statefulset for leader pod %s", pod.Name))
if apierrors.IsAlreadyExists(err) {
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}
r.Record.Eventf(&leaderWorkerSet, corev1.EventTypeNormal, GroupsAreProgressing, fmt.Sprintf("Creating worker statefulset for leader pod %s", pod.Name))
}
log.V(2).Info("Worker Reconcile completed.")
return ctrl.Result{}, nil
@@ -202,6 +211,7 @@ func (r *PodReconciler) handleRestartPolicy(ctx context.Context, pod corev1.Pod,
}); err != nil {
return false, err
}
r.Record.Eventf(&leaderWorkerSet, corev1.EventTypeNormal, "RecreateGroupOnPodRestart", fmt.Sprintf("Worker pod %s failed, deleting leader pod %s to recreate group %s", pod.Name, leader.Name, leader.Labels[leaderworkerset.GroupIndexLabelKey]))
return true, nil
}

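The worker StatefulSet create path above combines event recording with an idempotency check: an AlreadyExists error is treated as success so repeated reconciles of the same leader pod do not surface spurious failures. The sketch below restates that idea in isolation; the helper name is hypothetical, and unlike the diff it emits the Warning only for non-AlreadyExists errors.

```go
package controllers

import (
	"context"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/tools/record"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// createWorkerSts is a hypothetical helper illustrating the idempotent create
// pattern: AlreadyExists is swallowed, other failures are reported as a
// Warning event and returned so the reconcile is retried.
func createWorkerSts(ctx context.Context, c client.Client, rec record.EventRecorder,
	owner runtime.Object, sts *appsv1.StatefulSet) error {
	if err := c.Create(ctx, sts); err != nil {
		if apierrors.IsAlreadyExists(err) {
			// A previous reconcile already created it; nothing left to do.
			return nil
		}
		rec.Eventf(owner, corev1.EventTypeWarning, "FailedCreate",
			"Failed to create worker statefulset %s: %v", sts.Name, err)
		return err
	}
	rec.Eventf(owner, corev1.EventTypeNormal, "GroupsAreProgressing",
		"Created worker statefulset %s", sts.Name)
	return nil
}
```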
test/integration/controllers/leaderworkerset_test.go: 17 changes (12 additions, 5 deletions)
@@ -30,6 +30,7 @@ import (
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
"sigs.k8s.io/lws/pkg/controllers"
revisionutils "sigs.k8s.io/lws/pkg/utils/revision"
testing "sigs.k8s.io/lws/test/testutils"
)
@@ -200,6 +201,9 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
{
checkLWSState: func(deployment *leaderworkerset.LeaderWorkerSet) {
testing.ExpectLeaderSetExist(ctx, deployment, k8sClient)
testing.ValidateEvent(ctx, k8sClient, controllers.GroupsAreProgressing, corev1.EventTypeNormal, "Creating leader statefulset test-sample", deployment.Namespace)
testing.ValidateEvent(ctx, k8sClient, controllers.GroupsAreProgressing, corev1.EventTypeNormal, "Creating worker statefulset for leader pod test-sample-0", deployment.Namespace)
testing.ValidateEvent(ctx, k8sClient, controllers.GroupsAreProgressing, corev1.EventTypeNormal, "Creating worker statefulset for leader pod test-sample-1", deployment.Namespace)
testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, deployment, 2)
testing.ExpectValidWorkerStatefulSets(ctx, deployment, k8sClient, true)
},
@@ -348,12 +352,12 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
{
checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
testing.ExpectLeaderWorkerSetProgressing(ctx, k8sClient, lws, "Replicas are progressing")
testing.ValidateLatestEvent(ctx, k8sClient, "GroupsAreProgressing", corev1.EventTypeNormal, "Replicas are progressing, with 0 groups ready of total 2 groups", lws.Namespace)
testing.ValidateEvent(ctx, k8sClient, controllers.GroupsAreProgressing, corev1.EventTypeNormal, "Replicas are progressing, with 0 groups ready of total 2 groups", lws.Namespace)
// Force groups to ready.
testing.SetPodGroupsToReady(ctx, k8sClient, lws, 2)
testing.ExpectLeaderWorkerSetNotProgressing(ctx, k8sClient, lws, "Replicas are progressing")
testing.ExpectLeaderWorkerSetAvailable(ctx, k8sClient, lws, "All replicas are ready")
testing.ValidateLatestEvent(ctx, k8sClient, "AllGroupsReady", corev1.EventTypeNormal, "All replicas are ready, with 2 groups ready of total 2 groups", lws.Namespace)
testing.ValidateEvent(ctx, k8sClient, "AllGroupsReady", corev1.EventTypeNormal, "All replicas are ready, with 2 groups ready of total 2 groups", lws.Namespace)
// Force a reconcile. Refetch most recent version of LWS, increase replicas.
patch := client.MergeFrom(&leaderworkerset.LeaderWorkerSet{
ObjectMeta: metav1.ObjectMeta{
@@ -373,7 +377,7 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
testing.ExpectLeaderWorkerSetProgressing(ctx, k8sClient, lws, "Replicas are progressing")
testing.ExpectLeaderWorkerSetUnavailable(ctx, k8sClient, lws, "All replicas are ready")
// Check most recent event.
testing.ValidateLatestEvent(ctx, k8sClient, "GroupsAreProgressing", corev1.EventTypeNormal, "Replicas are progressing, with 2 groups ready of total 3 groups", lws.Namespace)
testing.ValidateEvent(ctx, k8sClient, controllers.GroupsAreProgressing, corev1.EventTypeNormal, "Replicas are progressing, with 2 groups ready of total 3 groups", lws.Namespace)
},
},
},
@@ -441,6 +445,7 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
var leaderPod corev1.Pod
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name + "-0", Namespace: lws.Namespace}, &leaderPod)).To(gomega.Succeed())
gomega.Expect(leaderPod.DeletionTimestamp != nil).To(gomega.BeTrue())
testing.ValidateEvent(ctx, k8sClient, "RecreateGroupOnPodRestart", corev1.EventTypeNormal, "Worker pod test-sample-0-1 failed, deleting leader pod test-sample-0 to recreate group 0", lws.Namespace)
},
},
},
@@ -453,7 +458,7 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 2)
testing.ExpectValidWorkerStatefulSets(ctx, lws, k8sClient, true)
testing.ExpectLeaderWorkerSetProgressing(ctx, k8sClient, lws, "Replicas are progressing")
testing.ValidateLatestEvent(ctx, k8sClient, "GroupsAreProgressing", corev1.EventTypeNormal, "Replicas are progressing, with 0 groups ready of total 2 groups", lws.Namespace)
testing.ValidateEvent(ctx, k8sClient, controllers.GroupsAreProgressing, corev1.EventTypeNormal, "Replicas are progressing, with 0 groups ready of total 2 groups", lws.Namespace)
},
},
},
@@ -481,7 +486,7 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
},
checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
testing.ExpectLeaderWorkerSetAvailable(ctx, k8sClient, lws, "All replicas are ready")
testing.ValidateLatestEvent(ctx, k8sClient, "AllGroupsReady", corev1.EventTypeNormal, "All replicas are ready, with 2 groups ready of total 2 groups", lws.Namespace)
testing.ValidateEvent(ctx, k8sClient, "AllGroupsReady", corev1.EventTypeNormal, "All replicas are ready, with 2 groups ready of total 2 groups", lws.Namespace)
},
},
},
@@ -518,6 +523,7 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
testing.ExpectLeaderWorkerSetUpgradeInProgress(ctx, k8sClient, lws, "Rolling Upgrade is in progress")
// This should be 4 at the first step, however, reconciliation syncs quickly and
// soon updated to 3 (replicas-maxUnavailable), it's fine here.
testing.ValidateEvent(ctx, k8sClient, controllers.GroupsAreUpdating, corev1.EventTypeNormal, "Updating replicas 4 to 3", lws.Namespace)
testing.ExpectStatefulsetPartitionEqualTo(ctx, k8sClient, lws, 3)
testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 4, 0)
},
@@ -532,6 +538,7 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
testing.ExpectLeaderWorkerSetUnavailable(ctx, k8sClient, lws, "All replicas are ready")
testing.ExpectLeaderWorkerSetProgressing(ctx, k8sClient, lws, "Replicas are progressing")
testing.ExpectLeaderWorkerSetUpgradeInProgress(ctx, k8sClient, lws, "Rolling Upgrade is in progress")
testing.ValidateEvent(ctx, k8sClient, controllers.GroupsAreUpdating, corev1.EventTypeNormal, "Updating replicas 3 to 2", lws.Namespace)
testing.ExpectStatefulsetPartitionEqualTo(ctx, k8sClient, lws, 2)
testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 4, 1)
},
test/integration/controllers/suite_test.go: 2 changes (1 addition, 1 deletion)
@@ -102,7 +102,7 @@ var _ = BeforeSuite(func() {
err = lwsController.SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())

podController := controllers.NewPodReconciler(k8sManager.GetClient(), k8sManager.GetScheme())
podController := controllers.NewPodReconciler(k8sManager.GetClient(), k8sManager.GetScheme(), k8sManager.GetEventRecorderFor("pod"))
err = podController.SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())

test/testutils/validators.go: 19 changes (10 additions, 9 deletions)
@@ -375,19 +375,19 @@ func ExpectLeaderWorkerSetNotProgressing(ctx context.Context, k8sClient client.C
}

func ExpectLeaderWorkerSetUpgradeInProgress(ctx context.Context, k8sClient client.Client, lws *leaderworkerset.LeaderWorkerSet, message string) {
ginkgo.By(fmt.Sprintf("checking leaderworkerset status(%s) is true", leaderworkerset.LeaderWorkerSetUpgradeInProgress))
ginkgo.By(fmt.Sprintf("checking leaderworkerset status(%s) is true", leaderworkerset.LeaderWorkerSetUpdateInProgress))
condition := metav1.Condition{
Type: string(leaderworkerset.LeaderWorkerSetUpgradeInProgress),
Type: string(leaderworkerset.LeaderWorkerSetUpdateInProgress),
Status: metav1.ConditionTrue,
Message: message,
}
gomega.Eventually(CheckLeaderWorkerSetHasCondition, Timeout, Interval).WithArguments(ctx, k8sClient, lws, condition).Should(gomega.Equal(true))
}

func ExpectLeaderWorkerSetNoUpgradeInProgress(ctx context.Context, k8sClient client.Client, lws *leaderworkerset.LeaderWorkerSet, message string) {
ginkgo.By(fmt.Sprintf("checking leaderworkerset status(%s) is true", leaderworkerset.LeaderWorkerSetUpgradeInProgress))
ginkgo.By(fmt.Sprintf("checking leaderworkerset status(%s) is true", leaderworkerset.LeaderWorkerSetUpdateInProgress))
condition := metav1.Condition{
Type: string(leaderworkerset.LeaderWorkerSetUpgradeInProgress),
Type: string(leaderworkerset.LeaderWorkerSetUpdateInProgress),
Status: metav1.ConditionFalse,
Message: message,
}
@@ -442,7 +442,7 @@ func ExpectLeaderWorkerSetUnavailable(ctx context.Context, k8sClient client.Clie
}

// ValidateLatestEvent will return true if the latest event is as you want.
func ValidateLatestEvent(ctx context.Context, k8sClient client.Client, eventReason string, eventType string, eventNote string, namespace string) {
func ValidateEvent(ctx context.Context, k8sClient client.Client, eventReason string, eventType string, eventNote string, namespace string) {
gomega.Eventually(func() error {
events := &eventsv1.EventList{}
if err := k8sClient.List(ctx, events, &client.ListOptions{Namespace: namespace}); err != nil {
@@ -454,12 +454,13 @@ func ValidateLatestEvent(ctx context.Context, k8sClient client.Client, eventReas
return fmt.Errorf("no events currently exist")
}

item := events.Items[length-1]
if item.Reason == eventReason && item.Type == eventType && item.Note == eventNote {
return nil
for _, item := range events.Items {
if item.Reason == eventReason && item.Type == eventType && item.Note == eventNote {
return nil
}
}

return fmt.Errorf("mismatch with the latest event: got r:%v t:%v n:%v, reg %v", item.Reason, item.Type, item.Note, item.Regarding)
return fmt.Errorf("mismatch with the expected event: expected r:%v t:%v n:%v", eventReason, eventType, eventNote)

}, Timeout, Interval).Should(gomega.BeNil())
}
Expand Down