Skip to content

fix: max reconciliation interval applies after retry exhaustion #1491

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix: max reconciliation interval applies after retry exhaustion
  • Loading branch information
csviri committed Sep 20, 2022
commit 95afaee4038b80e9a8fa2510ed480ae61279dad8
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public Controller(Reconciler<P> reconciler,
managedWorkflow =
ManagedWorkflow.workflowFor(kubernetesClient, configuration.getDependentResources());
eventSourceManager = new EventSourceManager<>(this);
eventProcessor = new EventProcessor<>(eventSourceManager);
eventProcessor = new EventProcessor<>(getConfiguration(), eventSourceManager);
eventSourceManager.postProcessDefaultEventSourcesAfterProcessorInitializer();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
import io.javaoperatorsdk.operator.api.reconciler.Constants;
Expand All @@ -37,6 +38,7 @@ public class EventProcessor<R extends HasMetadata> implements EventHandler, Life
private static final long MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION = 50;

private volatile boolean running;
private final ControllerConfiguration<?> controllerConfiguration;
private final ReconciliationDispatcher<R> reconciliationDispatcher;
private final Retry retry;
private final ExecutorService executor;
Expand All @@ -48,60 +50,55 @@ public class EventProcessor<R extends HasMetadata> implements EventHandler, Life
private final ResourceStateManager resourceStateManager = new ResourceStateManager();
private final Map<String, Object> metricsMetadata;

public EventProcessor(EventSourceManager<R> eventSourceManager) {

public EventProcessor(ControllerConfiguration<?> controllerConfiguration,
EventSourceManager<R> eventSourceManager) {
this(
controllerConfiguration,
eventSourceManager.getControllerResourceEventSource(),
ExecutorServiceManager.instance().executorService(),
eventSourceManager.getController().getConfiguration().getName(),
new ReconciliationDispatcher<>(eventSourceManager.getController()),
eventSourceManager.getController().getConfiguration().getRetry(),
ConfigurationServiceProvider.instance().getMetrics(),
eventSourceManager.getController().getConfiguration().getRateLimiter(),
eventSourceManager);
}

@SuppressWarnings("rawtypes")
EventProcessor(
ControllerConfiguration controllerConfiguration,
ReconciliationDispatcher<R> reconciliationDispatcher,
EventSourceManager<R> eventSourceManager,
String relatedControllerName,
Retry retry,
RateLimiter rateLimiter,
Metrics metrics) {
this(
controllerConfiguration,
eventSourceManager.getControllerResourceEventSource(),
null,
relatedControllerName,
reconciliationDispatcher,
retry,
metrics,
rateLimiter,
eventSourceManager);
}

@SuppressWarnings({"rawtypes", "unchecked"})
private EventProcessor(
ControllerConfiguration controllerConfiguration,
Cache<R> cache,
ExecutorService executor,
String relatedControllerName,
ReconciliationDispatcher<R> reconciliationDispatcher,
Retry retry,
Metrics metrics,
RateLimiter rateLimiter,
EventSourceManager<R> eventSourceManager) {
this.controllerConfiguration = controllerConfiguration;
this.running = false;
this.executor =
executor == null
? new ScheduledThreadPoolExecutor(
ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER)
: executor;
this.controllerName = relatedControllerName;
this.controllerName = controllerConfiguration.getName();
this.reconciliationDispatcher = reconciliationDispatcher;
this.retry = retry;
this.retry = controllerConfiguration.getRetry();
this.cache = cache;
this.metrics = metrics != null ? metrics : Metrics.NOOP;
this.eventSourceManager = eventSourceManager;
this.rateLimiter = rateLimiter;
this.rateLimiter = controllerConfiguration.getRateLimiter();

metricsMetadata = Optional.ofNullable(eventSourceManager.getController())
.map(Controller::getAssociatedGroupVersionKind)
Expand Down Expand Up @@ -272,18 +269,31 @@ synchronized void eventProcessingFinished(
reScheduleExecutionIfInstructed(postExecutionControl, executionScope.getResource());
}
}

}

private void reScheduleExecutionIfInstructed(
PostExecutionControl<R> postExecutionControl, R customResource) {

postExecutionControl
.getReScheduleDelay()
.ifPresent(delay -> {
.ifPresentOrElse(delay -> {
var resourceID = ResourceID.fromResource(customResource);
log.debug("ReScheduling event for resource: {} with delay: {}",
resourceID, delay);
retryEventSource().scheduleOnce(resourceID, delay);
}, () -> scheduleExecutionForMaxReconciliationInterval(customResource));
}

private void scheduleExecutionForMaxReconciliationInterval(R customResource) {
this.controllerConfiguration
.maxReconciliationInterval()
.ifPresent(m -> {
var resourceID = ResourceID.fromResource(customResource);
var delay = m.toMillis();
log.debug("ReScheduling event for resource because for max reconciliation interval: " +
"{} with delay: {}",
resourceID, delay);
retryEventSource().scheduleOnce(resourceID, delay);
});
}

Expand Down Expand Up @@ -319,7 +329,10 @@ private void handleRetryOnException(
metrics.failedReconciliation(resourceID, exception, metricsMetadata);
retryEventSource().scheduleOnce(resourceID, delay);
},
() -> log.error("Exhausted retries for {}", executionScope));
() -> {
log.error("Exhausted retries for {}", executionScope);
scheduleExecutionForMaxReconciliationInterval(executionScope.getResource());
});
}

private void cleanupOnSuccessfulExecution(ExecutionScope<R> executionScope) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,7 @@ private PostExecutionControl<P> createPostExecutionControl(P updatedCustomResour
private void updatePostExecutionControlWithReschedule(
PostExecutionControl<P> postExecutionControl,
BaseControl<?> baseControl) {
baseControl.getScheduleDelay().ifPresentOrElse(postExecutionControl::withReSchedule,
() -> controller.getConfiguration().maxReconciliationInterval()
.ifPresent(m -> postExecutionControl.withReSchedule(m.toMillis())));
baseControl.getScheduleDelay().ifPresent(postExecutionControl::withReSchedule);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.config.RetryConfiguration;
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
import io.javaoperatorsdk.operator.processing.event.rate.LinearRateLimiter;
Expand All @@ -25,6 +26,7 @@
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource;
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
import io.javaoperatorsdk.operator.processing.retry.Retry;
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;

import static io.javaoperatorsdk.operator.TestUtils.markForDeletion;
Expand Down Expand Up @@ -70,12 +72,15 @@ void setup() {
when(eventSourceManagerMock.getControllerResourceEventSource())
.thenReturn(controllerResourceEventSourceMock);
eventProcessor =
spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", null,
rateLimiterMock, null));
spy(new EventProcessor(controllerConfiguration(null, rateLimiterMock),
reconciliationDispatcherMock,
eventSourceManagerMock, null));
eventProcessor.start();
eventProcessorWithRetry =
spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test",
GenericRetry.defaultLimitedExponentialRetry(), rateLimiterMock, null));
spy(new EventProcessor(
controllerConfiguration(GenericRetry.defaultLimitedExponentialRetry(),
rateLimiterMock),
reconciliationDispatcherMock, eventSourceManagerMock, null));
eventProcessorWithRetry.start();
when(eventProcessor.retryEventSource()).thenReturn(retryTimerEventSourceMock);
when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock);
Expand Down Expand Up @@ -258,8 +263,9 @@ void cancelScheduleOnceEventsOnSuccessfulExecution() {
void startProcessedMarkedEventReceivedBefore() {
var crID = new ResourceID("test-cr", TEST_NAMESPACE);
eventProcessor =
spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", null,
LinearRateLimiter.deactivatedRateLimiter(),
spy(new EventProcessor(controllerConfiguration(null,
LinearRateLimiter.deactivatedRateLimiter()), reconciliationDispatcherMock,
eventSourceManagerMock,
metricsMock));
when(controllerResourceEventSourceMock.get(eq(crID)))
.thenReturn(Optional.of(testCustomResource()));
Expand Down Expand Up @@ -407,4 +413,12 @@ private void overrideData(ResourceID id, HasMetadata applyTo) {
applyTo.getMetadata().setNamespace(id.getNamespace().orElse(null));
}

ControllerConfiguration controllerConfiguration(Retry retry, RateLimiter rateLimiter) {
ControllerConfiguration res = mock(ControllerConfiguration.class);
when(res.getName()).thenReturn("Test");
when(res.getRetry()).thenReturn(retry);
when(res.getRateLimiter()).thenReturn(rateLimiter);
return res;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -600,19 +600,6 @@ void errorStatusHandlerCanPatchResource() {
any(), any());
}

@Test
void schedulesReconciliationIfMaxDelayIsSet() {
testCustomResource.addFinalizer(DEFAULT_FINALIZER);

reconciler.reconcile = (r, c) -> UpdateControl.noUpdate();

PostExecutionControl control =
reconciliationDispatcher.handleExecution(executionScopeWithCREvent(testCustomResource));

assertThat(control.getReScheduleDelay()).isPresent()
.hasValue(TimeUnit.HOURS.toMillis(RECONCILIATION_MAX_INTERVAL));
}

@Test
void canSkipSchedulingMaxDelayIf() {
testCustomResource.addFinalizer(DEFAULT_FINALIZER);
Expand Down