Skip to content

Commit

Permalink
k8s.io/client-go/tools: support context for event recording
Browse files Browse the repository at this point in the history
Using StartRecordingToSinkWithContext instead of StartRecordingToSink and
StartLogging instead of StartStructuredLogging has several advantages:

- Spawned goroutines no longer get stuck for extended periods of
  time during shutdown when passing in a context that gets canceled.
- Log output can be directed towards a specific logger instead of the global
  default, for example one which writes to a testing.T instance.
- The new methods return an error when something went wrong instead of
  merely recording the error.

That last point is the reason for deprecating the old methods instead of merely
adding new alternatives.

Setting a context when constructing an EventBroadcaster makes calling Shutdown
optional. It can also be used to specify the logger.

Both EventRecorder interfaces in tools/events and tools/record now have a
WithLogger helper. Using that method is optional, but recommended to support
contextual logging properly. Without it, errors that occur while emitting an
event are not associated with the caller.
  • Loading branch information
pohly committed Sep 27, 2023
1 parent f7dacb6 commit 27a68ae
Show file tree
Hide file tree
Showing 12 changed files with 339 additions and 120 deletions.
2 changes: 2 additions & 0 deletions hack/logcheck.conf
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ structured k8s.io/apiserver/pkg/server/options/encryptionconfig/.*
# TODO next: contextual k8s.io/kubernetes/pkg/scheduler/.*
# A few files involved in startup migrated already to contextual
# We can't enable contextual logcheck until all are migrated
contextual k8s.io/client-go/tools/events/.*
contextual k8s.io/client-go/tools/record/.*
contextual k8s.io/dynamic-resource-allocation/.*
contextual k8s.io/kubernetes/cmd/kube-scheduler/.*
contextual k8s.io/kubernetes/pkg/controller/.*
Expand Down
101 changes: 60 additions & 41 deletions staging/src/k8s.io/client-go/tools/events/event_broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,13 @@ func (e *eventBroadcasterImpl) Shutdown() {
}

// refreshExistingEventSeries refresh events TTL
func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
func (e *eventBroadcasterImpl) refreshExistingEventSeries(ctx context.Context) {
// TODO: Investigate whether lock contention won't be a problem
e.mu.Lock()
defer e.mu.Unlock()
for isomorphicKey, event := range e.eventCache {
if event.Series != nil {
if recordedEvent, retry := recordEvent(e.sink, event); !retry {
if recordedEvent, retry := recordEvent(ctx, e.sink, event); !retry {
if recordedEvent != nil {
e.eventCache[isomorphicKey] = recordedEvent
}
Expand All @@ -142,15 +142,15 @@ func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
// finishSeries checks if a series has ended and either:
// - write final count to the apiserver
// - delete a singleton event (i.e. series field is nil) from the cache
func (e *eventBroadcasterImpl) finishSeries() {
func (e *eventBroadcasterImpl) finishSeries(ctx context.Context) {
// TODO: Investigate whether lock contention won't be a problem
e.mu.Lock()
defer e.mu.Unlock()
for isomorphicKey, event := range e.eventCache {
eventSerie := event.Series
if eventSerie != nil {
if eventSerie.LastObservedTime.Time.Before(time.Now().Add(-finishTime)) {
if _, retry := recordEvent(e.sink, event); !retry {
if _, retry := recordEvent(ctx, e.sink, event); !retry {
delete(e.eventCache, isomorphicKey)
}
}
Expand All @@ -161,13 +161,13 @@ func (e *eventBroadcasterImpl) finishSeries() {
}

// NewRecorder returns an EventRecorder that records events with the given event source.
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder {
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorderLogger {
hostname, _ := os.Hostname()
reportingInstance := reportingController + "-" + hostname
return &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}}
return &recorderImplLogger{recorderImpl: &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}}, logger: klog.Background()}
}

func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.Clock) {
func (e *eventBroadcasterImpl) recordToSink(ctx context.Context, event *eventsv1.Event, clock clock.Clock) {
// Make a copy before modification, because there could be multiple listeners.
eventCopy := event.DeepCopy()
go func() {
Expand Down Expand Up @@ -197,39 +197,44 @@ func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.C
}()
if evToRecord != nil {
// TODO: Add a metric counting the number of recording attempts
e.attemptRecording(evToRecord)
e.attemptRecording(ctx, evToRecord)
// We don't want the new recorded Event to be reflected in the
// client's cache because server-side mutations could mess with the
// aggregation mechanism used by the client.
}
}()
}

func (e *eventBroadcasterImpl) attemptRecording(event *eventsv1.Event) *eventsv1.Event {
func (e *eventBroadcasterImpl) attemptRecording(ctx context.Context, event *eventsv1.Event) {
tries := 0
for {
if recordedEvent, retry := recordEvent(e.sink, event); !retry {
return recordedEvent
if _, retry := recordEvent(ctx, e.sink, event); !retry {
return
}
tries++
if tries >= maxTriesPerEvent {
klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
return nil
klog.FromContext(ctx).Error(nil, "Unable to write event (retry limit exceeded!)", "event", event)
return
}
// Randomize sleep so that various clients won't all be
// synced up if the master goes down.
time.Sleep(wait.Jitter(e.sleepDuration, 0.25))
// synced up if the master goes down. Give up when
// the context is canceled.
select {
case <-ctx.Done():
return
case <-time.After(wait.Jitter(e.sleepDuration, 0.25)):
}
}
}

func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool) {
func recordEvent(ctx context.Context, sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool) {
var newEvent *eventsv1.Event
var err error
isEventSeries := event.Series != nil
if isEventSeries {
patch, patchBytesErr := createPatchBytesForSeries(event)
if patchBytesErr != nil {
klog.Errorf("Unable to calculate diff, no merge is possible: %v", patchBytesErr)
klog.FromContext(ctx).Error(patchBytesErr, "Unable to calculate diff, no merge is possible")
return nil, false
}
newEvent, err = sink.Patch(event, patch)
Expand All @@ -248,7 +253,7 @@ func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool)
switch err.(type) {
case *restclient.RequestConstructionError:
// We will construct the request the same next time, so don't keep trying.
klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
klog.FromContext(ctx).Error(err, "Unable to construct event (will not retry!)", "event", event)
return nil, false
case *errors.StatusError:
if errors.IsAlreadyExists(err) {
Expand All @@ -260,9 +265,9 @@ func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool)
if isEventSeries {
return nil, true
}
klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
klog.FromContext(ctx).V(5).Info("Server rejected event (will not retry!)", "event", event, "err", err)
} else {
klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
klog.FromContext(ctx).Error(err, "Server rejected event (will not retry!)", "event", event)
}
return nil, false
case *errors.UnexpectedObjectError:
Expand All @@ -271,7 +276,7 @@ func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool)
default:
// This case includes actual http transport errors. Go ahead and retry.
}
klog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
klog.FromContext(ctx).Error(err, "Unable to write event (may retry after sleeping)")
return nil, true
}

Expand Down Expand Up @@ -307,29 +312,38 @@ func getKey(event *eventsv1.Event) eventKey {
// StartStructuredLogging starts sending events received from this EventBroadcaster to the structured logging function.
// The return value can be ignored or used to stop recording, if desired.
// TODO: this function should also return an error.
//
// Deprecated: use StartLogging instead.
func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) func() {
stopWatcher, err := e.StartEventWatcher(
logger := klog.Background().V(int(verbosity))
stopWatcher, err := e.StartLogging(logger)
if err != nil {
logger.Error(err, "Failed to start event watcher")
return func() {}
}
return stopWatcher
}

// StartLogging starts sending events received from this EventBroadcaster to the structured logger.
// To adjust verbosity, use the logger's V method (i.e. pass `logger.V(3)` instead of `logger`).
// The returned function can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartLogging(logger klog.Logger) (func(), error) {
return e.StartEventWatcher(
func(obj runtime.Object) {
event, ok := obj.(*eventsv1.Event)
if !ok {
klog.Errorf("unexpected type, expected eventsv1.Event")
logger.Error(nil, "unexpected type, expected eventsv1.Event")
return
}
klog.V(verbosity).InfoS("Event occurred", "object", klog.KRef(event.Regarding.Namespace, event.Regarding.Name), "kind", event.Regarding.Kind, "apiVersion", event.Regarding.APIVersion, "type", event.Type, "reason", event.Reason, "action", event.Action, "note", event.Note)
logger.Info("Event occurred", "object", klog.KRef(event.Regarding.Namespace, event.Regarding.Name), "kind", event.Regarding.Kind, "apiVersion", event.Regarding.APIVersion, "type", event.Type, "reason", event.Reason, "action", event.Action, "note", event.Note)
})
if err != nil {
klog.Errorf("failed to start event watcher: '%v'", err)
return func() {}
}
return stopWatcher
}

// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value is used to stop recording
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime.Object)) (func(), error) {
watcher, err := e.Watch()
if err != nil {
klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err)
return nil, err
}
go func() {
Expand All @@ -345,37 +359,42 @@ func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime
return watcher.Stop, nil
}

func (e *eventBroadcasterImpl) startRecordingEvents(stopCh <-chan struct{}) error {
func (e *eventBroadcasterImpl) startRecordingEvents(ctx context.Context) error {
eventHandler := func(obj runtime.Object) {
event, ok := obj.(*eventsv1.Event)
if !ok {
klog.Errorf("unexpected type, expected eventsv1.Event")
klog.FromContext(ctx).Error(nil, "unexpected type, expected eventsv1.Event")
return
}
e.recordToSink(event, clock.RealClock{})
e.recordToSink(ctx, event, clock.RealClock{})
}
stopWatcher, err := e.StartEventWatcher(eventHandler)
if err != nil {
return err
}
go func() {
<-stopCh
<-ctx.Done()
stopWatcher()
}()
return nil
}

// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
// Deprecated: use StartRecordingToSinkWithContext instead.
func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
go wait.Until(e.refreshExistingEventSeries, refreshTime, stopCh)
go wait.Until(e.finishSeries, finishTime, stopCh)
err := e.startRecordingEvents(stopCh)
err := e.StartRecordingToSinkWithContext(wait.ContextForChannel(stopCh))
if err != nil {
klog.Errorf("unexpected type, expected eventsv1.Event")
return
klog.Background().Error(err, "Failed to start recording to sink")
}
}

// StartRecordingToSinkWithContext starts sending events received from the specified eventBroadcaster to the given sink.
func (e *eventBroadcasterImpl) StartRecordingToSinkWithContext(ctx context.Context) error {
go wait.UntilWithContext(ctx, e.refreshExistingEventSeries, refreshTime)
go wait.UntilWithContext(ctx, e.finishSeries, finishTime)
return e.startRecordingEvents(ctx)
}

type eventBroadcasterAdapterImpl struct {
coreClient typedv1core.EventsGetter
coreBroadcaster record.EventBroadcaster
Expand Down Expand Up @@ -409,14 +428,14 @@ func (e *eventBroadcasterAdapterImpl) StartRecordingToSink(stopCh <-chan struct{
}
}

func (e *eventBroadcasterAdapterImpl) NewRecorder(name string) EventRecorder {
func (e *eventBroadcasterAdapterImpl) NewRecorder(name string) EventRecorderLogger {
if e.eventsv1Broadcaster != nil && e.eventsv1Client != nil {
return e.eventsv1Broadcaster.NewRecorder(scheme.Scheme, name)
}
return record.NewEventRecorderAdapter(e.DeprecatedNewLegacyRecorder(name))
}

func (e *eventBroadcasterAdapterImpl) DeprecatedNewLegacyRecorder(name string) record.EventRecorder {
func (e *eventBroadcasterAdapterImpl) DeprecatedNewLegacyRecorder(name string) record.EventRecorderLogger {
return e.coreBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: name})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
eventsv1 "k8s.io/api/events/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/klog/v2/ktesting"
)

func TestRecordEventToSink(t *testing.T) {
Expand Down Expand Up @@ -78,11 +79,12 @@ func TestRecordEventToSink(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
kubeClient := fake.NewSimpleClientset()
eventSink := &EventSinkImpl{Interface: kubeClient.EventsV1()}

for _, ev := range tc.eventsToRecord {
recordEvent(eventSink, &ev)
recordEvent(ctx, eventSink, &ev)
}

recordedEvents, err := kubeClient.EventsV1().Events(metav1.NamespaceDefault).List(context.TODO(), metav1.ListOptions{})
Expand Down
27 changes: 24 additions & 3 deletions staging/src/k8s.io/client-go/tools/events/event_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,45 @@ type recorderImpl struct {
clock clock.Clock
}

var _ EventRecorder = &recorderImpl{}

func (recorder *recorderImpl) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
recorder.eventf(klog.Background(), regarding, related, eventtype, reason, action, note, args...)
}

type recorderImplLogger struct {
*recorderImpl
logger klog.Logger
}

var _ EventRecorderLogger = &recorderImplLogger{}

func (recorder *recorderImplLogger) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
recorder.eventf(recorder.logger, regarding, related, eventtype, reason, action, note, args...)
}

func (recorder *recorderImplLogger) WithLogger(logger klog.Logger) EventRecorderLogger {
return &recorderImplLogger{recorderImpl: recorder.recorderImpl, logger: logger}
}

func (recorder *recorderImpl) eventf(logger klog.Logger, regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
timestamp := metav1.MicroTime{Time: time.Now()}
message := fmt.Sprintf(note, args...)
refRegarding, err := reference.GetReference(recorder.scheme, regarding)
if err != nil {
klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", regarding, err, eventtype, reason, message)
logger.Error(err, "Could not construct reference, will not report event", "object", regarding, "eventType", eventtype, "reason", reason, "message", message)
return
}

var refRelated *v1.ObjectReference
if related != nil {
refRelated, err = reference.GetReference(recorder.scheme, related)
if err != nil {
klog.V(9).Infof("Could not construct reference to: '%#v' due to: '%v'.", related, err)
logger.V(9).Info("Could not construct reference", "object", related, "err", err)
}
}
if !util.ValidateEventType(eventtype) {
klog.Errorf("Unsupported event type: '%v'", eventtype)
logger.Error(nil, "Unsupported event type", "eventType", eventtype)
return
}
event := recorder.makeEvent(refRegarding, refRelated, timestamp, eventtype, reason, message, recorder.reportingController, recorder.reportingInstance, action)
Expand Down
Loading

0 comments on commit 27a68ae

Please sign in to comment.