Skip to content

Commit 134dd05

Browse files
committed
event sources improvements, simplified APIs
1 parent 1288657 commit 134dd05

15 files changed

+95
-222
lines changed

operator-framework/src/main/java/io/javaoperatorsdk/operator/Operator.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import io.javaoperatorsdk.operator.api.ResourceController;
1313
import io.javaoperatorsdk.operator.processing.EventDispatcher;
1414
import io.javaoperatorsdk.operator.processing.DefaultEventHandler;
15-
import io.javaoperatorsdk.operator.processing.ResourceCache;
15+
import io.javaoperatorsdk.operator.processing.CustomResourceCache;
1616
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
1717
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource;
1818
import org.slf4j.Logger;
@@ -57,36 +57,35 @@ private <R extends CustomResource> void registerController(ResourceController<R>
5757
finalizer, new EventDispatcher.CustomResourceFacade(client), ControllerUtils.getGenerationEventProcessing(controller));
5858

5959

60-
ResourceCache resourceCache = new ResourceCache();
61-
DefaultEventHandler defaultEventHandler = new DefaultEventHandler(resourceCache, eventDispatcher, controller.getClass().getName());
60+
CustomResourceCache customResourceCache = new CustomResourceCache();
61+
DefaultEventHandler defaultEventHandler = new DefaultEventHandler(customResourceCache, eventDispatcher, controller.getClass().getName());
6262
DefaultEventSourceManager eventSourceManager = new DefaultEventSourceManager(defaultEventHandler);
6363
defaultEventHandler.setDefaultEventSourceManager(eventSourceManager);
6464
eventDispatcher.setEventSourceManager(eventSourceManager);
6565

6666
customResourceClients.put(resClass, (CustomResourceOperationsImpl) client);
6767

6868
CustomResourceEventSource customResourceEventSource
69-
= createCustomResourceEventSource(client, resourceCache, watchAllNamespaces, targetNamespaces,
70-
defaultEventHandler, eventSourceManager);
69+
= createCustomResourceEventSource(client, customResourceCache, watchAllNamespaces, targetNamespaces,
70+
defaultEventHandler);
7171

7272
eventSourceManager.registerCustomResourceEventSource(customResourceEventSource);
73+
controller.init(eventSourceManager);
7374

7475
log.info("Registered Controller: '{}' for CRD: '{}' for namespaces: {}", controller.getClass().getSimpleName(),
7576
resClass, targetNamespaces.length == 0 ? "[all/client namespace]" : Arrays.toString(targetNamespaces));
7677
}
7778

7879
private CustomResourceEventSource createCustomResourceEventSource(MixedOperation client,
79-
ResourceCache resourceCache,
80+
CustomResourceCache customResourceCache,
8081
boolean watchAllNamespaces,
8182
String[] targetNamespaces,
82-
DefaultEventHandler defaultEventHandler,
83-
DefaultEventSourceManager eventSourceManager) {
83+
DefaultEventHandler defaultEventHandler) {
8484
CustomResourceEventSource customResourceEventSource = watchAllNamespaces ?
85-
CustomResourceEventSource.customResourceEventSourceForAllNamespaces(resourceCache, client) :
86-
CustomResourceEventSource.customResourceEventSourceForTargetNamespaces(resourceCache, client, targetNamespaces);
85+
CustomResourceEventSource.customResourceEventSourceForAllNamespaces(customResourceCache, client) :
86+
CustomResourceEventSource.customResourceEventSourceForTargetNamespaces(customResourceCache, client, targetNamespaces);
8787

8888
customResourceEventSource.setEventHandler(defaultEventHandler);
89-
customResourceEventSource.setEventSourceManager(eventSourceManager);
9089

9190
return customResourceEventSource;
9291
}

operator-framework/src/main/java/io/javaoperatorsdk/operator/api/ResourceController.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.javaoperatorsdk.operator.api;
22

33
import io.fabric8.kubernetes.client.CustomResource;
4+
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
45

56
public interface ResourceController<R extends CustomResource> {
67

@@ -27,4 +28,10 @@ public interface ResourceController<R extends CustomResource> {
2728
* <b>However we will always call an update if there is no finalizer on object and its not marked for deletion.</b>
2829
*/
2930
UpdateControl<R> createOrUpdateResource(R resource, Context<R> context);
31+
32+
/**
33+
* In init typically you might want to register event sources.
34+
* @param eventSourceManager
35+
*/
36+
default void init(EventSourceManager eventSourceManager) {}
3037
}

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/ResourceCache.java renamed to operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/CustomResourceCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import java.util.Optional;
77
import java.util.concurrent.ConcurrentHashMap;
88

9-
public class ResourceCache {
9+
public class CustomResourceCache {
1010

1111
private final Map<String, CustomResource> resources = new ConcurrentHashMap<>();
1212

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package io.javaoperatorsdk.operator.processing;
22

33

4+
import io.fabric8.kubernetes.client.CustomResource;
45
import io.javaoperatorsdk.operator.processing.event.*;
56
import org.slf4j.Logger;
67
import org.slf4j.LoggerFactory;
78

89
import java.time.LocalDateTime;
910
import java.util.HashSet;
11+
import java.util.Optional;
1012
import java.util.Set;
1113
import java.util.concurrent.ScheduledThreadPoolExecutor;
1214
import java.util.concurrent.ThreadFactory;
@@ -23,7 +25,7 @@ public class DefaultEventHandler implements EventHandler {
2325

2426
private final static Logger log = LoggerFactory.getLogger(DefaultEventHandler.class);
2527

26-
private final ResourceCache resourceCache;
28+
private final CustomResourceCache customResourceCache;
2729
private final EventBuffer eventBuffer;
2830
private final Set<String> underProcessing = new HashSet<>();
2931
private final ScheduledThreadPoolExecutor executor;
@@ -32,8 +34,8 @@ public class DefaultEventHandler implements EventHandler {
3234

3335
private final ReentrantLock lock = new ReentrantLock();
3436

35-
public DefaultEventHandler(ResourceCache resourceCache, EventDispatcher eventDispatcher, String relatedControllerName) {
36-
this.resourceCache = resourceCache;
37+
public DefaultEventHandler(CustomResourceCache customResourceCache, EventDispatcher eventDispatcher, String relatedControllerName) {
38+
this.customResourceCache = customResourceCache;
3739
this.eventDispatcher = eventDispatcher;
3840
eventBuffer = new EventBuffer();
3941
executor = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
@@ -63,16 +65,19 @@ public void handleEvent(Event<? extends EventSource> event) {
6365
private void executeBufferedEvents(String customResourceUid) {
6466
boolean newEventForResourceId = eventBuffer.containsEvents(customResourceUid);
6567
boolean controllerUnderExecution = isControllerUnderExecution(customResourceUid);
66-
if (!controllerUnderExecution && newEventForResourceId) {
68+
Optional<CustomResource> latestCustomResource = customResourceCache.getLatestResource(customResourceUid);
69+
70+
if (!controllerUnderExecution && newEventForResourceId && latestCustomResource.isPresent()) {
6771
setUnderExecutionProcessing(customResourceUid);
6872
ExecutionScope executionScope = new ExecutionScope(
6973
eventBuffer.getAndRemoveEventsForExecution(customResourceUid),
70-
resourceCache.getLatestResource(customResourceUid).get());
74+
latestCustomResource.get());
7175
log.debug("Executing events for custom resource. Scope: {}", executionScope);
7276
executor.execute(new ExecutionConsumer(executionScope, eventDispatcher, this));
7377
} else {
74-
log.debug("Skipping executing controller for resource id: {}. Events in queue: {}. Controller in execution: {}"
75-
, customResourceUid, newEventForResourceId, controllerUnderExecution);
78+
log.debug("Skipping executing controller for resource id: {}. Events in queue: {}." +
79+
" Controller in execution: {}. Latest CustomResource present: {}"
80+
, customResourceUid, newEventForResourceId, controllerUnderExecution, latestCustomResource.isPresent());
7681
}
7782
}
7883

@@ -96,7 +101,7 @@ void eventProcessingFinished(ExecutionScope executionScope, PostExecutionControl
96101
private void cleanupAfterDeletedEvent(String customResourceUid) {
97102
defaultEventSourceManager.cleanup(customResourceUid);
98103
eventBuffer.cleanup(customResourceUid);
99-
resourceCache.cleanup(customResourceUid);
104+
customResourceCache.cleanup(customResourceUid);
100105
}
101106

102107
private boolean isControllerUnderExecution(String customResourceUid) {

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/event/AbstractEventSource.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,28 +5,14 @@
55
public abstract class AbstractEventSource implements EventSource {
66

77
protected EventHandler eventHandler;
8-
protected EventSourceManager eventSourceManager;
98

109
@Override
1110
public void setEventHandler(EventHandler eventHandler) {
1211
this.eventHandler = eventHandler;
1312
}
1413

15-
@Override
16-
public void setEventSourceManager(EventSourceManager eventSourceManager) {
17-
this.eventSourceManager = eventSourceManager;
18-
}
19-
20-
@Override
21-
public void eventSourceRegisteredForResource(CustomResource customResource) {
22-
}
23-
2414
@Override
2515
public void eventSourceDeRegisteredForResource(String customResourceUid) {
2616
}
2717

28-
@Override
29-
public void controllerExecuted(ExecutionDescriptor executionDescriptor) {
30-
}
31-
3218
}

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java

Lines changed: 14 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3-
import io.fabric8.kubernetes.client.CustomResource;
43
import io.javaoperatorsdk.operator.processing.DefaultEventHandler;
5-
import io.javaoperatorsdk.operator.processing.KubernetesResourceUtils;
64
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource;
75
import org.slf4j.Logger;
86
import org.slf4j.LoggerFactory;
@@ -17,7 +15,7 @@ public class DefaultEventSourceManager implements EventSourceManager {
1715
private static final Logger log = LoggerFactory.getLogger(DefaultEventSourceManager.class);
1816

1917
private final ReentrantLock lock = new ReentrantLock();
20-
private Map<String, Map<String, EventSource>> eventSources = new ConcurrentHashMap<>();
18+
private Map<String, EventSource> eventSources = new ConcurrentHashMap<>();
2119
private CustomResourceEventSource customResourceEventSource;
2220
private DefaultEventHandler defaultEventHandler;
2321

@@ -31,52 +29,29 @@ public void registerCustomResourceEventSource(CustomResourceEventSource customRe
3129
}
3230

3331
@Override
34-
public <T extends EventSource> void registerEventSource(CustomResource customResource, String name, T eventSource) {
32+
public <T extends EventSource> void registerEventSource(String name, T eventSource) {
3533
try {
3634
lock.lock();
37-
Map<String, EventSource> eventSourceList = eventSources.get(KubernetesResourceUtils.getUID(customResource));
38-
if (eventSourceList == null) {
39-
eventSourceList = new HashMap<>(1);
40-
eventSources.put(KubernetesResourceUtils.getUID(customResource), eventSourceList);
35+
EventSource currentEventSource = eventSources.get(name);
36+
if (currentEventSource != null) {
37+
throw new IllegalStateException("Event source with name already registered. Event source name: " + name);
4138
}
42-
if (eventSourceList.get(name) != null) {
43-
throw new IllegalStateException("Event source with name already registered. Resource id: "
44-
+ KubernetesResourceUtils.getUID(customResource) + ", event source name: " + name);
45-
}
46-
eventSourceList.put(name, eventSource);
39+
eventSources.put(name,eventSource);
4740
eventSource.setEventHandler(defaultEventHandler);
48-
eventSource.eventSourceRegisteredForResource(customResource);
49-
} finally {
50-
lock.unlock();
51-
}
52-
}
53-
54-
@Override
55-
public <T extends EventSource> T registerEventSourceIfNotRegistered(CustomResource customResource, String name, Supplier<T> eventSourceSupplier) {
56-
try {
57-
lock.lock();
58-
if (eventSources.get(KubernetesResourceUtils.getUID(customResource)) == null ||
59-
eventSources.get(KubernetesResourceUtils.getUID(customResource)).get(name) == null) {
60-
EventSource eventSource = eventSourceSupplier.get();
61-
registerEventSource(customResource, name, eventSource);
62-
return (T) eventSource;
63-
}
64-
return (T) eventSources.get(KubernetesResourceUtils.getUID(customResource)).get(name);
6541
} finally {
6642
lock.unlock();
6743
}
6844
}
6945

7046
@Override
71-
public Optional<EventSource> deRegisterEventSource(String customResourceUid, String name) {
47+
public Optional<EventSource> deRegisterCustomResourceFromEventSource(String eventSourceName,String customResourceUid) {
7248
try {
7349
lock.lock();
74-
Map<String, EventSource> eventSources = this.eventSources.get(customResourceUid);
75-
if (eventSources == null || !eventSources.containsKey(name)) {
76-
log.warn("Event producer: {} not found for custom resource: {}", name, customResourceUid);
50+
EventSource eventSource = this.eventSources.get(eventSourceName);
51+
if (eventSource == null) {
52+
log.warn("Event producer: {} not found for custom resource: {}", eventSourceName, customResourceUid);
7753
return Optional.empty();
7854
} else {
79-
EventSource eventSource = eventSources.remove(name);
8055
eventSource.eventSourceDeRegisteredForResource(customResourceUid);
8156
return Optional.of(eventSource);
8257
}
@@ -86,19 +61,17 @@ public Optional<EventSource> deRegisterEventSource(String customResourceUid, Str
8661
}
8762

8863
@Override
89-
public Map<String, EventSource> getRegisteredEventSources(String customResourceUid) {
90-
Map<String, EventSource> eventSourceMap = eventSources.get(customResourceUid);
91-
return eventSourceMap != null ? eventSourceMap : Collections.EMPTY_MAP;
64+
public Map<String, EventSource> getRegisteredEventSources() {
65+
return Collections.unmodifiableMap(eventSources);
9266
}
9367

9468
public void controllerExecuted(ExecutionDescriptor executionDescriptor) {
9569
String uid = executionDescriptor.getExecutionScope().getCustomResourceUid();
96-
Map<String, EventSource> sources = getRegisteredEventSources(uid);
97-
sources.values().forEach(es -> es.controllerExecuted(executionDescriptor));
70+
Map<String, EventSource> sources = getRegisteredEventSources();
9871
}
9972

10073
public void cleanup(String customResourceUid) {
101-
getRegisteredEventSources(customResourceUid).keySet().forEach(k -> deRegisterEventSource(customResourceUid, k));
74+
getRegisteredEventSources().keySet().forEach(k -> deRegisterCustomResourceFromEventSource(k,customResourceUid));
10275
eventSources.remove(customResourceUid);
10376
}
10477

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,9 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3-
import io.fabric8.kubernetes.client.CustomResource;
4-
53
public interface EventSource {
64

75
void setEventHandler(EventHandler eventHandler);
86

9-
void setEventSourceManager(EventSourceManager eventSourceManager);
10-
11-
void eventSourceRegisteredForResource(CustomResource customResource);
12-
137
void eventSourceDeRegisteredForResource(String customResourceUid);
148

15-
void controllerExecuted(ExecutionDescriptor executionDescriptor);
169
}

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,10 @@
99

1010
public interface EventSourceManager {
1111

12-
<T extends EventSource> void registerEventSource(CustomResource customResource, String name, T eventSource);
12+
<T extends EventSource> void registerEventSource(String name, T eventSource);
1313

14-
<T extends EventSource> T registerEventSourceIfNotRegistered(CustomResource customResource, String name, Supplier<T> eventSource);
15-
16-
Optional<EventSource> deRegisterEventSource(String customResourceUid, String name);
17-
18-
Map<String, EventSource> getRegisteredEventSources(String customResourceUid);
14+
Optional<EventSource> deRegisterCustomResourceFromEventSource(String name, String customResourceUid);
1915

16+
Map<String, EventSource> getRegisteredEventSources();
2017

2118
}

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import io.fabric8.kubernetes.client.Watcher;
66
import io.fabric8.kubernetes.client.dsl.MixedOperation;
77
import io.fabric8.kubernetes.client.dsl.internal.CustomResourceOperationsImpl;
8-
import io.javaoperatorsdk.operator.processing.ResourceCache;
8+
import io.javaoperatorsdk.operator.processing.CustomResourceCache;
99
import io.javaoperatorsdk.operator.processing.event.AbstractEventSource;
1010
import org.slf4j.Logger;
1111
import org.slf4j.LoggerFactory;
@@ -20,23 +20,23 @@ public class CustomResourceEventSource extends AbstractEventSource implements Wa
2020

2121
private final static Logger log = LoggerFactory.getLogger(CustomResourceEventSource.class);
2222

23-
private final ResourceCache resourceCache;
23+
private final CustomResourceCache resourceCache;
2424
private MixedOperation client;
2525
private final String[] targetNamespaces;
2626

27-
public static CustomResourceEventSource customResourceEventSourceForAllNamespaces(ResourceCache resourceCache,
27+
public static CustomResourceEventSource customResourceEventSourceForAllNamespaces(CustomResourceCache customResourceCache,
2828
MixedOperation client) {
29-
return new CustomResourceEventSource(resourceCache, client, null);
29+
return new CustomResourceEventSource(customResourceCache, client, null);
3030
}
3131

32-
public static CustomResourceEventSource customResourceEventSourceForTargetNamespaces(ResourceCache resourceCache,
32+
public static CustomResourceEventSource customResourceEventSourceForTargetNamespaces(CustomResourceCache customResourceCache,
3333
MixedOperation client,
3434
String[] namespaces) {
35-
return new CustomResourceEventSource(resourceCache, client, namespaces);
35+
return new CustomResourceEventSource(customResourceCache, client, namespaces);
3636
}
3737

38-
private CustomResourceEventSource(ResourceCache resourceCache, MixedOperation client, String[] targetNamespaces) {
39-
this.resourceCache = resourceCache;
38+
private CustomResourceEventSource(CustomResourceCache customResourceCache, MixedOperation client, String[] targetNamespaces) {
39+
this.resourceCache = customResourceCache;
4040
this.client = client;
4141
this.targetNamespaces = targetNamespaces;
4242
}

0 commit comments

Comments
 (0)