Skip to content

Commit dcdd111

Browse files
committed
Event filtering now records resource action and previous resource (#3127)
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent ac347cc commit dcdd111

File tree

17 files changed

+546
-141
lines changed

17 files changed

+546
-141
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
3838
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState;
3939
import io.javaoperatorsdk.operator.processing.event.source.Cache;
40-
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
40+
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
4141
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent;
4242
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
4343
import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource;

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@
3737
import io.javaoperatorsdk.operator.processing.LifecycleAware;
3838
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
3939
import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority;
40+
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
4041
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventAware;
4142
import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerEventSource;
42-
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
4343
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
4444
import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource;
4545

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceAction.java renamed to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.javaoperatorsdk.operator.processing.event.source.controller;
16+
package io.javaoperatorsdk.operator.processing.event.source;
1717

1818
public enum ResourceAction {
1919
ADDED,

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.javaoperatorsdk.operator.processing.Controller;
2929
import io.javaoperatorsdk.operator.processing.MDCUtils;
3030
import io.javaoperatorsdk.operator.processing.event.ResourceID;
31+
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
3132
import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter;
3233
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
3334
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
@@ -83,26 +84,21 @@ public synchronized void start() {
8384
}
8485

8586
@Override
86-
public synchronized void handleEvent(
87-
ResourceAction action,
88-
T resource,
89-
T oldResource,
90-
Boolean deletedFinalStateUnknown,
91-
boolean filterEvent) {
87+
protected synchronized void handleEvent(
88+
ResourceAction action, T resource, T oldResource, Boolean deletedFinalStateUnknown) {
9289
try {
9390
if (log.isDebugEnabled()) {
9491
log.debug(
95-
"Event received for resource: {} version: {} uuid: {} action: {} filter event: {}",
92+
"Event received for resource: {} version: {} uuid: {} action: {}",
9693
ResourceID.fromResource(resource),
9794
getVersion(resource),
9895
resource.getMetadata().getUid(),
99-
action,
100-
filterEvent);
96+
action);
10197
log.trace("Event Old resource: {},\n new resource: {}", oldResource, resource);
10298
}
10399
MDCUtils.addResourceInfo(resource);
104100
controller.getEventSourceManager().broadcastOnResourceEvent(action, resource, oldResource);
105-
if (isAcceptedByFilters(action, resource, oldResource) && !filterEvent) {
101+
if (isAcceptedByFilters(action, resource, oldResource)) {
106102
if (deletedFinalStateUnknown != null) {
107103
getEventHandler()
108104
.handleEvent(
@@ -138,28 +134,36 @@ private boolean isAcceptedByFilters(ResourceAction action, T resource, T oldReso
138134
}
139135

140136
@Override
141-
public void onAdd(T resource) {
142-
var handling = temporaryResourceCache.onAddOrUpdateEvent(resource);
143-
handleEvent(ResourceAction.ADDED, resource, null, null, handling != EventHandling.NEW);
137+
public synchronized void onAdd(T resource) {
138+
handleOnAddOrUpdate(ResourceAction.ADDED, null, resource);
144139
}
145140

146141
@Override
147-
public void onUpdate(T oldCustomResource, T newCustomResource) {
148-
var handling = temporaryResourceCache.onAddOrUpdateEvent(newCustomResource);
149-
handleEvent(
150-
ResourceAction.UPDATED,
151-
newCustomResource,
152-
oldCustomResource,
153-
null,
154-
handling != EventHandling.NEW);
142+
public synchronized void onUpdate(T oldCustomResource, T newCustomResource) {
143+
handleOnAddOrUpdate(ResourceAction.UPDATED, oldCustomResource, newCustomResource);
144+
}
145+
146+
private void handleOnAddOrUpdate(
147+
ResourceAction action, T oldCustomResource, T newCustomResource) {
148+
var handling =
149+
temporaryResourceCache.onAddOrUpdateEvent(action, newCustomResource, oldCustomResource);
150+
if (handling == EventHandling.NEW) {
151+
handleEvent(action, newCustomResource, oldCustomResource, null);
152+
} else if (log.isDebugEnabled()) {
153+
log.debug(
154+
"{} event propagation for action: {} resource id: {} ",
155+
handling,
156+
action,
157+
ResourceID.fromResource(newCustomResource));
158+
}
155159
}
156160

157161
@Override
158-
public void onDelete(T resource, boolean deletedFinalStateUnknown) {
162+
public synchronized void onDelete(T resource, boolean deletedFinalStateUnknown) {
159163
temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
160164
// delete event is quite special here, that requires special care, since we clean up caches on
161165
// delete event.
162-
handleEvent(ResourceAction.DELETED, resource, null, deletedFinalStateUnknown, false);
166+
handleEvent(ResourceAction.DELETED, resource, null, deletedFinalStateUnknown);
163167
}
164168

165169
@Override

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceDeleteEvent.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import io.fabric8.kubernetes.api.model.HasMetadata;
2121
import io.javaoperatorsdk.operator.processing.event.ResourceID;
22+
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
2223

2324
/**
2425
* Extends ResourceEvent for informer Delete events, it holds also information if the final state is

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEvent.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.fabric8.kubernetes.api.model.HasMetadata;
2222
import io.javaoperatorsdk.operator.processing.event.Event;
2323
import io.javaoperatorsdk.operator.processing.event.ResourceID;
24+
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
2425

2526
public class ResourceEvent extends Event {
2627

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright Java Operator SDK Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.javaoperatorsdk.operator.processing.event.source.informer;
17+
18+
import java.util.Optional;
19+
20+
import io.fabric8.kubernetes.api.model.HasMetadata;
21+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
22+
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
23+
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
24+
25+
/** Used only for resource event filtering. */
26+
public class ExtendedResourceEvent extends ResourceEvent {
27+
28+
private HasMetadata previousResource;
29+
30+
public ExtendedResourceEvent(
31+
ResourceAction action,
32+
ResourceID resourceID,
33+
HasMetadata latestResource,
34+
HasMetadata previousResource) {
35+
super(action, resourceID, latestResource);
36+
this.previousResource = previousResource;
37+
}
38+
39+
public Optional<HasMetadata> getPreviousResource() {
40+
return Optional.ofNullable(previousResource);
41+
}
42+
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import io.javaoperatorsdk.operator.processing.event.EventHandler;
3333
import io.javaoperatorsdk.operator.processing.event.ResourceID;
3434
import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;
35-
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
35+
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
3636
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;
3737

3838
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION;
@@ -107,7 +107,7 @@ public void onAdd(R newResource) {
107107
resourceType().getSimpleName(),
108108
newResource.getMetadata().getResourceVersion());
109109
}
110-
onAddOrUpdate(Operation.ADD, newResource, null);
110+
onAddOrUpdate(ResourceAction.ADDED, newResource, null);
111111
}
112112

113113
@Override
@@ -120,7 +120,7 @@ public void onUpdate(R oldObject, R newObject) {
120120
newObject.getMetadata().getResourceVersion(),
121121
oldObject.getMetadata().getResourceVersion());
122122
}
123-
onAddOrUpdate(Operation.UPDATE, newObject, oldObject);
123+
onAddOrUpdate(ResourceAction.UPDATED, newObject, oldObject);
124124
}
125125

126126
@Override
@@ -139,12 +139,8 @@ public synchronized void onDelete(R resource, boolean b) {
139139
}
140140

141141
@Override
142-
public void handleEvent(
143-
ResourceAction action,
144-
R resource,
145-
R oldResource,
146-
Boolean deletedFinalStateUnknown,
147-
boolean filterEvent) {
142+
protected void handleEvent(
143+
ResourceAction action, R resource, R oldResource, Boolean deletedFinalStateUnknown) {
148144
propagateEvent(resource);
149145
}
150146

@@ -156,27 +152,27 @@ public synchronized void start() {
156152
manager().list().forEach(primaryToSecondaryIndex::onAddOrUpdate);
157153
}
158154

159-
private synchronized void onAddOrUpdate(Operation operation, R newObject, R oldObject) {
155+
private synchronized void onAddOrUpdate(ResourceAction action, R newObject, R oldObject) {
160156
primaryToSecondaryIndex.onAddOrUpdate(newObject);
161157
var resourceID = ResourceID.fromResource(newObject);
162158

163-
var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(newObject);
159+
var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(action, newObject, oldObject);
164160

165161
if (eventHandling != EventHandling.NEW) {
166162
log.debug(
167163
"{} event propagation for {}. Resource ID: {}",
168164
eventHandling == EventHandling.DEFER ? "Deferring" : "Skipping",
169-
operation,
165+
action,
170166
ResourceID.fromResource(newObject));
171-
} else if (eventAcceptedByFilter(operation, newObject, oldObject)) {
167+
} else if (eventAcceptedByFilter(action, newObject, oldObject)) {
172168
log.debug(
173169
"Propagating event for {}, resource with same version not result of a reconciliation."
174170
+ " Resource ID: {}",
175-
operation,
171+
action,
176172
resourceID);
177173
propagateEvent(newObject);
178174
} else {
179-
log.debug("Event filtered out for operation: {}, resourceID: {}", operation, resourceID);
175+
log.debug("Event filtered out for operation: {}, resourceID: {}", action, resourceID);
180176
}
181177
}
182178

@@ -251,11 +247,11 @@ public boolean allowsNamespaceChanges() {
251247
return configuration().followControllerNamespaceChanges();
252248
}
253249

254-
private boolean eventAcceptedByFilter(Operation operation, R newObject, R oldObject) {
250+
private boolean eventAcceptedByFilter(ResourceAction action, R newObject, R oldObject) {
255251
if (genericFilter != null && !genericFilter.accept(newObject)) {
256252
return false;
257253
}
258-
if (operation == Operation.ADD) {
254+
if (action == ResourceAction.ADDED) {
259255
return onAddFilter == null || onAddFilter.accept(newObject);
260256
} else {
261257
return onUpdateFilter == null || onUpdateFilter.accept(newObject, oldObject);
@@ -266,9 +262,4 @@ private boolean acceptedByDeleteFilters(R resource, boolean b) {
266262
return (onDeleteFilter == null || onDeleteFilter.accept(resource, b))
267263
&& (genericFilter == null || genericFilter.accept(resource));
268264
}
269-
270-
private enum Operation {
271-
ADD,
272-
UPDATE
273-
}
274265
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
import io.javaoperatorsdk.operator.health.Status;
4343
import io.javaoperatorsdk.operator.processing.event.ResourceID;
4444
import io.javaoperatorsdk.operator.processing.event.source.*;
45-
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
45+
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
4646
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent;
4747

4848
@SuppressWarnings("rawtypes")
@@ -90,6 +90,7 @@ public void changeNamespaces(Set<String> namespaces) {
9090
* Also makes sure that the even produced by this update is filtered, thus does not trigger the
9191
* reconciliation.
9292
*/
93+
@SuppressWarnings("unchecked")
9394
public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator<R> updateMethod) {
9495
ResourceID id = ResourceID.fromResource(resourceToUpdate);
9596
if (log.isDebugEnabled()) {
@@ -107,32 +108,38 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator<
107108
id,
108109
updatedResource == null ? null : updatedResource.getMetadata().getResourceVersion());
109110
var updatedForLambda = updatedResource;
110-
res.ifPresent(
111+
res.ifPresentOrElse(
111112
r -> {
112113
R latestResource = (R) r.getResource().orElseThrow();
113-
// for update we need to have a historic resource, this might be improved to mimic more
114-
// realistic scenario
114+
115+
// as previous resource version we use the one from successful update, since
116+
// we process new event here only if that is more recent then the event from our update.
117+
// Note that this is equivalent with the scenario when an informer watch connection
118+
// would
119+
// reconnect and loose some events in between.
120+
// If that update was not successful we still record the previous version from the
121+
// actual
122+
// event in the ExtendedResourceEvent.
123+
R extendedResourcePrevVersion =
124+
(r instanceof ExtendedResourceEvent)
125+
? (R) ((ExtendedResourceEvent) r).getPreviousResource().orElse(null)
126+
: null;
115127
R prevVersionOfResource =
116-
updatedForLambda != null
117-
? updatedForLambda
118-
: (r.getAction() == ResourceAction.UPDATED ? latestResource : null);
128+
updatedForLambda != null ? updatedForLambda : extendedResourcePrevVersion;
119129
handleEvent(
120130
r.getAction(),
121131
latestResource,
122132
prevVersionOfResource,
123-
!(r instanceof ResourceDeleteEvent)
124-
|| ((ResourceDeleteEvent) r).isDeletedFinalStateUnknown(),
125-
false);
126-
});
133+
(r instanceof ResourceDeleteEvent)
134+
? ((ResourceDeleteEvent) r).isDeletedFinalStateUnknown()
135+
: null);
136+
},
137+
() -> log.debug("No new event present after the filtering update; id: {}", id));
127138
}
128139
}
129140

130-
public abstract void handleEvent(
131-
ResourceAction action,
132-
R resource,
133-
R oldResource,
134-
Boolean deletedFinalStateUnknown,
135-
boolean filterEvent);
141+
protected abstract void handleEvent(
142+
ResourceAction action, R resource, R oldResource, Boolean deletedFinalStateUnknown);
136143

137144
@SuppressWarnings("unchecked")
138145
@Override

0 commit comments

Comments
 (0)