diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerEventListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerEventListener.java index 0b4702e3dd0..6bda783673f 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerEventListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerEventListener.java @@ -16,6 +16,11 @@ package io.fabric8.kubernetes.client.informers; public interface SharedInformerEventListener { + + /** + * @deprecated Use {@link #onException(SharedIndexInformer, Exception)} instead + */ + @Deprecated void onException(Exception exception); default void onException(SharedIndexInformer informer, Exception e) { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index e44bf825633..1e58746b48b 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -54,8 +54,8 @@ public class DefaultSharedIndexInformer reflector; private final Class apiTypeClass; private final ProcessorStore processorStore; @@ -63,11 +63,11 @@ public class DefaultSharedIndexInformer processor; private final Executor informerExecutor; - private AtomicBoolean started = new AtomicBoolean(); + private final AtomicBoolean started = new AtomicBoolean(); private volatile boolean stopped = false; private ScheduledFuture resyncFuture; - + public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher listerWatcher, long resyncPeriod, OperationContext context, Executor informerExecutor) { if (resyncPeriod < 0) { throw new IllegalArgumentException("Invalid resync period provided, It should be a non-negative value"); @@ -81,7 +81,7 @@ public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher lis this.processor = new SharedProcessor<>(new SerialExecutor(informerExecutor)); this.indexer = new Cache<>(); this.indexer.setIsRunning(this::isRunning); - + processorStore = new ProcessorStore<>(this.indexer, this.processor); this.reflector = new Reflector<>(apiTypeClass, listerWatcher, processorStore, context); } @@ -121,9 +121,9 @@ public void addEventHandlerWithResyncPeriod(ResourceEventHandler handler, lon } } } - + ProcessorListener listener = this.processor.addProcessorListener(handler, determineResyncPeriod(resyncPeriodMillis, this.resyncCheckPeriodMillis)); - + if (!started.get()) { return; } @@ -151,7 +151,7 @@ public void run() { log.info("informer#Controller: ready to run resync and reflector for {} with resync {}", apiTypeClass, resyncCheckPeriodMillis); scheduleResync(processor::shouldResync); - + reflector.listSyncAndWatch(); // stop called while run is called could be ineffective, check for it afterwards synchronized (this) { @@ -204,9 +204,9 @@ private long determineResyncPeriod(long desired, long check) { @Override public boolean isRunning() { - return !stopped && started.get() && reflector.isRunning(); + return !stopped && started.get() && reflector.isRunning(); } - + synchronized void scheduleResync(Supplier resyncFunc) { // schedule the resync runnable if (resyncCheckPeriodMillis > 0) { @@ -216,15 +216,15 @@ synchronized void scheduleResync(Supplier resyncFunc) { log.debug("informer#Controller: resync skipped due to 0 full resync period {}", apiTypeClass); } } - + public long getFullResyncPeriod() { return resyncCheckPeriodMillis; } - + ScheduledFuture getResyncFuture() { return resyncFuture; } - + @Override public Class getApiTypeClass() { return apiTypeClass; diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java index ecd15b2a5fa..bfce1d4357e 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java @@ -18,44 +18,27 @@ import io.fabric8.kubernetes.api.model.ListOptions; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; -import okhttp3.OkHttpClient; -import okhttp3.Request; +import io.fabric8.kubernetes.client.utils.Utils; import okhttp3.WebSocket; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.mockito.MockedStatic; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; class AbstractWatchManagerTest { - private MockedStatic executors; - private ScheduledExecutorService executorService; - - @BeforeEach - void setUp() { - executorService = mock(ScheduledExecutorService.class, RETURNS_DEEP_STUBS); - executors = mockStatic(Executors.class); - executors.when(() -> Executors.newSingleThreadScheduledExecutor(any())).thenReturn(executorService); - } - - @AfterEach - void tearDown() { - executors.close(); - } - @Test @DisplayName("closeEvent, is idempotent, multiple calls only close watcher once") void closeEventIsIdempotent() { @@ -71,7 +54,7 @@ void closeEventIsIdempotent() { } @Test - @DisplayName("closeEvent with Exception, is idempotent, multiple calls only close watcher once") + @DisplayName("closeEvent, with Exception, is idempotent, multiple calls only close watcher once") void closeEventWithExceptionIsIdempotent() { // Given final WatcherAdapter watcher = new WatcherAdapter<>(); @@ -100,7 +83,7 @@ void closeWebSocket() { void nextReconnectInterval() { // Given final WatchManager awm = new WatchManager<>( - null, mock(ListOptions.class), 0, 10, 5, null); + null, mock(ListOptions.class), 0, 10, 5); // When-Then assertThat(awm.nextReconnectInterval()).isEqualTo(10); assertThat(awm.nextReconnectInterval()).isEqualTo(20); @@ -111,10 +94,65 @@ void nextReconnectInterval() { assertThat(awm.nextReconnectInterval()).isEqualTo(320); } + @Test + @DisplayName("cancelReconnect, with null attempt, should do nothing") + void cancelReconnectNullAttempt() { + // Given + final ScheduledFuture sf = spy(ScheduledFuture.class); + final WatcherAdapter watcher = new WatcherAdapter<>(); + final WatchManager awm = withDefaultWatchManager(watcher); + // When + awm.cancelReconnect(); + // Then + verify(sf, times(0)).cancel(true); + } + + @Test + @DisplayName("cancelReconnect, with non-null attempt, should cancel") + void cancelReconnectNonNullAttempt() { + // Given + final ScheduledFuture sf = mock(ScheduledFuture.class); + final MockedStatic utils = mockStatic(Utils.class); + utils.when(() -> Utils.schedule(any(), any(), anyLong(), any())).thenReturn(sf); + final WatcherAdapter watcher = new WatcherAdapter<>(); + final WatchManager awm = withDefaultWatchManager(watcher); + awm.scheduleReconnect(null, false); + // When + awm.cancelReconnect(); + // Then + verify(sf, times(1)).cancel(true); + } + + @Test + @DisplayName("isClosed, after close invocation, should return true") + void isForceClosedWhenClosed() { + // Given + final WatcherAdapter watcher = new WatcherAdapter<>(); + final WatchManager awm = withDefaultWatchManager(watcher); + awm.initRunner(mock(AbstractWatchManager.ClientRunner.class)); + // When + awm.close(); + // Then + assertThat(awm.isForceClosed()).isTrue(); + } + + @Test + @DisplayName("close, after close invocation, should return true") + void closeWithNonNullRunnerShouldCancelRunner() { + // Given + final AbstractWatchManager.ClientRunner clientRunner = mock(AbstractWatchManager.ClientRunner.class); + final WatcherAdapter watcher = new WatcherAdapter<>(); + final WatchManager awm = withDefaultWatchManager(watcher); + awm.initRunner(clientRunner); + // When + awm.close(); + // Then + verify(clientRunner, times(1)).close(); + } + private static WatchManager withDefaultWatchManager(Watcher watcher) { return new WatchManager<>( - watcher, mock(ListOptions.class, RETURNS_DEEP_STUBS), 0, 0, 0, - mock(OkHttpClient.class)); + watcher, mock(ListOptions.class, RETURNS_DEEP_STUBS), 0, 0, 0); } private static class WatcherAdapter implements Watcher { @@ -122,7 +160,7 @@ private static class WatcherAdapter implements Watcher { @Override public void eventReceived(Action action, T resource) {} - + @Override public void onClose(WatcherException cause) { closeCount.addAndGet(1); @@ -136,20 +174,8 @@ public void onClose() { private static final class WatchManager extends AbstractWatchManager { - public WatchManager(Watcher watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, OkHttpClient clonedClient) { + public WatchManager(Watcher watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent) { super(watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, resourceVersion -> null); - initRunner(new ClientRunner(clonedClient) { - @Override - void run(Request request) {} - - @Override - OkHttpClient cloneAndCustomize(OkHttpClient client) { - return clonedClient; - } - }); - } - @Override - public void close() { } } } diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java index 6fc6febf70b..b19015b057b 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java @@ -64,6 +64,7 @@ import java.util.function.BiFunction; import java.util.function.Function; +import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -851,9 +852,14 @@ public void onDelete(Pod oldObj, boolean deletedFinalStateUnknown) { @Test void testRunAfterStop() { + // Given SharedIndexInformer podInformer = factory.sharedIndexInformerFor(Pod.class, 0); podInformer.stop(); - assertThrows(IllegalStateException.class, podInformer::run); + // When + final IllegalStateException result = assertThrows(IllegalStateException.class, podInformer::run); + // Then + assertThat(result) + .hasMessage("Cannot restart a stopped informer"); } private KubernetesResource getAnimal(String name, String order, String resourceVersion) {