Skip to content

Retry Support #252

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Dec 14, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.javaoperatorsdk.operator.processing.EventDispatcher;
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource;
import io.javaoperatorsdk.operator.processing.retry.Retry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -35,19 +36,33 @@ public Operator(KubernetesClient k8sClient) {
this.k8sClient = k8sClient;
}

public <R extends CustomResource> void registerControllerForAllNamespaces(
ResourceController<R> controller, Retry retry) throws OperatorException {
registerController(controller, true, retry);
}

public <R extends CustomResource> void registerControllerForAllNamespaces(
ResourceController<R> controller) throws OperatorException {
registerController(controller, true);
registerController(controller, true, null);
}

public <R extends CustomResource> void registerController(
ResourceController<R> controller, Retry retry, String... targetNamespaces)
throws OperatorException {
registerController(controller, false, retry, targetNamespaces);
}

public <R extends CustomResource> void registerController(
ResourceController<R> controller, String... targetNamespaces) throws OperatorException {
registerController(controller, false, targetNamespaces);
registerController(controller, false, null, targetNamespaces);
}

@SuppressWarnings("rawtypes")
private <R extends CustomResource> void registerController(
ResourceController<R> controller, boolean watchAllNamespaces, String... targetNamespaces)
ResourceController<R> controller,
boolean watchAllNamespaces,
Retry retry,
String... targetNamespaces)
throws OperatorException {
Class<R> resClass = getCustomResourceClass(controller);
CustomResourceDefinitionContext crd = getCustomResourceDefinitionForController(controller);
Expand All @@ -66,10 +81,10 @@ private <R extends CustomResource> void registerController(
CustomResourceCache customResourceCache = new CustomResourceCache();
DefaultEventHandler defaultEventHandler =
new DefaultEventHandler(
customResourceCache, eventDispatcher, controller.getClass().getName());
customResourceCache, eventDispatcher, controller.getClass().getName(), retry);
DefaultEventSourceManager eventSourceManager =
new DefaultEventSourceManager(defaultEventHandler);
defaultEventHandler.setDefaultEventSourceManager(eventSourceManager);
new DefaultEventSourceManager(defaultEventHandler, retry != null);
defaultEventHandler.setEventSourceManager(eventSourceManager);
eventDispatcher.setEventSourceManager(eventSourceManager);

customResourceClients.put(resClass, (CustomResourceOperationsImpl) client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.processing.event.EventList;
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
import java.util.Optional;

public interface Context<T extends CustomResource> {

EventSourceManager getEventSourceManager();

EventList getEvents();

Optional<RetryInfo> getRetryInfo();
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@
import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.processing.event.EventList;
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
import java.util.Optional;

public class DefaultContext<T extends CustomResource> implements Context<T> {

private final RetryInfo retryInfo;
private final EventList events;
private final EventSourceManager eventSourceManager;

public DefaultContext(EventSourceManager eventSourceManager, EventList events) {
public DefaultContext(
EventSourceManager eventSourceManager, EventList events, RetryInfo retryInfo) {
this.retryInfo = retryInfo;
this.events = events;
this.eventSourceManager = eventSourceManager;
}
Expand All @@ -23,4 +27,9 @@ public EventSourceManager getEventSourceManager() {
public EventList getEvents() {
return events;
}

@Override
public Optional<RetryInfo> getRetryInfo() {
return Optional.ofNullable(retryInfo);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,24 @@

public class RetryInfo {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about turning it into an interface extended by RetryExecution and implemented by GenericRetryExecution?
so the execution can be returned as info and no need to instantiate a new RetryInfo on every call.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hehe, I like to work with Pojos, but will check how that would look like, I change it in case its nice.


private int retryNumber;
private int attemptIndex;
private boolean lastAttempt;

public RetryInfo(int retryNumber, boolean lastAttempt) {
this.retryNumber = retryNumber;
this.attemptIndex = retryNumber;
this.lastAttempt = lastAttempt;
}

public int getRetryNumber() {
return retryNumber;
public int getAttemptIndex() {
return attemptIndex;
}

public boolean isLastAttempt() {
return lastAttempt;
}

@Override
public String toString() {
return "RetryInfo{" + "attemptIndex=" + attemptIndex + ", lastAttempt=" + lastAttempt + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;

import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.api.RetryInfo;
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.EventHandler;
import io.javaoperatorsdk.operator.processing.retry.Retry;
import io.javaoperatorsdk.operator.processing.retry.RetryExecution;
import java.util.*;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
Expand All @@ -30,16 +34,20 @@ public class DefaultEventHandler implements EventHandler {
private final Set<String> underProcessing = new HashSet<>();
private final ScheduledThreadPoolExecutor executor;
private final EventDispatcher eventDispatcher;
private DefaultEventSourceManager defaultEventSourceManager;
private final Retry retry;
private final Map<String, RetryExecution> retryState = new HashMap<>();
private DefaultEventSourceManager eventSourceManager;

private final ReentrantLock lock = new ReentrantLock();

public DefaultEventHandler(
CustomResourceCache customResourceCache,
EventDispatcher eventDispatcher,
String relatedControllerName) {
String relatedControllerName,
Retry retry) {
this.customResourceCache = customResourceCache;
this.eventDispatcher = eventDispatcher;
this.retry = retry;
eventBuffer = new EventBuffer();
executor =
new ScheduledThreadPoolExecutor(
Expand All @@ -52,8 +60,8 @@ public Thread newThread(Runnable runnable) {
});
}

public void setDefaultEventSourceManager(DefaultEventSourceManager defaultEventSourceManager) {
this.defaultEventSourceManager = defaultEventSourceManager;
public void setEventSourceManager(DefaultEventSourceManager eventSourceManager) {
this.eventSourceManager = eventSourceManager;
}

@Override
Expand All @@ -79,7 +87,8 @@ private void executeBufferedEvents(String customResourceUid) {
ExecutionScope executionScope =
new ExecutionScope(
eventBuffer.getAndRemoveEventsForExecution(customResourceUid),
latestCustomResource.get());
latestCustomResource.get(),
retryInfo(customResourceUid));
log.debug("Executing events for custom resource. Scope: {}", executionScope);
executor.execute(new ExecutionConsumer(executionScope, eventDispatcher, this));
} else {
Expand All @@ -93,12 +102,30 @@ private void executeBufferedEvents(String customResourceUid) {
}
}

private RetryInfo retryInfo(String customResourceUid) {
RetryExecution retryExecution = retryState.get(customResourceUid);
if (retryExecution != null) {
return new RetryInfo(retryExecution.getCurrentAttemptIndex(), retryExecution.isLastAttempt());
} else {
return null;
}
}

void eventProcessingFinished(
ExecutionScope executionScope, PostExecutionControl postExecutionControl) {
try {
lock.lock();
log.debug("Event processing finished. Scope: {}", executionScope);
unsetUnderExecution(executionScope.getCustomResourceUid());

if (retry != null && postExecutionControl.exceptionDuringExecution()) {
handleRetryOnException(executionScope);
return;
}

if (retry != null) {
markSuccessfulExecutionRegardingRetry(executionScope);
}
if (containsCustomResourceDeletedEvent(executionScope.getEvents())) {
cleanupAfterDeletedEvent(executionScope.getCustomResourceUid());
} else {
Expand All @@ -110,6 +137,44 @@ void eventProcessingFinished(
}
}

/**
* Regarding the events there are 2 approaches we can take. Either retry always when there are new
* events (received meanwhile retry is in place or already in buffer) instantly or always wait
* according to the retry timing if there was an exception.
*/
private void handleRetryOnException(ExecutionScope executionScope) {
RetryExecution execution = getOrInitRetryExecution(executionScope);
boolean newEventsExists = eventBuffer.newEventsExists(executionScope.getCustomResourceUid());
eventBuffer.putBackEvents(executionScope.getCustomResourceUid(), executionScope.getEvents());

if (newEventsExists) {
executeBufferedEvents(executionScope.getCustomResourceUid());
return;
}
Optional<Long> nextDelay = execution.nextDelay();
nextDelay.ifPresent(
delay ->
eventSourceManager
.getRetryTimerEventSource()
.scheduleOnce(executionScope.getCustomResource(), delay));
}

private void markSuccessfulExecutionRegardingRetry(ExecutionScope executionScope) {
retryState.remove(executionScope.getCustomResourceUid());
eventSourceManager
.getRetryTimerEventSource()
.cancelOnceSchedule(executionScope.getCustomResourceUid());
}

private RetryExecution getOrInitRetryExecution(ExecutionScope executionScope) {
RetryExecution retryExecution = retryState.get(executionScope.getCustomResourceUid());
if (retryExecution == null) {
retryExecution = retry.initExecution();
retryState.put(executionScope.getCustomResourceUid(), retryExecution);
}
return retryExecution;
}

/**
* Here we try to cache the latest resource after an update. The goal is to solve a concurrency
* issue we've seen: If an execution is finished, where we updated a custom resource, but there
Expand Down Expand Up @@ -146,7 +211,7 @@ private void cacheUpdatedResourceIfChanged(
}

private void cleanupAfterDeletedEvent(String customResourceUid) {
defaultEventSourceManager.cleanup(customResourceUid);
eventSourceManager.cleanup(customResourceUid);
eventBuffer.cleanup(customResourceUid);
customResourceCache.cleanup(customResourceUid);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ public void addEvent(Event event) {
crEvents.add(event);
}

public boolean newEventsExists(String resourceId) {
return events.get(resourceId) != null && !events.get(resourceId).isEmpty();
}

public void putBackEvents(String resourceUid, List<Event> oldEvents) {
List<Event> crEvents =
events.computeIfAbsent(resourceUid, (id) -> new ArrayList<>(oldEvents.size()));
crEvents.addAll(0, oldEvents);
}

public boolean containsEvents(String customResourceId) {
return events.get(customResourceId) != null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,16 @@ public void setEventSourceManager(EventSourceManager eventSourceManager) {
this.eventSourceManager = eventSourceManager;
}

public PostExecutionControl handleEvent(ExecutionScope event) {
public PostExecutionControl handleExecution(ExecutionScope executionScope) {
try {
return handDispatch(event);
return handleDispatch(executionScope);
} catch (RuntimeException e) {
log.error("Error during event processing {} failed.", event, e);
return PostExecutionControl.defaultDispatch();
log.error("Error during event processing {} failed.", executionScope, e);
return PostExecutionControl.exceptionDuringExecution(e);
}
}

private PostExecutionControl handDispatch(ExecutionScope executionScope) {
private PostExecutionControl handleDispatch(ExecutionScope executionScope) {
CustomResource resource = executionScope.getCustomResource();
log.debug(
"Handling events: {} for resource {}", executionScope.getEvents(), resource.getMetadata());
Expand All @@ -66,7 +66,10 @@ private PostExecutionControl handDispatch(ExecutionScope executionScope) {
return PostExecutionControl.defaultDispatch();
}
Context context =
new DefaultContext(eventSourceManager, new EventList(executionScope.getEvents()));
new DefaultContext(
eventSourceManager,
new EventList(executionScope.getEvents()),
executionScope.getRetryInfo());
if (markedForDeletion(resource)) {
return handleDelete(resource, context);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class ExecutionConsumer implements Runnable {

@Override
public void run() {
PostExecutionControl postExecutionControl = eventDispatcher.handleEvent(executionScope);
PostExecutionControl postExecutionControl = eventDispatcher.handleExecution(executionScope);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just loud thinking, wouldn't the code be simpler if we had 2 ExecutionConsumers called
SimpleConsumer and RetryableConsumer and let the consumer to take care of the retrying rather than the handler?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@psycho-ir thing is this is closely relates with the events are comming in while the controller is executed, that layer does not have enough info basically now. (Also its just really a wraper around executor).

defaultEventHandler.eventProcessingFinished(executionScope, postExecutionControl);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.javaoperatorsdk.operator.processing;

import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.api.RetryInfo;
import io.javaoperatorsdk.operator.processing.event.Event;
import java.util.List;

Expand All @@ -10,9 +11,12 @@ public class ExecutionScope {
// the latest custom resource from cache
private CustomResource customResource;

public ExecutionScope(List<Event> list, CustomResource customResource) {
private RetryInfo retryInfo;

public ExecutionScope(List<Event> list, CustomResource customResource, RetryInfo retryInfo) {
this.events = list;
this.customResource = customResource;
this.retryInfo = retryInfo;
}

public List<Event> getEvents() {
Expand All @@ -38,4 +42,8 @@ public String toString() {
+ customResource.getMetadata().getResourceVersion()
+ '}';
}

public RetryInfo getRetryInfo() {
return retryInfo;
}
}
Loading