Skip to content

Commit 6d15d38

Browse files
authored
List continuation and watch bookmark support (#1881)
* Allow watch bookmarks * Work in progress * Work in progress * Work in progress * Work in progress * AsyncRequestStep tests * Fix tests * Bookmark tests * Clarify names * Work in progress * Better support for namespace lists spanning REST calls * Revert changes to charts * Bug fixes * Disassociate CRD creation from namespace startup * Make unit-test more generic * Clarify continue pattern * Save continue for reinvoke of async request * Correct method name
1 parent 5c7c22e commit 6d15d38

28 files changed

+555
-190
lines changed

operator/src/main/java/oracle/kubernetes/operator/DomainProcessorImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -916,7 +916,8 @@ public NextAction onSuccess(Packet packet, CallResponse<V1PodList> callResponse)
916916
}
917917
}
918918
}
919-
return doNext(packet);
919+
920+
return doContinueListOrNext(callResponse, packet);
920921
}
921922
}
922923

@@ -972,7 +973,7 @@ public NextAction onSuccess(Packet packet, CallResponse<V1ServiceList> callRespo
972973
}
973974
}
974975

975-
return doNext(packet);
976+
return doContinueListOrNext(callResponse, packet);
976977
}
977978
}
978979

operator/src/main/java/oracle/kubernetes/operator/Main.java

Lines changed: 115 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import oracle.kubernetes.operator.logging.MessageKeys;
6363
import oracle.kubernetes.operator.rest.RestConfigImpl;
6464
import oracle.kubernetes.operator.rest.RestServer;
65+
import oracle.kubernetes.operator.steps.ActionResponseStep;
6566
import oracle.kubernetes.operator.steps.ConfigMapAfterStep;
6667
import oracle.kubernetes.operator.work.Component;
6768
import oracle.kubernetes.operator.work.Container;
@@ -207,13 +208,12 @@ private static void begin() {
207208

208209
Step strategy = Step.chain(
209210
new InitializeNamespacesSecurityStep(configuredDomainNamespaces),
210-
new NamespaceRulesReviewStep());
211+
new NamespaceRulesReviewStep(),
212+
CrdHelper.createDomainCrdStep(version, productVersion));
211213
if (!DomainNamespaceSelectionStrategy.Dedicated.equals(selectionStrategy)) {
212214
strategy = Step.chain(strategy, readExistingNamespaces(selectionStrategy, configuredDomainNamespaces, false));
213215
} else {
214-
strategy = Step.chain(strategy, CrdHelper.createDomainCrdStep(
215-
version, productVersion,
216-
new StartNamespacesStep(configuredDomainNamespaces, false)));
216+
strategy = Step.chain(strategy, new StartNamespacesStep(configuredDomainNamespaces, false));
217217
}
218218
runSteps(
219219
strategy,
@@ -322,14 +322,13 @@ static Step createDomainRecheckSteps(DateTime now) {
322322

323323
Step strategy = Step.chain(
324324
new InitializeNamespacesSecurityStep(configuredDomainNamespaces),
325-
new NamespaceRulesReviewStep());
325+
new NamespaceRulesReviewStep(),
326+
CrdHelper.createDomainCrdStep(version, productVersion));
326327
if (!DomainNamespaceSelectionStrategy.Dedicated.equals(selectionStrategy)) {
327328
strategy = Step.chain(strategy, readExistingNamespaces(
328329
selectionStrategy, configuredDomainNamespaces, isFullRecheck));
329330
} else {
330-
strategy = Step.chain(strategy, CrdHelper.createDomainCrdStep(
331-
version, productVersion,
332-
new StartNamespacesStep(configuredDomainNamespaces, isFullRecheck)));
331+
strategy = Step.chain(strategy, new StartNamespacesStep(configuredDomainNamespaces, isFullRecheck));
333332
}
334333
return strategy;
335334
}
@@ -369,14 +368,31 @@ private static Step readExistingPods(String ns) {
369368
}
370369

371370
static Step readExistingNamespaces(DomainNamespaceSelectionStrategy selectionStrategy,
372-
Collection<String> domainNamespaces,
373-
boolean isFullRecheck) {
371+
Collection<String> domainNamespaces,
372+
boolean isFullRecheck) {
374373
CallBuilder builder = new CallBuilder();
375374
String selector = selectionStrategy.getLabelSelector();
376375
if (selector != null) {
377376
builder.withLabelSelectors(selector);
378377
}
379-
return builder.listNamespaceAsync(new NamespaceListStep(selectionStrategy, domainNamespaces, isFullRecheck));
378+
return builder.listNamespaceAsync(
379+
new ActionResponseStep<V1NamespaceList>(new NamespaceListAfterStep(selectionStrategy)) {
380+
private Step startNamespaces(Collection<String> namespacesToStart, boolean isFullRecheck) {
381+
return new StartNamespacesStep(namespacesToStart, isFullRecheck);
382+
}
383+
384+
@Override
385+
protected NextAction onFailureNoRetry(Packet packet, CallResponse<V1NamespaceList> callResponse) {
386+
return !selectionStrategy.isRequireList() && isNotAuthorizedOrForbidden(callResponse)
387+
? doNext(startNamespaces(domainNamespaces, isFullRecheck), packet) :
388+
super.onFailureNoRetry(packet, callResponse);
389+
}
390+
391+
@Override
392+
public Step createSuccessStep(V1NamespaceList result, Step next) {
393+
return new NamespaceListStep(result, selectionStrategy, domainNamespaces, isFullRecheck, next);
394+
}
395+
});
380396
}
381397

382398
private static ConfigMapAfterStep createConfigMapStep(String ns) {
@@ -682,7 +698,7 @@ private abstract static class ForEachNamespaceStep extends Step {
682698

683699
@Override
684700
protected String getDetail() {
685-
return String.join(",", domainNamespaces);
701+
return Optional.ofNullable(domainNamespaces).map(d -> String.join(",", d)).orElse(null);
686702
}
687703

688704
protected abstract Step action(String ns);
@@ -873,7 +889,8 @@ public NextAction onSuccess(Packet packet, CallResponse<DomainList> callResponse
873889
domainWatchers.put(
874890
ns, createDomainWatcher(ns, getResourceVersion(callResponse.getResult())));
875891
}
876-
return doNext(packet);
892+
893+
return doContinueListOrNext(callResponse, packet);
877894
}
878895

879896
String getResourceVersion(DomainList result) {
@@ -916,7 +933,8 @@ public NextAction onSuccess(Packet packet, CallResponse<V1ServiceList> callRespo
916933
if (!serviceWatchers.containsKey(ns)) {
917934
serviceWatchers.put(ns, createServiceWatcher(ns, getInitialResourceVersion(result)));
918935
}
919-
return doNext(packet);
936+
937+
return doContinueListOrNext(callResponse, packet);
920938
}
921939

922940
private String getInitialResourceVersion(V1ServiceList result) {
@@ -946,7 +964,8 @@ public NextAction onSuccess(Packet packet, CallResponse<V1EventList> callRespons
946964
if (!eventWatchers.containsKey(ns)) {
947965
eventWatchers.put(ns, createEventWatcher(ns, getInitialResourceVersion(result)));
948966
}
949-
return doNext(packet);
967+
968+
return doContinueListOrNext(callResponse, packet);
950969
}
951970

952971
private String getInitialResourceVersion(V1EventList result) {
@@ -990,103 +1009,86 @@ public NextAction onSuccess(Packet packet, CallResponse<V1PodList> callResponse)
9901009
if (!podWatchers.containsKey(ns)) {
9911010
podWatchers.put(ns, createPodWatcher(ns, getInitialResourceVersion(result)));
9921011
}
993-
return doNext(packet);
1012+
1013+
return doContinueListOrNext(callResponse, packet);
9941014
}
9951015

9961016
private String getInitialResourceVersion(V1PodList result) {
9971017
return result != null ? result.getMetadata().getResourceVersion() : "";
9981018
}
9991019
}
10001020

1001-
private static class NamespaceListStep extends ResponseStep<V1NamespaceList> {
1021+
private static final String ALL_DOMAIN_NAMESPACES = "ALL_DOMAIN_NAMESPACES";
1022+
1023+
private static class NamespaceListStep extends Step {
1024+
private final V1NamespaceList list;
10021025
private final DomainNamespaceSelectionStrategy selectionStrategy;
10031026
private final Collection<String> configuredDomainNamespaces;
10041027
private final boolean isFullRecheck;
10051028

1006-
NamespaceListStep(DomainNamespaceSelectionStrategy selectionStrategy,
1007-
Collection<String> configuredDomainNamespaces,
1008-
boolean isFullRecheck) {
1029+
NamespaceListStep(V1NamespaceList list,
1030+
DomainNamespaceSelectionStrategy selectionStrategy,
1031+
Collection<String> configuredDomainNamespaces,
1032+
boolean isFullRecheck,
1033+
Step next) {
1034+
super(next);
1035+
this.list = list;
10091036
this.selectionStrategy = selectionStrategy;
10101037
this.configuredDomainNamespaces = configuredDomainNamespaces;
10111038
this.isFullRecheck = isFullRecheck;
10121039
}
10131040

10141041
@Override
1015-
public NextAction onFailure(Packet packet, CallResponse<V1NamespaceList> callResponse) {
1016-
return callResponse.getStatusCode() == CallBuilder.NOT_FOUND
1017-
? onSuccess(packet, callResponse)
1018-
: super.onFailure(packet, callResponse);
1019-
}
1020-
1021-
@Override
1022-
protected NextAction onFailureNoRetry(Packet packet, CallResponse<V1NamespaceList> callResponse) {
1023-
return !selectionStrategy.isRequireList() && isNotAuthorizedOrForbidden(callResponse)
1024-
? doNext(createDomainCrdAndStartNamespaces(configuredDomainNamespaces, isFullRecheck), packet) :
1025-
super.onFailureNoRetry(packet, callResponse);
1026-
}
1027-
1028-
@Override
1029-
public NextAction onSuccess(Packet packet, CallResponse<V1NamespaceList> callResponse) {
1030-
V1NamespaceList result = callResponse.getResult();
1042+
public NextAction apply(Packet packet) {
10311043
// don't bother processing pre-existing events
1032-
String intialResourceVersion = getInitialResourceVersion(result);
1033-
List<String> nsList = getExistingNamespaces(result);
1044+
String intialResourceVersion = getInitialResourceVersion(list);
1045+
List<String> nsPossiblyPartialList = getExistingNamespaces(list);
10341046

1035-
Set<String> namespacesToStart;
1047+
Set<String> namespacesToStartNow;
10361048
if (selectionStrategy.isRequireList()) {
1037-
namespacesToStart = new TreeSet<>(nsList);
1049+
namespacesToStartNow = new TreeSet<>(nsPossiblyPartialList);
10381050
String regexp = selectionStrategy.getRegExp();
10391051
if (regexp != null) {
10401052
try {
1041-
namespacesToStart = namespacesToStart.stream().filter(
1053+
namespacesToStartNow = namespacesToStartNow.stream().filter(
10421054
Pattern.compile(regexp).asPredicate()).collect(Collectors.toSet());
10431055
} catch (PatternSyntaxException pse) {
10441056
LOGGER.severe(MessageKeys.EXCEPTION, pse);
10451057
}
10461058
}
10471059
} else {
1048-
namespacesToStart = new TreeSet<>(configuredDomainNamespaces);
1049-
for (String ns : configuredDomainNamespaces) {
1050-
if (!nsList.contains(ns)) {
1051-
try (LoggingContext stack = LoggingContext.setThreadContext().namespace(ns)) {
1052-
LOGGER.warning(MessageKeys.NAMESPACE_IS_MISSING, ns);
1053-
}
1054-
namespacesToStart.remove(ns);
1055-
}
1056-
}
1060+
namespacesToStartNow = new TreeSet<>(configuredDomainNamespaces);
1061+
namespacesToStartNow.retainAll(nsPossiblyPartialList);
10571062
}
1063+
10581064
Step strategy = null;
1059-
if (!namespacesToStart.isEmpty()) {
1060-
strategy = Step.chain(createDomainCrdAndStartNamespaces(namespacesToStart, isFullRecheck),
1061-
new CreateNamespaceWatcherStep(selectionStrategy, intialResourceVersion));
1065+
if (!namespacesToStartNow.isEmpty()) {
1066+
strategy = Step.chain(
1067+
startNamespaces(namespacesToStartNow, isFullRecheck),
1068+
new CreateNamespaceWatcherStep(selectionStrategy, intialResourceVersion),
1069+
getNext());
10621070

10631071
if (configuredDomainNamespaces == null) {
1064-
strategy = new InitializeNamespacesSecurityStep(namespacesToStart, strategy);
1072+
strategy = new InitializeNamespacesSecurityStep(namespacesToStartNow, strategy);
10651073
}
10661074
} else {
1067-
strategy = CrdHelper.createDomainCrdStep(
1068-
version, productVersion,
1069-
new CreateNamespaceWatcherStep(selectionStrategy, intialResourceVersion));
1075+
strategy = Step.chain(
1076+
new CreateNamespaceWatcherStep(selectionStrategy, intialResourceVersion),
1077+
getNext());
10701078
}
10711079

1072-
// Check for namespaces that are removed from the operator's
1073-
// domainNamespaces list, or that are deleted from the Kubernetes cluster.
1074-
Set<String> namespacesToStop = new TreeSet<>(namespaceStoppingMap.keySet());
1075-
for (String ns : namespacesToStart) {
1076-
// the active namespaces are the ones that will not be stopped
1077-
if (delegate.isNamespaceRunning(ns)) {
1078-
namespacesToStop.remove(ns);
1079-
}
1080+
Collection<String> allDomainNamespaces = (Collection<String>) packet.get(ALL_DOMAIN_NAMESPACES);
1081+
if (allDomainNamespaces == null) {
1082+
allDomainNamespaces = new HashSet<>();
1083+
packet.put(ALL_DOMAIN_NAMESPACES, allDomainNamespaces);
10801084
}
1081-
stopNamespaces(namespacesToStart, namespacesToStop);
1085+
allDomainNamespaces.addAll(namespacesToStartNow);
10821086

10831087
return doNext(strategy, packet);
10841088
}
1085-
1086-
private Step createDomainCrdAndStartNamespaces(Collection<String> namespacesToStart, boolean isFullRecheck) {
1087-
return CrdHelper.createDomainCrdStep(
1088-
version, productVersion,
1089-
new StartNamespacesStep(namespacesToStart, isFullRecheck));
1089+
1090+
private Step startNamespaces(Collection<String> namespacesToStart, boolean isFullRecheck) {
1091+
return new StartNamespacesStep(namespacesToStart, isFullRecheck);
10901092
}
10911093

10921094
private String getInitialResourceVersion(V1NamespaceList result) {
@@ -1103,7 +1105,48 @@ private List<String> getExistingNamespaces(V1NamespaceList result) {
11031105
return namespaces;
11041106
}
11051107
}
1106-
1108+
1109+
private static class NamespaceListAfterStep extends Step {
1110+
private final DomainNamespaceSelectionStrategy selectionStrategy;
1111+
1112+
public NamespaceListAfterStep(DomainNamespaceSelectionStrategy selectionStrategy) {
1113+
this.selectionStrategy = selectionStrategy;
1114+
}
1115+
1116+
@Override
1117+
public NextAction apply(Packet packet) {
1118+
Collection<String> allDomainNamespaces = (Collection<String>) packet.get(ALL_DOMAIN_NAMESPACES);
1119+
if (allDomainNamespaces == null) {
1120+
allDomainNamespaces = new HashSet<>();
1121+
}
1122+
1123+
Collection<String> configuredDomainNamespaces = selectionStrategy.getConfiguredList();
1124+
if (configuredDomainNamespaces != null) {
1125+
for (String ns : configuredDomainNamespaces) {
1126+
if (!allDomainNamespaces.contains(ns)) {
1127+
try (LoggingContext stack = LoggingContext.setThreadContext().namespace(ns)) {
1128+
LOGGER.warning(MessageKeys.NAMESPACE_IS_MISSING, ns);
1129+
}
1130+
}
1131+
}
1132+
}
1133+
1134+
// Check for namespaces that are removed from the operator's
1135+
// domainNamespaces list, or that are deleted from the Kubernetes cluster.
1136+
Set<String> namespacesToStop = new TreeSet<>(namespaceStoppingMap.keySet());
1137+
for (String ns : allDomainNamespaces) {
1138+
// the active namespaces are the ones that will not be stopped
1139+
if (delegate.isNamespaceRunning(ns)) {
1140+
namespacesToStop.remove(ns);
1141+
}
1142+
}
1143+
1144+
stopNamespaces(allDomainNamespaces, namespacesToStop);
1145+
1146+
return doNext(packet);
1147+
}
1148+
}
1149+
11071150
private static class CreateNamespaceWatcherStep extends Step {
11081151
private final DomainNamespaceSelectionStrategy selectionStrategy;
11091152
private final String initialResourceVersion;

operator/src/main/java/oracle/kubernetes/operator/TuningParametersImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ private void update() {
5959

6060
CallBuilderTuning callBuilder =
6161
new CallBuilderTuning(
62-
(int) readTuningParameter("callRequestLimit", 500),
62+
(int) readTuningParameter("callRequestLimit", 50),
6363
(int) readTuningParameter("callMaxRetryCount", 5),
6464
(int) readTuningParameter("callTimeoutSeconds", 10));
6565

operator/src/main/java/oracle/kubernetes/operator/Watcher.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,11 @@ void waitForExit() {
9393
}
9494
}
9595

96+
// for test
97+
String getResourceVersion() {
98+
return resourceVersion;
99+
}
100+
96101
/**
97102
* Sets the listener for watch events.
98103
*

operator/src/main/java/oracle/kubernetes/operator/builders/CallParamsImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
package oracle.kubernetes.operator.builders;
55

66
/** An object which encapsulates common parameters for Kubernetes API calls. */
7-
class CallParamsImpl implements CallParams {
8-
private static final int DEFAULT_LIMIT = 500;
7+
public class CallParamsImpl implements CallParams {
8+
private static final int DEFAULT_LIMIT = 50;
99
private static final int DEFAULT_TIMEOUT = 30;
1010

1111
private Integer limit = CallParamsImpl.DEFAULT_LIMIT;
@@ -20,7 +20,7 @@ public Integer getLimit() {
2020
return limit;
2121
}
2222

23-
void setLimit(Integer limit) {
23+
public void setLimit(Integer limit) {
2424
this.limit = limit;
2525
}
2626

@@ -29,7 +29,7 @@ public Integer getTimeoutSeconds() {
2929
return timeoutSeconds;
3030
}
3131

32-
void setTimeoutSeconds(Integer timeoutSeconds) {
32+
public void setTimeoutSeconds(Integer timeoutSeconds) {
3333
this.timeoutSeconds = timeoutSeconds;
3434
}
3535

operator/src/main/java/oracle/kubernetes/operator/builders/WatchBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public class WatchBuilder {
2929
/** Ignored for watches. */
3030
private static final String START_LIST = null;
3131

32-
private static final Boolean ALLOW_BOOKMARKS = false;
32+
private static final Boolean ALLOW_BOOKMARKS = true;
3333

3434
@SuppressWarnings("FieldMayBeFinal") // Leave non-final for unit test
3535
private static WatchFactory FACTORY = new WatchFactoryImpl();

0 commit comments

Comments
 (0)