Skip to content

Commit

Permalink
Switch from record.EventRecorder to events.EventRecorder
Browse files Browse the repository at this point in the history
  • Loading branch information
vladikkuzn committed Jul 22, 2024
1 parent a2e7fdb commit 006767f
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 63 deletions.
2 changes: 1 addition & 1 deletion keps/369-job-interface/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ Besides, we'll provide a full controller for developers to follow, all they need
type reconcileOptions struct {
client client.Client
scheme *runtime.Scheme
record record.EventRecorder
record events.EventRecorder
manageJobsWithoutQueueName bool
waitForPodsReady bool
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/admissionchecks/provisioning/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
autoscaling "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
Expand Down Expand Up @@ -71,7 +71,7 @@ func newProvisioningConfigHelper(c client.Client) (*provisioningConfigHelper, er

type Controller struct {
client client.Client
record record.EventRecorder
record events.EventRecorder
helper *provisioningConfigHelper
maxRetries int32
minBackoffSeconds int32
Expand Down Expand Up @@ -108,7 +108,7 @@ func WithMinBackoffSeconds(minBackoffSeconds int32) Option {
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=admissionchecks,verbs=get;list;watch
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=provisioningrequestconfigs,verbs=get;list;watch

func NewController(client client.Client, record record.EventRecorder, opts ...Option) (*Controller, error) {
func NewController(client client.Client, record events.EventRecorder, opts ...Option) (*Controller, error) {
helper, err := newProvisioningConfigHelper(client)
if err != nil {
return nil, err
Expand Down Expand Up @@ -300,7 +300,7 @@ func (c *Controller) syncOwnedProvisionRequest(ctx context.Context, wl *kueue.Wo
if err := c.client.Create(ctx, req); err != nil {
return nil, err
}
c.record.Eventf(wl, corev1.EventTypeNormal, "ProvisioningRequestCreated", "Created ProvisioningRequest: %q", req.Name)
c.record.Eventf(wl, nil, corev1.EventTypeNormal, "ProvisioningRequestCreated", "", "Created ProvisioningRequest: %q", req.Name)
activeOrLastPRForChecks[checkName] = req
}
if err := c.syncProvisionRequestsPodTemplates(ctx, wl, requestName, prc); err != nil {
Expand Down Expand Up @@ -577,7 +577,7 @@ func (c *Controller) syncCheckStates(ctx context.Context, wl *kueue.Workload, ch
return err
}
for i := range recorderMessages {
c.record.Event(wl, corev1.EventTypeNormal, "AdmissionCheckUpdated", api.TruncateEventMessage(recorderMessages[i]))
c.record.Eventf(wl, nil, corev1.EventTypeNormal, "AdmissionCheckUpdated", "", api.TruncateEventMessage(recorderMessages[i]))
}
}
return nil
Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/core/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
Expand Down Expand Up @@ -104,11 +104,11 @@ type WorkloadReconciler struct {
client client.Client
watchers []WorkloadUpdateWatcher
waitForPodsReady *waitForPodsReadyConfig
recorder record.EventRecorder
recorder events.EventRecorder
clock clock.Clock
}

func NewWorkloadReconciler(client client.Client, queues *queue.Manager, cache *cache.Cache, recorder record.EventRecorder, opts ...Option) *WorkloadReconciler {
func NewWorkloadReconciler(client client.Client, queues *queue.Manager, cache *cache.Cache, recorder events.EventRecorder, opts ...Option) *WorkloadReconciler {
options := defaultOptions
for _, opt := range opts {
opt(&options)
Expand Down Expand Up @@ -255,7 +255,7 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
queuedWaitTime := workload.QueuedWaitTime(&wl)
quotaReservedCondition := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadQuotaReserved)
quotaReservedWaitTime := r.clock.Since(quotaReservedCondition.LastTransitionTime.Time)
r.recorder.Eventf(&wl, corev1.EventTypeNormal, "Admitted", "Admitted by ClusterQueue %v, wait time since reservation was %.0fs", wl.Status.Admission.ClusterQueue, quotaReservedWaitTime.Seconds())
r.recorder.Eventf(&wl, nil, corev1.EventTypeNormal, "Admitted", "", "Admitted by ClusterQueue %v, wait time since reservation was %.0fs", wl.Status.Admission.ClusterQueue, quotaReservedWaitTime.Seconds())
metrics.AdmittedWorkload(kueue.ClusterQueueReference(cqName), queuedWaitTime)
metrics.AdmissionChecksWaitTime(kueue.ClusterQueueReference(cqName), quotaReservedWaitTime)
}
Expand Down Expand Up @@ -337,7 +337,7 @@ func (r *WorkloadReconciler) reconcileCheckBasedEviction(ctx context.Context, wl
return false, err
}
rejectedCheck := workload.RejectedChecks(wl)[0]
r.recorder.Eventf(wl, corev1.EventTypeWarning, "AdmissionCheckRejected", "Deactivating workload because AdmissionCheck for %v was Rejected: %s", rejectedCheck.Name, rejectedCheck.Message)
r.recorder.Eventf(wl, nil, corev1.EventTypeWarning, "AdmissionCheckRejected", "", "Deactivating workload because AdmissionCheck for %v was Rejected: %s", rejectedCheck.Name, rejectedCheck.Message)
return true, nil
}
// at this point we know a Workload has at least one Retry AdmissionCheck
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/jobframework/integrationmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
"k8s.io/utils/set"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -44,7 +44,7 @@ type JobReconcilerInterface interface {
SetupWithManager(mgr ctrl.Manager) error
}

type ReconcilerFactory func(client client.Client, record record.EventRecorder, opts ...Option) JobReconcilerInterface
type ReconcilerFactory func(client client.Client, record events.EventRecorder, opts ...Option) JobReconcilerInterface

// IntegrationCallbacks groups a set of callbacks used to integrate a new framework.
type IntegrationCallbacks struct {
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/jobframework/integrationmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
ctrlmgr "sigs.k8s.io/controller-runtime/pkg/manager"
Expand All @@ -48,7 +48,7 @@ func (t *testReconciler) SetupWithManager(mgr ctrlmgr.Manager) error {

var _ JobReconcilerInterface = (*testReconciler)(nil)

func testNewReconciler(client.Client, record.EventRecorder, ...Option) JobReconcilerInterface {
func testNewReconciler(client.Client, events.EventRecorder, ...Option) JobReconcilerInterface {
return &testReconciler{}
}

Expand Down Expand Up @@ -356,7 +356,7 @@ func TestForEach(t *testing.T) {

func TestGetJobTypeForOwner(t *testing.T) {
dontManage := IntegrationCallbacks{
NewReconciler: func(client.Client, record.EventRecorder, ...Option) JobReconcilerInterface {
NewReconciler: func(client.Client, events.EventRecorder, ...Option) JobReconcilerInterface {
panic("not implemented")
},
SetupWebhook: func(ctrl.Manager, ...Option) error { panic("not implemented") },
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/jobframework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
Expand Down Expand Up @@ -107,13 +107,13 @@ type ComposableJob interface {
Load(ctx context.Context, c client.Client, key *types.NamespacedName) (removeFinalizers bool, err error)
// Run unsuspends all members of the ComposableJob and injects the node affinity with podSet
// counts extracting from workload to all members of the ComposableJob.
Run(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, r record.EventRecorder, msg string) error
Run(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, r events.EventRecorder, msg string) error
// ConstructComposableWorkload returns a new Workload that's assembled out of all members of the ComposableJob.
ConstructComposableWorkload(ctx context.Context, c client.Client, r record.EventRecorder, labelKeysToCopy []string) (*kueue.Workload, error)
ConstructComposableWorkload(ctx context.Context, c client.Client, r events.EventRecorder, labelKeysToCopy []string) (*kueue.Workload, error)
// ListChildWorkloads returns all workloads related to the composable job.
ListChildWorkloads(ctx context.Context, c client.Client, parent types.NamespacedName) (*kueue.WorkloadList, error)
// FindMatchingWorkloads returns all related workloads, workload that matches the ComposableJob and duplicates that has to be deleted.
FindMatchingWorkloads(ctx context.Context, c client.Client, r record.EventRecorder) (match *kueue.Workload, toDelete []*kueue.Workload, err error)
FindMatchingWorkloads(ctx context.Context, c client.Client, r events.EventRecorder) (match *kueue.Workload, toDelete []*kueue.Workload, err error)
// Stop implements the custom stop procedure for ComposableJob.
Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, stopReason StopReason, eventMsg string) ([]client.Object, error)
// ForEach calls f on each member of the ComposableJob.
Expand Down
32 changes: 16 additions & 16 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
Expand Down Expand Up @@ -71,7 +71,7 @@ var (
// JobReconciler reconciles a GenericJob object
type JobReconciler struct {
client client.Client
record record.EventRecorder
record events.EventRecorder
manageJobsWithoutQueueName bool
waitForPodsReady bool
labelKeysToCopy []string
Expand Down Expand Up @@ -188,7 +188,7 @@ var defaultOptions = Options{}

func NewReconciler(
client client.Client,
record record.EventRecorder,
record events.EventRecorder,
opts ...Option) *JobReconciler {
options := ProcessOptions(opts...)

Expand Down Expand Up @@ -304,7 +304,7 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
log.Error(err, "suspending child job failed")
return ctrl.Result{}, err
}
r.record.Event(object, corev1.EventTypeNormal, ReasonSuspended, "Kueue managed child job suspended")
r.record.Eventf(object, nil, corev1.EventTypeNormal, ReasonSuspended, "", "Kueue managed child job suspended")
}
}
return ctrl.Result{}, nil
Expand Down Expand Up @@ -334,7 +334,7 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, err
}

r.record.Eventf(object, corev1.EventTypeNormal, ReasonFinishedWorkload,
r.record.Eventf(object, nil, corev1.EventTypeNormal, ReasonFinishedWorkload, "",
"Workload '%s' is declared finished", workload.Key(wl))
return ctrl.Result{}, workload.RemoveFinalizer(ctx, r.client, wl)
}
Expand Down Expand Up @@ -364,7 +364,7 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
if err != nil && !apierrors.IsNotFound(err) {
return ctrl.Result{}, err
}
r.record.Eventf(object, corev1.EventTypeNormal, ReasonFinishedWorkload,
r.record.Eventf(object, nil, corev1.EventTypeNormal, ReasonFinishedWorkload, "",
"Workload '%s' is declared finished", workload.Key(wl))
}

Expand Down Expand Up @@ -515,10 +515,10 @@ func (r *JobReconciler) recordAdmissionCheckUpdate(wl *kueue.Workload, job Gener
if message != "" {
if cJob, isComposable := job.(ComposableJob); isComposable {
cJob.ForEach(func(obj runtime.Object) {
r.record.Eventf(obj, corev1.EventTypeNormal, ReasonUpdatedAdmissionCheck, message)
r.record.Eventf(obj, nil, corev1.EventTypeNormal, ReasonUpdatedAdmissionCheck, "", message)
})
} else {
r.record.Eventf(object, corev1.EventTypeNormal, ReasonUpdatedAdmissionCheck, message)
r.record.Eventf(object, nil, corev1.EventTypeNormal, ReasonUpdatedAdmissionCheck, "", message)
}
}
}
Expand Down Expand Up @@ -640,7 +640,7 @@ func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, o
}
if err == nil {
existedWls++
r.record.Eventf(object, corev1.EventTypeNormal, ReasonDeletedWorkload,
r.record.Eventf(object, nil, corev1.EventTypeNormal, ReasonDeletedWorkload, "",
"Deleted not matching Workload: %v", wlKey)
}
}
Expand Down Expand Up @@ -785,7 +785,7 @@ func (r *JobReconciler) updateWorkloadToMatchJob(ctx context.Context, job Generi
return nil, fmt.Errorf("updating existed workload: %w", err)
}

r.record.Eventf(object, corev1.EventTypeNormal, ReasonUpdatedWorkload,
r.record.Eventf(object, nil, corev1.EventTypeNormal, ReasonUpdatedWorkload, "",
"Updated not matching Workload for suspended job: %v", klog.KObj(wl))
return newWl, nil
}
Expand All @@ -808,7 +808,7 @@ func (r *JobReconciler) startJob(ctx context.Context, job GenericJob, object cli
}); err != nil {
return err
}
r.record.Event(object, corev1.EventTypeNormal, ReasonStarted, msg)
r.record.Eventf(object, nil, corev1.EventTypeNormal, ReasonStarted, "", msg)
}

return nil
Expand All @@ -824,7 +824,7 @@ func (r *JobReconciler) stopJob(ctx context.Context, job GenericJob, wl *kueue.W
if jws, implements := job.(JobWithCustomStop); implements {
stoppedNow, err := jws.Stop(ctx, r.client, info, stopReason, eventMsg)
if stoppedNow {
r.record.Event(object, corev1.EventTypeNormal, ReasonStopped, eventMsg)
r.record.Eventf(object, nil, corev1.EventTypeNormal, ReasonStopped, "", eventMsg)
}
return err
}
Expand All @@ -838,7 +838,7 @@ func (r *JobReconciler) stopJob(ctx context.Context, job GenericJob, wl *kueue.W
}
stoppedNow, err := jws.Stop(ctx, r.client, info, reason, eventMsg)
for _, objStoppedNow := range stoppedNow {
r.record.Event(objStoppedNow, corev1.EventTypeNormal, ReasonStopped, eventMsg)
r.record.Eventf(objStoppedNow, nil, corev1.EventTypeNormal, ReasonStopped, "", eventMsg)
}
return err
}
Expand All @@ -857,7 +857,7 @@ func (r *JobReconciler) stopJob(ctx context.Context, job GenericJob, wl *kueue.W
return err
}

r.record.Event(object, corev1.EventTypeNormal, ReasonStopped, eventMsg)
r.record.Eventf(object, nil, corev1.EventTypeNormal, ReasonStopped, "", eventMsg)
return nil
}

Expand Down Expand Up @@ -1019,7 +1019,7 @@ func (r *JobReconciler) handleJobWithNoWorkload(ctx context.Context, job Generic
if err = r.client.Create(ctx, wl); err != nil {
return err
}
r.record.Eventf(object, corev1.EventTypeNormal, ReasonCreatedWorkload,
r.record.Eventf(object, nil, corev1.EventTypeNormal, ReasonCreatedWorkload, "",
"Created Workload: %v", workload.Key(wl))
return nil
}
Expand Down Expand Up @@ -1068,7 +1068,7 @@ type ReconcilerSetup func(*builder.Builder, client.Client) *builder.Builder
// NewGenericReconcilerFactory creates a new reconciler factory for a concrete GenericJob type.
// newJob should return a new empty job.
func NewGenericReconcilerFactory(newJob func() GenericJob, setup ...ReconcilerSetup) ReconcilerFactory {
return func(client client.Client, record record.EventRecorder, opts ...Option) JobReconcilerInterface {
return func(client client.Client, record events.EventRecorder, opts ...Option) JobReconcilerInterface {
return &genericReconciler{
jr: NewReconciler(client, record, opts...),
newJob: newJob,
Expand Down
Loading

0 comments on commit 006767f

Please sign in to comment.