Skip to content

Commit 4879c5c

Browse files
committed
remove of NamedEventSource
Signed-off-by: Attila Mészáros <csviri@gmail.com>
1 parent 614117d commit 4879c5c

File tree

7 files changed

+132
-263
lines changed

7 files changed

+132
-263
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import java.util.stream.Collectors;
55

66
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
7+
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
78

89
@SuppressWarnings("rawtypes")
910
public class ControllerHealthInfo {
@@ -15,21 +16,21 @@ public ControllerHealthInfo(EventSourceManager eventSourceManager) {
1516
}
1617

1718
public Map<String, EventSourceHealthIndicator> eventSourceHealthIndicators() {
18-
return eventSourceManager.allEventSources().entrySet().stream()
19-
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
19+
return eventSourceManager.allEventSources().stream()
20+
.collect(Collectors.toMap(EventSource::name, e -> e));
2021
}
2122

2223
public Map<String, EventSourceHealthIndicator> unhealthyEventSources() {
23-
return eventSourceManager.allEventSources().entrySet().stream()
24-
.filter(e -> e.getValue().getStatus() == Status.UNHEALTHY)
25-
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
24+
return eventSourceManager.allEventSources().stream()
25+
.filter(e -> e.getStatus() == Status.UNHEALTHY)
26+
.collect(Collectors.toMap(EventSource::name, e -> e));
2627
}
2728

2829
public Map<String, InformerWrappingEventSourceHealthIndicator> informerEventSourceHealthIndicators() {
29-
return eventSourceManager.allEventSources().entrySet().stream()
30-
.filter(e -> e.getValue() instanceof InformerWrappingEventSourceHealthIndicator)
31-
.collect(Collectors.toMap(Map.Entry::getKey,
32-
e -> (InformerWrappingEventSourceHealthIndicator) e.getValue()));
30+
return eventSourceManager.allEventSources().stream()
31+
.filter(e -> e instanceof InformerWrappingEventSourceHealthIndicator)
32+
.collect(Collectors.toMap(EventSource::name,
33+
e -> (InformerWrappingEventSourceHealthIndicator) e));
3334

3435
}
3536

@@ -40,11 +41,11 @@ public Map<String, InformerWrappingEventSourceHealthIndicator> informerEventSour
4041
* {@link io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}.
4142
*/
4243
public Map<String, InformerWrappingEventSourceHealthIndicator> unhealthyInformerEventSourceHealthIndicators() {
43-
return eventSourceManager.allEventSources().entrySet().stream()
44-
.filter(e -> e.getValue().getStatus() == Status.UNHEALTHY)
45-
.filter(e -> e.getValue() instanceof InformerWrappingEventSourceHealthIndicator)
46-
.collect(Collectors.toMap(Map.Entry::getKey,
47-
e -> (InformerWrappingEventSourceHealthIndicator) e.getValue()));
44+
return eventSourceManager.allEventSources().stream()
45+
.filter(e -> e.getStatus() == Status.UNHEALTHY)
46+
.filter(e -> e instanceof InformerWrappingEventSourceHealthIndicator)
47+
.collect(Collectors.toMap(EventSource::name,
48+
e -> (InformerWrappingEventSourceHealthIndicator) e));
4849
}
4950

5051
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 21 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,11 @@
22

33
import java.util.LinkedHashSet;
44
import java.util.List;
5-
import java.util.Map;
65
import java.util.Objects;
76
import java.util.Optional;
87
import java.util.Set;
98
import java.util.function.Function;
109
import java.util.stream.Collectors;
11-
import java.util.stream.Stream;
1210

1311
import org.slf4j.Logger;
1412
import org.slf4j.LoggerFactory;
@@ -69,27 +67,23 @@ public void postProcessDefaultEventSourcesAfterProcessorInitializer() {
6967
*/
7068
@Override
7169
public synchronized void start() {
72-
startEventSource(eventSources.namedControllerResourceEventSource());
70+
startEventSource(eventSources.controllerResourceEventSource());
7371

7472
executorServiceManager.boundedExecuteAndWaitForAllToComplete(
75-
eventSources.additionalNamedEventSources()
73+
eventSources.additionalEventSources()
7674
.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER)),
7775
this::startEventSource,
7876
getThreadNamer("start"));
7977

8078
executorServiceManager.boundedExecuteAndWaitForAllToComplete(
81-
eventSources.additionalNamedEventSources()
79+
eventSources.additionalEventSources()
8280
.filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT)),
8381
this::startEventSource,
8482
getThreadNamer("start"));
8583
}
8684

87-
private static Function<NamedEventSource, String> getThreadNamer(String stage) {
88-
return es -> {
89-
final var name = es.name();
90-
return es.priority() + " " + stage + " -> "
91-
+ (es.isNameSet() ? name + " " + es.original().getClass() : es.original());
92-
};
85+
private static Function<EventSource, String> getThreadNamer(String stage) {
86+
return es -> es.priority() + " " + stage + " -> " + es.name();
9387
}
9488

9589
private static Function<NamespaceChangeable, String> getEventSourceThreadNamer(String stage) {
@@ -98,28 +92,26 @@ private static Function<NamespaceChangeable, String> getEventSourceThreadNamer(S
9892

9993
@Override
10094
public synchronized void stop() {
101-
stopEventSource(eventSources.namedControllerResourceEventSource());
95+
stopEventSource(eventSources.controllerResourceEventSource());
10296
executorServiceManager.boundedExecuteAndWaitForAllToComplete(
103-
eventSources.additionalNamedEventSources(),
97+
eventSources.additionalEventSources(),
10498
this::stopEventSource,
10599
getThreadNamer("stop"));
106100
}
107101

108102
@SuppressWarnings("rawtypes")
109-
private void logEventSourceEvent(NamedEventSource eventSource, String event) {
103+
private void logEventSourceEvent(EventSource eventSource, String event) {
110104
if (log.isDebugEnabled()) {
111-
if (eventSource.original() instanceof ResourceEventSource source) {
112-
log.debug("{} event source {} for {}", event,
113-
eventSource.isNameSet() ? eventSource.name() : eventSource,
105+
if (eventSource instanceof ResourceEventSource source) {
106+
log.debug("{} event source {} for {}", event, eventSource.name(),
114107
source.resourceType());
115108
} else {
116-
log.debug("{} event source {}", event,
117-
eventSource.isNameSet() ? eventSource.name() : eventSource);
109+
log.debug("{} event source {}", event, eventSource.name());
118110
}
119111
}
120112
}
121113

122-
private Void startEventSource(NamedEventSource eventSource) {
114+
private Void startEventSource(EventSource eventSource) {
123115
try {
124116
logEventSourceEvent(eventSource, "Starting");
125117
eventSource.start();
@@ -132,7 +124,7 @@ private Void startEventSource(NamedEventSource eventSource) {
132124
return null;
133125
}
134126

135-
private Void stopEventSource(NamedEventSource eventSource) {
127+
private Void stopEventSource(EventSource eventSource) {
136128
try {
137129
logEventSourceEvent(eventSource, "Stopping");
138130
eventSource.stop();
@@ -149,13 +141,11 @@ public final synchronized void registerEventSource(EventSource eventSource)
149141
Objects.requireNonNull(eventSource, "EventSource must not be null");
150142
try {
151143
if (eventSource instanceof ManagedInformerEventSource managedInformerEventSource) {
152-
153144
managedInformerEventSource.setConfigurationService(
154145
controller.getConfiguration().getConfigurationService());
155146
}
156-
final var named = new NamedEventSource(eventSource, eventSource.name());
157-
eventSources.add(named);
158-
named.setEventHandler(controller.getEventProcessor());
147+
eventSources.add(eventSource);
148+
eventSource.setEventHandler(controller.getEventProcessor());
159149
} catch (IllegalStateException | MissingCRDException e) {
160150
throw e; // leave untouched
161151
} catch (Exception e) {
@@ -166,8 +156,7 @@ public final synchronized void registerEventSource(EventSource eventSource)
166156

167157
@SuppressWarnings("unchecked")
168158
public void broadcastOnResourceEvent(ResourceAction action, P resource, P oldResource) {
169-
eventSources.additionalNamedEventSources()
170-
.map(NamedEventSource::original)
159+
eventSources.additionalEventSources()
171160
.forEach(source -> {
172161
if (source instanceof ResourceEventAware) {
173162
var lifecycleAwareES = ((ResourceEventAware<P>) source);
@@ -202,18 +191,12 @@ public void changeNamespaces(Set<String> namespaces) {
202191

203192
public Set<EventSource> getRegisteredEventSources() {
204193
return eventSources.flatMappedSources()
205-
.map(NamedEventSource::original)
206-
.collect(Collectors.toCollection(LinkedHashSet::new));
207-
}
208194

209-
public Map<String, EventSource> allEventSources() {
210-
return eventSources.allNamedEventSources().collect(Collectors.toMap(NamedEventSource::name,
211-
NamedEventSource::original));
195+
.collect(Collectors.toCollection(LinkedHashSet::new));
212196
}
213197

214-
@SuppressWarnings("unused")
215-
public Stream<? extends EventSourceMetadata> getNamedEventSourcesStream() {
216-
return eventSources.flatMappedSources();
198+
public List<EventSource> allEventSources() {
199+
return eventSources.allEventSources().toList();
217200
}
218201

219202
public ControllerResourceEventSource<P> getControllerResourceEventSource() {
@@ -227,9 +210,9 @@ public <R> List<ResourceEventSource<R, P>> getResourceEventSourcesFor(Class<R> d
227210
@Override
228211
public EventSource dynamicallyRegisterEventSource(EventSource eventSource) {
229212
synchronized (this) {
230-
var actual = eventSources.existing(eventSource.name(), eventSource);
213+
var actual = eventSources.existing(eventSource);
231214
if (actual != null) {
232-
eventSource = actual.eventSource();
215+
eventSource = actual;
233216
} else {
234217
registerEventSource(eventSource);
235218
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java

Lines changed: 17 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,10 @@
1919

2020
class EventSources<R extends HasMetadata> {
2121

22-
public static final String CONTROLLER_RESOURCE_EVENT_SOURCE_NAME =
23-
"ControllerResourceEventSource";
2422
public static final String RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME =
2523
"RetryAndRescheduleTimerEventSource";
2624

27-
private final ConcurrentNavigableMap<String, Map<String, NamedEventSource>> sources =
25+
private final ConcurrentNavigableMap<String, Map<String, EventSource>> sources =
2826
new ConcurrentSkipListMap<>();
2927
private final TimerEventSource<R> retryAndRescheduleTimerEventSource = new TimerEventSource<>();
3028
private ControllerResourceEventSource<R> controllerResourceEventSource;
@@ -42,58 +40,44 @@ TimerEventSource<R> retryEventSource() {
4240
return retryAndRescheduleTimerEventSource;
4341
}
4442

45-
public Stream<NamedEventSource> additionalNamedEventSources() {
46-
return Stream.concat(Stream.of(
47-
new NamedEventSource(retryAndRescheduleTimerEventSource,
48-
RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME)),
49-
flatMappedSources());
50-
}
51-
52-
public Stream<NamedEventSource> allNamedEventSources() {
53-
return Stream.concat(Stream.of(namedControllerResourceEventSource(),
54-
new NamedEventSource(retryAndRescheduleTimerEventSource,
55-
RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME)),
43+
public Stream<EventSource> allEventSources() {
44+
return Stream.concat(
45+
Stream.of(controllerResourceEventSource(), retryAndRescheduleTimerEventSource),
5646
flatMappedSources());
5747
}
5848

5949
Stream<EventSource> additionalEventSources() {
6050
return Stream.concat(
6151
Stream.of(retryEventSource()).filter(Objects::nonNull),
62-
flatMappedSources().map(NamedEventSource::original));
63-
}
64-
65-
NamedEventSource namedControllerResourceEventSource() {
66-
return new NamedEventSource(controllerResourceEventSource,
67-
CONTROLLER_RESOURCE_EVENT_SOURCE_NAME);
52+
flatMappedSources());
6853
}
6954

70-
Stream<NamedEventSource> flatMappedSources() {
55+
Stream<EventSource> flatMappedSources() {
7156
return sources.values().stream().flatMap(c -> c.values().stream());
7257
}
7358

7459
public void clear() {
7560
sources.clear();
7661
}
7762

78-
public NamedEventSource existing(String name, EventSource source) {
63+
public EventSource existing(EventSource source) {
7964
final var eventSources = sources.get(keyFor(source));
8065
if (eventSources == null || eventSources.isEmpty()) {
8166
return null;
8267
}
83-
return eventSources.get(name);
68+
return eventSources.get(source.name());
8469
}
8570

86-
public void add(NamedEventSource eventSource) {
71+
public void add(EventSource eventSource) {
8772
final var name = eventSource.name();
88-
final var original = eventSource.original();
89-
final var existing = existing(name, original);
73+
final var existing = existing(eventSource);
9074
if (existing != null && !eventSource.scopeEquals(existing)) {
91-
throw new IllegalArgumentException("Event source " + existing.original()
75+
throw new IllegalArgumentException("Event source " + existing
9276
+ " is already registered for the "
93-
+ keyAsString(getResourceType(original), name)
77+
+ keyAsString(getResourceType(eventSource), name)
9478
+ " class/name combination");
9579
}
96-
sources.computeIfAbsent(keyFor(original), k -> new HashMap<>()).put(name, eventSource);
80+
sources.computeIfAbsent(keyFor(existing), k -> new HashMap<>()).put(name, eventSource);
9781
}
9882

9983
@SuppressWarnings("rawtypes")
@@ -104,10 +88,6 @@ private Class<?> getResourceType(EventSource source) {
10488
}
10589

10690
private String keyFor(EventSource source) {
107-
if (source instanceof NamedEventSource) {
108-
source = ((NamedEventSource) source).original();
109-
}
110-
11191
return keyFor(getResourceType(source));
11292
}
11393

@@ -128,7 +108,7 @@ public <S> ResourceEventSource<S, R> get(Class<S> dependentType, String name) {
128108
}
129109

130110
final var size = sourcesForType.size();
131-
NamedEventSource source;
111+
EventSource source;
132112
if (size == 1 && name == null) {
133113
source = sourcesForType.values().stream().findFirst().orElseThrow();
134114
} else {
@@ -146,16 +126,15 @@ public <S> ResourceEventSource<S, R> get(Class<S> dependentType, String name) {
146126
}
147127
}
148128

149-
EventSource original = source.original();
150-
if (!(original instanceof ResourceEventSource)) {
129+
if (!(source instanceof ResourceEventSource)) {
151130
throw new IllegalArgumentException(source + " associated with "
152131
+ keyAsString(dependentType, name) + " is not a "
153132
+ ResourceEventSource.class.getSimpleName());
154133
}
155-
final var res = (ResourceEventSource<S, R>) original;
134+
final var res = (ResourceEventSource<S, R>) source;
156135
final var resourceClass = res.resourceType();
157136
if (!resourceClass.isAssignableFrom(dependentType)) {
158-
throw new IllegalArgumentException(original + " associated with "
137+
throw new IllegalArgumentException(source + " associated with "
159138
+ keyAsString(dependentType, name)
160139
+ " is handling " + resourceClass.getName() + " resources but asked for "
161140
+ dependentType.getName());
@@ -178,7 +157,6 @@ public <S> List<ResourceEventSource<S, R>> getEventSources(Class<S> dependentTyp
178157
}
179158

180159
return sourcesForType.values().stream()
181-
.map(NamedEventSource::original)
182160
.filter(ResourceEventSource.class::isInstance)
183161
.map(es -> (ResourceEventSource<S, R>) es)
184162
.collect(Collectors.toList());

0 commit comments

Comments
 (0)