Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.google.gson.reflect.TypeToken;
import io.kubernetes.client.common.KubernetesListObject;
import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.informer.cache.Cache;
import io.kubernetes.client.informer.impl.DefaultSharedIndexInformer;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
Expand All @@ -32,6 +33,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import okhttp3.Call;
import org.apache.commons.collections4.MapUtils;

Expand Down Expand Up @@ -140,8 +142,19 @@ SharedIndexInformer<ApiType> sharedIndexInformerFor(
ListerWatcher<ApiType, ApiListType> listerWatcher,
Class<ApiType> apiTypeClass,
long resyncPeriodInMillis) {
return sharedIndexInformerFor(listerWatcher, apiTypeClass, resyncPeriodInMillis, null);
}

public synchronized <ApiType extends KubernetesObject, ApiListType extends KubernetesListObject>
SharedIndexInformer<ApiType> sharedIndexInformerFor(
ListerWatcher<ApiType, ApiListType> listerWatcher,
Class<ApiType> apiTypeClass,
long resyncPeriodInMillis,
BiConsumer<Class<ApiType>, Throwable> exceptionHandler) {

SharedIndexInformer<ApiType> informer =
new DefaultSharedIndexInformer<>(apiTypeClass, listerWatcher, resyncPeriodInMillis);
new DefaultSharedIndexInformer<>(
apiTypeClass, listerWatcher, resyncPeriodInMillis, new Cache<>(), exceptionHandler);
this.informers.putIfAbsent(TypeToken.get(apiTypeClass).getType(), informer);
return informer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.commons.lang3.tuple.MutablePair;
Expand Down Expand Up @@ -65,19 +66,34 @@ public class Controller<

private ScheduledFuture reflectorFuture;

/* visible for testing */ BiConsumer<Class<ApiType>, Throwable> exceptionHandler;

public Controller(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if these constructor chains are a good strategy long term. The best option I currently see is

  • add a builder for the class
  • use the all params constructor from the builder
  • deprecate all constructors with partial param sets

Also I would try to get rid of the many new calls here via factories. This could improve testability and introduce entry points for people who want to influence behaviour. A larger change but could be done in baby steps. What do you think @brendandburns ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you file this as an issue so we can have a broader discussion? I want to get @yue9944882 to weigh in as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yay, have created issue #1995 for discussion. Looking forward to if :)

Class<ApiType> apiTypeClass,
DeltaFIFO queue,
ListerWatcher<ApiType, ApiListType> listerWatcher,
Consumer<Deque<MutablePair<DeltaFIFO.DeltaType, KubernetesObject>>> processFunc,
Supplier<Boolean> resyncFunc,
long fullResyncPeriod) {
this(apiTypeClass, queue, listerWatcher, processFunc, resyncFunc, fullResyncPeriod, null);
}

public Controller(
Class<ApiType> apiTypeClass,
DeltaFIFO queue,
ListerWatcher<ApiType, ApiListType> listerWatcher,
Consumer<Deque<MutablePair<DeltaFIFO.DeltaType, KubernetesObject>>> processFunc,
Supplier<Boolean> resyncFunc,
long fullResyncPeriod,
BiConsumer<Class<ApiType>, Throwable> exceptionHandler) {

this.queue = queue;
this.listerWatcher = listerWatcher;
this.apiTypeClass = apiTypeClass;
this.processFunc = processFunc;
this.resyncFunc = resyncFunc;
this.fullResyncPeriod = fullResyncPeriod;
this.exceptionHandler = exceptionHandler;

// starts one daemon thread for reflector
this.reflectExecutor =
Expand Down Expand Up @@ -113,7 +129,7 @@ public void run() {

synchronized (this) {
// TODO(yue9944882): proper naming for reflector
reflector = new ReflectorRunnable<ApiType, ApiListType>(apiTypeClass, listerWatcher, queue);
reflector = newReflector();
try {
reflectorFuture =
reflectExecutor.scheduleWithFixedDelay(
Expand All @@ -130,6 +146,10 @@ public void run() {
this.processLoop();
}

/* visible for testing */ ReflectorRunnable<ApiType, ApiListType> newReflector() {
return new ReflectorRunnable<>(apiTypeClass, listerWatcher, queue, exceptionHandler);
}

/** stops the resync thread pool firstly, then stop the reflector */
public void stop() {
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,22 +58,25 @@ public class ReflectorRunnable<

private AtomicBoolean isActive = new AtomicBoolean(true);

private final BiConsumer<Class<ApiType>, Throwable> exceptionHandler;
/* visible for testing */ final BiConsumer<Class<ApiType>, Throwable> exceptionHandler;

public ReflectorRunnable(
Class<ApiType> apiTypeClass, ListerWatcher listerWatcher, DeltaFIFO store) {
this(apiTypeClass, listerWatcher, store, ReflectorRunnable::defaultWatchErrorHandler);
Class<ApiType> apiTypeClass,
ListerWatcher<ApiType, ApiListType> listerWatcher,
DeltaFIFO store) {
this(apiTypeClass, listerWatcher, store, null);
}

public ReflectorRunnable(
Class<ApiType> apiTypeClass,
ListerWatcher listerWatcher,
ListerWatcher<ApiType, ApiListType> listerWatcher,
DeltaFIFO store,
BiConsumer<Class<ApiType>, Throwable> exceptionHandler) {
this.listerWatcher = listerWatcher;
this.store = store;
this.apiTypeClass = apiTypeClass;
this.exceptionHandler = exceptionHandler;
this.exceptionHandler =
exceptionHandler == null ? ReflectorRunnable::defaultWatchErrorHandler : exceptionHandler;
}

/**
Expand Down Expand Up @@ -277,7 +280,7 @@ private void watchHandler(Watchable<ApiType> watch) {
}
}

private static <ApiType extends KubernetesObject> void defaultWatchErrorHandler(
static <ApiType extends KubernetesObject> void defaultWatchErrorHandler(
Class<ApiType> watchingApiTypeClass, Throwable t) {
log.error(String.format("%s#Reflector loop failed unexpectedly", watchingApiTypeClass), t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.tuple.MutablePair;
Expand Down Expand Up @@ -82,7 +83,28 @@ public DefaultSharedIndexInformer(
new DeltaFIFO(
(Function<KubernetesObject, String>) cache.getKeyFunc(),
(Cache<KubernetesObject>) cache),
cache);
cache,
null);
}

public DefaultSharedIndexInformer(
Class<ApiType> apiTypeClass,
ListerWatcher<ApiType, ApiListType> listerWatcher,
long resyncPeriod,
Cache<ApiType> cache,
BiConsumer<Class<ApiType>, Throwable> exceptionHandler) {
this(
apiTypeClass,
listerWatcher,
resyncPeriod,
// down-casting should be safe here because one delta FIFO instance only serves one
// resource
// type
new DeltaFIFO(
(Function<KubernetesObject, String>) cache.getKeyFunc(),
(Cache<KubernetesObject>) cache),
cache,
exceptionHandler);
}

public DefaultSharedIndexInformer(
Expand All @@ -91,19 +113,31 @@ public DefaultSharedIndexInformer(
long resyncPeriod,
DeltaFIFO deltaFIFO,
Indexer<ApiType> indexer) {
this(apiTypeClass, listerWatcher, resyncPeriod, deltaFIFO, indexer, null);
}

public DefaultSharedIndexInformer(
Class<ApiType> apiTypeClass,
ListerWatcher<ApiType, ApiListType> listerWatcher,
long resyncPeriod,
DeltaFIFO deltaFIFO,
Indexer<ApiType> indexer,
BiConsumer<Class<ApiType>, Throwable> exceptionHandler) {

this.resyncCheckPeriodMillis = resyncPeriod;
this.defaultEventHandlerResyncPeriod = resyncPeriod;

this.processor = new SharedProcessor<>();
this.indexer = indexer;
this.controller =
new Controller<ApiType, ApiListType>(
new Controller<>(
apiTypeClass,
deltaFIFO,
listerWatcher,
this::handleDeltas,
processor::shouldResync,
resyncCheckPeriodMillis);
resyncCheckPeriodMillis,
exceptionHandler);

controllerThread =
new Thread(controller::run, "informer-controller-" + apiTypeClass.getSimpleName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import static org.junit.Assert.*;

import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.informer.EventType;
import io.kubernetes.client.informer.ListerWatcher;
import io.kubernetes.client.openapi.models.V1ListMeta;
Expand All @@ -23,13 +24,36 @@
import io.kubernetes.client.util.Watch;
import java.time.Duration;
import java.util.Arrays;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.commons.lang3.tuple.MutablePair;
import org.awaitility.Awaitility;
import org.hamcrest.core.IsEqual;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

public class ControllerTest {

@Rule public MockitoRule mockitoRule = MockitoJUnit.rule();

private static final Class<V1Pod> anyApiTypeClass = V1Pod.class;
private static final long anyFullResyncPeriod = 1000L;

@Mock private DeltaFIFO deltaFIFOMock;
@Mock private ListerWatcher<V1Pod, V1PodList> listerWatcherMock;

@Mock
private Consumer<Deque<MutablePair<DeltaFIFO.DeltaType, KubernetesObject>>> popProcessFuncMock;

@Mock private Supplier<Boolean> resyncFuncMock;
@Mock private BiConsumer<Class<V1Pod>, Throwable> exceptionHandlerMock;

@Test
public void testControllerProcessDeltas() {

Expand Down Expand Up @@ -75,4 +99,37 @@ public void testControllerProcessDeltas() {
controller.stop();
}
}

@Test
public void testReflectorIsConstructedWithExeptionHandler() {
Controller<V1Pod, V1PodList> controller =
new Controller<>(
anyApiTypeClass,
deltaFIFOMock,
listerWatcherMock,
popProcessFuncMock,
resyncFuncMock,
anyFullResyncPeriod,
exceptionHandlerMock);
assertSame(exceptionHandlerMock, controller.exceptionHandler);

ReflectorRunnable<V1Pod, V1PodList> reflector = controller.newReflector();

assertSame(exceptionHandlerMock, reflector.exceptionHandler);
}

@Test
public void testControllerHasNoExceptionHandlerPerDefault() {

Controller<V1Pod, V1PodList> controller =
new Controller<>(
anyApiTypeClass,
deltaFIFOMock,
listerWatcherMock,
popProcessFuncMock,
resyncFuncMock,
anyFullResyncPeriod);

assertNull(controller.exceptionHandler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
package io.kubernetes.client.informer.cache;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
Expand All @@ -35,6 +37,7 @@
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.awaitility.Awaitility;
import org.hamcrest.core.IsEqual;
import org.junit.Test;
Expand All @@ -45,10 +48,14 @@
@RunWith(MockitoJUnitRunner.class)
public class ReflectorRunnableTest {

private static final Class<V1Pod> anyApiType = V1Pod.class;

@Mock private DeltaFIFO deltaFIFO;

@Mock private ListerWatcher<V1Pod, V1PodList> listerWatcher;

@Mock private BiConsumer<Class<V1Pod>, Throwable> exceptionHandler;

@Test
public void testReflectorRunOnce() throws ApiException {
String mockResourceVersion = "1000";
Expand Down Expand Up @@ -343,4 +350,20 @@ public void testReflectorListShouldHandleExpiredResourceVersionFromWatchHandler(
reflectorRunnable.stop();
}
}

@Test
public void testDefaultExceptionHandlerSetPerDefault() {
ReflectorRunnable<V1Pod, V1PodList> reflector =
new ReflectorRunnable<>(anyApiType, listerWatcher, deltaFIFO);

assertNotNull(reflector.exceptionHandler);
}

@Test
public void testGivemExceptionHandlerSet() {
ReflectorRunnable<V1Pod, V1PodList> reflector =
new ReflectorRunnable<>(anyApiType, listerWatcher, deltaFIFO, exceptionHandler);

assertSame(exceptionHandler, reflector.exceptionHandler);
}
}
Loading