Skip to content

List continuation and watch bookmark support #1881

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
Sep 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
ee2b134
Allow watch bookmarks
rjeberhard Aug 13, 2020
a98a366
Merge remote-tracking branch 'origin/develop' into owls-83432
rjeberhard Aug 13, 2020
8056b45
Work in progress
rjeberhard Aug 17, 2020
6ea6874
Work in progress
rjeberhard Aug 17, 2020
5fa913e
Work in progress
rjeberhard Aug 17, 2020
a642f8a
Merge remote-tracking branch 'origin/develop' into owls-83432
rjeberhard Aug 17, 2020
a94ab5e
Work in progress
rjeberhard Aug 18, 2020
604f41a
AsyncRequestStep tests
rjeberhard Aug 18, 2020
1e3b392
Fix tests
rjeberhard Aug 19, 2020
8a43b85
Merge remote-tracking branch 'origin/develop' into owls-83432
rjeberhard Aug 19, 2020
af2f2b3
Bookmark tests
rjeberhard Aug 19, 2020
80ddf89
Clarify names
rjeberhard Aug 19, 2020
94aafd6
Merge remote-tracking branch 'origin/develop' into owls-83432
rjeberhard Aug 31, 2020
6873ba6
Work in progress
rjeberhard Aug 31, 2020
b95eccc
Better support for namespace lists spanning REST calls
rjeberhard Aug 31, 2020
a933fb4
Merge remote-tracking branch 'origin/develop' into owls-83432
rjeberhard Sep 2, 2020
61b5ba9
Merge remote-tracking branch 'origin/develop' into owls-83432
rjeberhard Sep 2, 2020
39ae629
Merge remote-tracking branch 'origin/develop' into owls-83432
rjeberhard Sep 3, 2020
5cf2335
Merge remote-tracking branch 'origin/develop' into owls-83432
rjeberhard Sep 4, 2020
c5c93f4
Merge remote-tracking branch 'origin/develop' into owls-83432
rjeberhard Sep 4, 2020
608af60
Merge remote-tracking branch 'origin/develop' into owls-83432
rjeberhard Sep 8, 2020
cf46795
Revert changes to charts
rjeberhard Sep 8, 2020
f3ef12a
Bug fixes
rjeberhard Sep 8, 2020
093e373
Disassociate CRD creation from namespace startup
rjeberhard Sep 8, 2020
fb4b6a8
Make unit-test more generic
rjeberhard Sep 9, 2020
ee58238
Clarify continue pattern
rjeberhard Sep 9, 2020
704c4d3
Merge remote-tracking branch 'origin/develop' into owls-83432
rjeberhard Sep 9, 2020
9314171
Save continue for reinvoke of async request
rjeberhard Sep 10, 2020
7619b35
Merge remote-tracking branch 'origin/develop' into owls-83432
rjeberhard Sep 10, 2020
ecdb291
Correct method name
rjeberhard Sep 10, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,8 @@ public NextAction onSuccess(Packet packet, CallResponse<V1PodList> callResponse)
}
}
}
return doNext(packet);

return doContinueListOrNext(callResponse, packet);
}
}

Expand Down Expand Up @@ -972,7 +973,7 @@ public NextAction onSuccess(Packet packet, CallResponse<V1ServiceList> callRespo
}
}

return doNext(packet);
return doContinueListOrNext(callResponse, packet);
}
}

Expand Down
187 changes: 115 additions & 72 deletions operator/src/main/java/oracle/kubernetes/operator/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import oracle.kubernetes.operator.logging.MessageKeys;
import oracle.kubernetes.operator.rest.RestConfigImpl;
import oracle.kubernetes.operator.rest.RestServer;
import oracle.kubernetes.operator.steps.ActionResponseStep;
import oracle.kubernetes.operator.steps.ConfigMapAfterStep;
import oracle.kubernetes.operator.work.Component;
import oracle.kubernetes.operator.work.Container;
Expand Down Expand Up @@ -207,13 +208,12 @@ private static void begin() {

Step strategy = Step.chain(
new InitializeNamespacesSecurityStep(configuredDomainNamespaces),
new NamespaceRulesReviewStep());
new NamespaceRulesReviewStep(),
CrdHelper.createDomainCrdStep(version, productVersion));
if (!DomainNamespaceSelectionStrategy.Dedicated.equals(selectionStrategy)) {
strategy = Step.chain(strategy, readExistingNamespaces(selectionStrategy, configuredDomainNamespaces, false));
} else {
strategy = Step.chain(strategy, CrdHelper.createDomainCrdStep(
version, productVersion,
new StartNamespacesStep(configuredDomainNamespaces, false)));
strategy = Step.chain(strategy, new StartNamespacesStep(configuredDomainNamespaces, false));
}
runSteps(
strategy,
Expand Down Expand Up @@ -322,14 +322,13 @@ static Step createDomainRecheckSteps(DateTime now) {

Step strategy = Step.chain(
new InitializeNamespacesSecurityStep(configuredDomainNamespaces),
new NamespaceRulesReviewStep());
new NamespaceRulesReviewStep(),
CrdHelper.createDomainCrdStep(version, productVersion));
if (!DomainNamespaceSelectionStrategy.Dedicated.equals(selectionStrategy)) {
strategy = Step.chain(strategy, readExistingNamespaces(
selectionStrategy, configuredDomainNamespaces, isFullRecheck));
} else {
strategy = Step.chain(strategy, CrdHelper.createDomainCrdStep(
version, productVersion,
new StartNamespacesStep(configuredDomainNamespaces, isFullRecheck)));
strategy = Step.chain(strategy, new StartNamespacesStep(configuredDomainNamespaces, isFullRecheck));
}
return strategy;
}
Expand Down Expand Up @@ -369,14 +368,31 @@ private static Step readExistingPods(String ns) {
}

static Step readExistingNamespaces(DomainNamespaceSelectionStrategy selectionStrategy,
Collection<String> domainNamespaces,
boolean isFullRecheck) {
Collection<String> domainNamespaces,
boolean isFullRecheck) {
CallBuilder builder = new CallBuilder();
String selector = selectionStrategy.getLabelSelector();
if (selector != null) {
builder.withLabelSelectors(selector);
}
return builder.listNamespaceAsync(new NamespaceListStep(selectionStrategy, domainNamespaces, isFullRecheck));
return builder.listNamespaceAsync(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is becoming increasingly complex. The coupling between the listNamespace calls and the CRD processing should not be done at this level. One of the great things about the Step/Fiber architecture is that you have those separate steps, and let the code that sets them up define the chain. Putting the dependency directly in the response to the namespace list makes future maintenance ever more difficult.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fair, but the complexity here really isn't new -- I just moved this code here. I'll see if I can separate them.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I pulled the CRD steps out separately and earlier in the overall flow.

new ActionResponseStep<V1NamespaceList>(new NamespaceListAfterStep(selectionStrategy)) {
private Step startNamespaces(Collection<String> namespacesToStart, boolean isFullRecheck) {
return new StartNamespacesStep(namespacesToStart, isFullRecheck);
}

@Override
protected NextAction onFailureNoRetry(Packet packet, CallResponse<V1NamespaceList> callResponse) {
return !selectionStrategy.isRequireList() && isNotAuthorizedOrForbidden(callResponse)
? doNext(startNamespaces(domainNamespaces, isFullRecheck), packet) :
super.onFailureNoRetry(packet, callResponse);
}

@Override
public Step createSuccessStep(V1NamespaceList result, Step next) {
return new NamespaceListStep(result, selectionStrategy, domainNamespaces, isFullRecheck, next);
}
});
}

private static ConfigMapAfterStep createConfigMapStep(String ns) {
Expand Down Expand Up @@ -682,7 +698,7 @@ private abstract static class ForEachNamespaceStep extends Step {

@Override
protected String getDetail() {
return String.join(",", domainNamespaces);
return Optional.ofNullable(domainNamespaces).map(d -> String.join(",", d)).orElse(null);
}

protected abstract Step action(String ns);
Expand Down Expand Up @@ -873,7 +889,8 @@ public NextAction onSuccess(Packet packet, CallResponse<DomainList> callResponse
domainWatchers.put(
ns, createDomainWatcher(ns, getResourceVersion(callResponse.getResult())));
}
return doNext(packet);

return doContinueListOrNext(callResponse, packet);
}

String getResourceVersion(DomainList result) {
Expand Down Expand Up @@ -916,7 +933,8 @@ public NextAction onSuccess(Packet packet, CallResponse<V1ServiceList> callRespo
if (!serviceWatchers.containsKey(ns)) {
serviceWatchers.put(ns, createServiceWatcher(ns, getInitialResourceVersion(result)));
}
return doNext(packet);

return doContinueListOrNext(callResponse, packet);
}

private String getInitialResourceVersion(V1ServiceList result) {
Expand Down Expand Up @@ -946,7 +964,8 @@ public NextAction onSuccess(Packet packet, CallResponse<V1EventList> callRespons
if (!eventWatchers.containsKey(ns)) {
eventWatchers.put(ns, createEventWatcher(ns, getInitialResourceVersion(result)));
}
return doNext(packet);

return doContinueListOrNext(callResponse, packet);
}

private String getInitialResourceVersion(V1EventList result) {
Expand Down Expand Up @@ -990,103 +1009,86 @@ public NextAction onSuccess(Packet packet, CallResponse<V1PodList> callResponse)
if (!podWatchers.containsKey(ns)) {
podWatchers.put(ns, createPodWatcher(ns, getInitialResourceVersion(result)));
}
return doNext(packet);

return doContinueListOrNext(callResponse, packet);
}

private String getInitialResourceVersion(V1PodList result) {
return result != null ? result.getMetadata().getResourceVersion() : "";
}
}

private static class NamespaceListStep extends ResponseStep<V1NamespaceList> {
private static final String ALL_DOMAIN_NAMESPACES = "ALL_DOMAIN_NAMESPACES";

private static class NamespaceListStep extends Step {
private final V1NamespaceList list;
private final DomainNamespaceSelectionStrategy selectionStrategy;
private final Collection<String> configuredDomainNamespaces;
private final boolean isFullRecheck;

NamespaceListStep(DomainNamespaceSelectionStrategy selectionStrategy,
Collection<String> configuredDomainNamespaces,
boolean isFullRecheck) {
NamespaceListStep(V1NamespaceList list,
DomainNamespaceSelectionStrategy selectionStrategy,
Collection<String> configuredDomainNamespaces,
boolean isFullRecheck,
Step next) {
super(next);
this.list = list;
this.selectionStrategy = selectionStrategy;
this.configuredDomainNamespaces = configuredDomainNamespaces;
this.isFullRecheck = isFullRecheck;
}

@Override
public NextAction onFailure(Packet packet, CallResponse<V1NamespaceList> callResponse) {
return callResponse.getStatusCode() == CallBuilder.NOT_FOUND
? onSuccess(packet, callResponse)
: super.onFailure(packet, callResponse);
}

@Override
protected NextAction onFailureNoRetry(Packet packet, CallResponse<V1NamespaceList> callResponse) {
return !selectionStrategy.isRequireList() && isNotAuthorizedOrForbidden(callResponse)
? doNext(createDomainCrdAndStartNamespaces(configuredDomainNamespaces, isFullRecheck), packet) :
super.onFailureNoRetry(packet, callResponse);
}

@Override
public NextAction onSuccess(Packet packet, CallResponse<V1NamespaceList> callResponse) {
V1NamespaceList result = callResponse.getResult();
public NextAction apply(Packet packet) {
// don't bother processing pre-existing events
String intialResourceVersion = getInitialResourceVersion(result);
List<String> nsList = getExistingNamespaces(result);
String intialResourceVersion = getInitialResourceVersion(list);
List<String> nsPossiblyPartialList = getExistingNamespaces(list);

Set<String> namespacesToStart;
Set<String> namespacesToStartNow;
if (selectionStrategy.isRequireList()) {
namespacesToStart = new TreeSet<>(nsList);
namespacesToStartNow = new TreeSet<>(nsPossiblyPartialList);
String regexp = selectionStrategy.getRegExp();
if (regexp != null) {
try {
namespacesToStart = namespacesToStart.stream().filter(
namespacesToStartNow = namespacesToStartNow.stream().filter(
Pattern.compile(regexp).asPredicate()).collect(Collectors.toSet());
} catch (PatternSyntaxException pse) {
LOGGER.severe(MessageKeys.EXCEPTION, pse);
}
}
} else {
namespacesToStart = new TreeSet<>(configuredDomainNamespaces);
for (String ns : configuredDomainNamespaces) {
if (!nsList.contains(ns)) {
try (LoggingContext stack = LoggingContext.setThreadContext().namespace(ns)) {
LOGGER.warning(MessageKeys.NAMESPACE_IS_MISSING, ns);
}
namespacesToStart.remove(ns);
}
}
namespacesToStartNow = new TreeSet<>(configuredDomainNamespaces);
namespacesToStartNow.retainAll(nsPossiblyPartialList);
}

Step strategy = null;
if (!namespacesToStart.isEmpty()) {
strategy = Step.chain(createDomainCrdAndStartNamespaces(namespacesToStart, isFullRecheck),
new CreateNamespaceWatcherStep(selectionStrategy, intialResourceVersion));
if (!namespacesToStartNow.isEmpty()) {
strategy = Step.chain(
startNamespaces(namespacesToStartNow, isFullRecheck),
new CreateNamespaceWatcherStep(selectionStrategy, intialResourceVersion),
getNext());

if (configuredDomainNamespaces == null) {
strategy = new InitializeNamespacesSecurityStep(namespacesToStart, strategy);
strategy = new InitializeNamespacesSecurityStep(namespacesToStartNow, strategy);
}
} else {
strategy = CrdHelper.createDomainCrdStep(
version, productVersion,
new CreateNamespaceWatcherStep(selectionStrategy, intialResourceVersion));
strategy = Step.chain(
new CreateNamespaceWatcherStep(selectionStrategy, intialResourceVersion),
getNext());
}

// Check for namespaces that are removed from the operator's
// domainNamespaces list, or that are deleted from the Kubernetes cluster.
Set<String> namespacesToStop = new TreeSet<>(namespaceStoppingMap.keySet());
for (String ns : namespacesToStart) {
// the active namespaces are the ones that will not be stopped
if (delegate.isNamespaceRunning(ns)) {
namespacesToStop.remove(ns);
}
Collection<String> allDomainNamespaces = (Collection<String>) packet.get(ALL_DOMAIN_NAMESPACES);
if (allDomainNamespaces == null) {
allDomainNamespaces = new HashSet<>();
packet.put(ALL_DOMAIN_NAMESPACES, allDomainNamespaces);
}
stopNamespaces(namespacesToStart, namespacesToStop);
allDomainNamespaces.addAll(namespacesToStartNow);

return doNext(strategy, packet);
}

private Step createDomainCrdAndStartNamespaces(Collection<String> namespacesToStart, boolean isFullRecheck) {
return CrdHelper.createDomainCrdStep(
version, productVersion,
new StartNamespacesStep(namespacesToStart, isFullRecheck));

private Step startNamespaces(Collection<String> namespacesToStart, boolean isFullRecheck) {
return new StartNamespacesStep(namespacesToStart, isFullRecheck);
}

private String getInitialResourceVersion(V1NamespaceList result) {
Expand All @@ -1103,7 +1105,48 @@ private List<String> getExistingNamespaces(V1NamespaceList result) {
return namespaces;
}
}


private static class NamespaceListAfterStep extends Step {
private final DomainNamespaceSelectionStrategy selectionStrategy;

public NamespaceListAfterStep(DomainNamespaceSelectionStrategy selectionStrategy) {
this.selectionStrategy = selectionStrategy;
}

@Override
public NextAction apply(Packet packet) {
Collection<String> allDomainNamespaces = (Collection<String>) packet.get(ALL_DOMAIN_NAMESPACES);
if (allDomainNamespaces == null) {
allDomainNamespaces = new HashSet<>();
}

Collection<String> configuredDomainNamespaces = selectionStrategy.getConfiguredList();
if (configuredDomainNamespaces != null) {
for (String ns : configuredDomainNamespaces) {
if (!allDomainNamespaces.contains(ns)) {
try (LoggingContext stack = LoggingContext.setThreadContext().namespace(ns)) {
LOGGER.warning(MessageKeys.NAMESPACE_IS_MISSING, ns);
}
}
}
}

// Check for namespaces that are removed from the operator's
// domainNamespaces list, or that are deleted from the Kubernetes cluster.
Set<String> namespacesToStop = new TreeSet<>(namespaceStoppingMap.keySet());
for (String ns : allDomainNamespaces) {
// the active namespaces are the ones that will not be stopped
if (delegate.isNamespaceRunning(ns)) {
namespacesToStop.remove(ns);
}
}

stopNamespaces(allDomainNamespaces, namespacesToStop);

return doNext(packet);
}
}

private static class CreateNamespaceWatcherStep extends Step {
private final DomainNamespaceSelectionStrategy selectionStrategy;
private final String initialResourceVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private void update() {

CallBuilderTuning callBuilder =
new CallBuilderTuning(
(int) readTuningParameter("callRequestLimit", 500),
(int) readTuningParameter("callRequestLimit", 50),
(int) readTuningParameter("callMaxRetryCount", 5),
(int) readTuningParameter("callTimeoutSeconds", 10));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ void waitForExit() {
}
}

// for test
Copy link
Member

@ankedia ankedia Sep 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that getNewResourceVersion() method of Watcher.java will check if event type is BOOKMARK and then update the resource version sent as part of bookmark event. Please let me know if my understanding is incorrect or if I missed something.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you follow the logic through, you will see that this is what happens. I'm working on another branch, but tomorrow I'll find the existing code.

Copy link
Member

@ankedia ankedia Sep 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, got it. I wonder if we need to test it functionally (in integration test or stress test) to verify if bookmark events helps in mitigating the impact of short event history window. In any case, this looks fine to me.

String getResourceVersion() {
return resourceVersion;
}

/**
* Sets the listener for watch events.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
package oracle.kubernetes.operator.builders;

/** An object which encapsulates common parameters for Kubernetes API calls. */
class CallParamsImpl implements CallParams {
private static final int DEFAULT_LIMIT = 500;
public class CallParamsImpl implements CallParams {
private static final int DEFAULT_LIMIT = 50;
private static final int DEFAULT_TIMEOUT = 30;

private Integer limit = CallParamsImpl.DEFAULT_LIMIT;
Expand All @@ -20,7 +20,7 @@ public Integer getLimit() {
return limit;
}

void setLimit(Integer limit) {
public void setLimit(Integer limit) {
this.limit = limit;
}

Expand All @@ -29,7 +29,7 @@ public Integer getTimeoutSeconds() {
return timeoutSeconds;
}

void setTimeoutSeconds(Integer timeoutSeconds) {
public void setTimeoutSeconds(Integer timeoutSeconds) {
this.timeoutSeconds = timeoutSeconds;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class WatchBuilder {
/** Ignored for watches. */
private static final String START_LIST = null;

private static final Boolean ALLOW_BOOKMARKS = false;
private static final Boolean ALLOW_BOOKMARKS = true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ALLOW_BOOKMARKS is set to true in WatchBuilder but I don't see BOOKMARK watch event handled in Watcher.java. My understanding of Watch bookmark feature is that it will send an event of type BOOKMARK to mark that all changes up to a given resourceVersion the client is requesting have already been sent. I was imagining that we'll update the resource version in Watcher.java with resource version set in BOOKMARK event . https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found that the behavior of the default branch of the switch was identical to what we would do for a specific BOOKMARK event, so there was no need to make further code change.


@SuppressWarnings("FieldMayBeFinal") // Leave non-final for unit test
private static WatchFactory FACTORY = new WatchFactoryImpl();
Expand Down
Loading