Skip to content

Anil: remove stranded dpi's that are no longer present in recheck domain list #2314

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

package oracle.kubernetes.operator;

import java.util.Set;
import java.util.stream.Stream;

import io.kubernetes.client.openapi.models.CoreV1Event;
import io.kubernetes.client.openapi.models.V1ConfigMap;
import io.kubernetes.client.openapi.models.V1Pod;
Expand Down Expand Up @@ -64,5 +67,17 @@ public interface DomainProcessor {
/**
* If the logging level is high enough, reports on any fibers which may currently be suspended.
*/
void reportSuspendedFibers();
default void reportSuspendedFibers() {
// no-op
}

/**
* Finds stranded cached domain presence infos that are not identified by the key set.
* @param namespace namespace
* @param domainUids domain UID key set
* @return stream of cached domain presence infos.
*/
default Stream<DomainPresenceInfo> findStrandedDomainPresenceInfos(String namespace, Set<String> domainUids) {
return Stream.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@

package oracle.kubernetes.operator;

import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Stream;

import io.kubernetes.client.openapi.models.CoreV1Event;
import io.kubernetes.client.openapi.models.V1ConfigMap;
Expand Down Expand Up @@ -346,6 +349,12 @@ public void reportSuspendedFibers() {
}
}

@Override
public Stream<DomainPresenceInfo> findStrandedDomainPresenceInfos(String namespace, Set<String> domainUids) {
return Optional.ofNullable(DOMAINS.get(namespace)).orElse(Collections.emptyMap())
.entrySet().stream().filter(e -> !domainUids.contains(e.getKey())).map(Map.Entry::getValue);
}

private String getDomainUid(Fiber fiber) {
return Optional.ofNullable(fiber)
.map(Fiber::getPacket)
Expand Down Expand Up @@ -731,13 +740,28 @@ class MakeRightDomainOperationImpl implements MakeRightDomainOperation {
MakeRightDomainOperationImpl(DomainPresenceInfo liveInfo) {
this.liveInfo = liveInfo;
DomainPresenceInfo cachedInfo = getExistingDomainPresenceInfo(getNamespace(), getDomainUid());
if ((liveInfo.getDomain() != null) && (!isNewDomain(cachedInfo))
&& (liveInfo.getDomain().getMetadata().getCreationTimestamp()
.isAfter(cachedInfo.getDomain().getMetadata().getCreationTimestamp()))) {
if (liveInfo.getDomain() != null
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is probably not needed now since the getCreationTimestamp() method uses Optional.ofNullable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to merge in to your original branch and then I'll see if we can remove that check. (I've not looked at what isNewDomain does)

&& !isNewDomain(cachedInfo)
&& isAfter(getCreationTimestamp(liveInfo), getCreationTimestamp(cachedInfo))) {
willInterrupt = true;
}
}

private OffsetDateTime getCreationTimestamp(DomainPresenceInfo dpi) {
return Optional.ofNullable(dpi.getDomain())
.map(Domain::getMetadata).map(V1ObjectMeta::getCreationTimestamp).orElse(null);
}

private boolean isAfter(OffsetDateTime one, OffsetDateTime two) {
if (two == null) {
return true;
}
if (one == null) {
return false;
}
return one.isAfter(two);
}

/**
* Modifies the factory to run even if the domain spec is unchanged.
* @return the updated factory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import io.kubernetes.client.openapi.models.CoreV1Event;
import io.kubernetes.client.openapi.models.CoreV1EventList;
Expand Down Expand Up @@ -71,7 +70,7 @@ Consumer<DomainList> getDomainListProcessing() {
@Override
void completeProcessing(Packet packet) {
DomainProcessor dp = Optional.ofNullable(packet.getSpi(DomainProcessor.class)).orElse(processor);
getStrandedDomainPresenceInfos().forEach(info -> removeStrandedDomainPresenceInfo(dp, info));
getStrandedDomainPresenceInfos(dp).forEach(info -> removeStrandedDomainPresenceInfo(dp, info));
getActiveDomainPresenceInfos().forEach(info -> activateDomain(dp, info));
}
};
Expand Down Expand Up @@ -131,8 +130,9 @@ private void addDomain(Domain domain) {
getDomainPresenceInfo(domain.getDomainUid()).setDomain(domain);
}

private Set<DomainPresenceInfo> getStrandedDomainPresenceInfos() {
return domainPresenceInfoMap.values().stream().filter(this::isStranded).collect(Collectors.toSet());
private Stream<DomainPresenceInfo> getStrandedDomainPresenceInfos(DomainProcessor dp) {
return Stream.concat(domainPresenceInfoMap.values().stream().filter(this::isStranded),
dp.findStrandedDomainPresenceInfos(namespace, domainPresenceInfoMap.keySet()));
}

private boolean isStranded(DomainPresenceInfo dpi) {
Expand All @@ -145,8 +145,8 @@ private static void removeStrandedDomainPresenceInfo(DomainProcessor dp, DomainP
dp.createMakeRightOperation(info).withExplicitRecheck().forDeletion().execute();
}

private Set<DomainPresenceInfo> getActiveDomainPresenceInfos() {
return domainPresenceInfoMap.values().stream().filter(this::isActive).collect(Collectors.toSet());
private Stream<DomainPresenceInfo> getActiveDomainPresenceInfos() {
return domainPresenceInfoMap.values().stream().filter(this::isActive);
}

private boolean isActive(DomainPresenceInfo dpi) {
Expand All @@ -155,11 +155,11 @@ private boolean isActive(DomainPresenceInfo dpi) {

private static void activateDomain(DomainProcessor dp, DomainPresenceInfo info) {
info.setPopulated(true);
if (info.getDomain().getStatus() != null) {
dp.createMakeRightOperation(info).withExplicitRecheck().execute();
} else {
dp.createMakeRightOperation(info).interrupt().withExplicitRecheck().execute();
MakeRightDomainOperation makeRight = dp.createMakeRightOperation(info).withExplicitRecheck();
if (info.getDomain().getStatus() == null) {
makeRight = makeRight.interrupt();
}
makeRight.execute();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@

import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import com.meterware.simplestub.Memento;
import com.meterware.simplestub.StaticStubSupport;
Expand Down Expand Up @@ -42,15 +44,20 @@
import static oracle.kubernetes.operator.helpers.TuningParametersStub.CALL_REQUEST_LIMIT;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.junit.MatcherAssert.assertThat;

@SuppressWarnings("SameParameterValue")
public class DomainPresenceTest extends ThreadFactoryTestBase {

private static final String NS = "default";
private static final String UID = "UID1";
private static final String UID1 = "UID1";
private static final String UID2 = "UID2";
private static final String UID3 = "UID3";
private static final int LAST_DOMAIN_NUM = 2 * CALL_REQUEST_LIMIT - 1;

private final List<Memento> mementos = new ArrayList<>();
Expand Down Expand Up @@ -86,13 +93,33 @@ public void whenNoPreexistingDomains_createEmptyDomainPresenceInfoMap() {

@Test
public void whenPreexistingDomainExistsWithoutPodsOrServices_addToPresenceMap() {
Domain domain = createDomain(UID, NS);
Domain domain = createDomain(UID1, NS);
testSupport.defineResources(domain);

testSupport.addComponent("DP", DomainProcessor.class, dp);
testSupport.runSteps(domainNamespaces.readExistingResources(NS, dp));

assertThat(getDomainPresenceInfo(dp, UID).getDomain(), equalTo(domain));
assertThat(getDomainPresenceInfo(dp, UID1).getDomain(), equalTo(domain));
}

@Test
public void whenDomainsDeletedButAlreadyInPresence_deleteFromPresenceMap() {
Domain domain1 = createDomain(UID1, NS);
Domain domain2 = createDomain(UID2, NS);
Domain domain3 = createDomain(UID3, NS);
testSupport.defineResources(domain1, domain2, domain3);

testSupport.addComponent("DP", DomainProcessor.class, dp);
testSupport.runSteps(domainNamespaces.readExistingResources(NS, dp));

assertThat(getDomainPresenceInfoMap(dp).keySet(), hasSize(3));

testSupport.deleteResources(domain2);

testSupport.runSteps(domainNamespaces.readExistingResources(NS, dp));

assertThat(getDomainPresenceInfoMap(dp).keySet(), hasSize(2));
assertThat(getDomainPresenceInfoMap(dp), not(hasKey(UID2)));
}

private void addDomainResource(String uid, String namespace) {
Expand All @@ -111,6 +138,10 @@ private Domain createDomain(String uid, String namespace) {
.withStatus(new DomainStatus());
}

private Map<String, DomainPresenceInfo> getDomainPresenceInfoMap(DomainProcessorStub dp) {
return dp.getDomainPresenceInfos();
}

private DomainPresenceInfo getDomainPresenceInfo(DomainProcessorStub dp, String uid) {
return dp.getDomainPresenceInfos().get(uid);
}
Expand Down Expand Up @@ -149,26 +180,26 @@ private V1ObjectMeta createMetadata(String uid, String namespace, String name) {

@Test
public void whenK8sHasOneDomain_recordAdminServerService() {
addDomainResource(UID, NS);
V1Service service = createServerService(UID, NS, "admin");
addDomainResource(UID1, NS);
V1Service service = createServerService(UID1, NS, "admin");
testSupport.defineResources(service);

testSupport.addComponent("DP", DomainProcessor.class, dp);
testSupport.runSteps(domainNamespaces.readExistingResources(NS, dp));

assertThat(getDomainPresenceInfo(dp, UID).getServerService("admin"), equalTo(service));
assertThat(getDomainPresenceInfo(dp, UID1).getServerService("admin"), equalTo(service));
}

@Test
public void whenK8sHasOneDomainWithPod_recordPodPresence() {
addDomainResource(UID, NS);
V1Pod pod = createPodResource(UID, NS, "admin");
addDomainResource(UID1, NS);
V1Pod pod = createPodResource(UID1, NS, "admin");
testSupport.defineResources(pod);

testSupport.addComponent("DP", DomainProcessor.class, dp);
testSupport.runSteps(domainNamespaces.readExistingResources(NS, dp));

assertThat(getDomainPresenceInfo(dp, UID).getServerPod("admin"), equalTo(pod));
assertThat(getDomainPresenceInfo(dp, UID1).getServerPod("admin"), equalTo(pod));
}

private V1Pod createPodResource(String uid, String namespace, String serverName) {
Expand All @@ -181,14 +212,14 @@ private void addPodResource(String uid, String namespace, String serverName) {

@Test
public void whenK8sHasOneDomainWithOtherEvent_ignoreIt() {
addDomainResource(UID, NS);
addPodResource(UID, NS, "admin");
addEventResource(UID, "admin", "ignore this event");
addDomainResource(UID1, NS);
addPodResource(UID1, NS, "admin");
addEventResource(UID1, "admin", "ignore this event");

testSupport.addComponent("DP", DomainProcessor.class, dp);
testSupport.runSteps(domainNamespaces.readExistingResources(NS, dp));

assertThat(getDomainPresenceInfo(dp, UID).getLastKnownServerStatus("admin"), nullValue());
assertThat(getDomainPresenceInfo(dp, UID1).getLastKnownServerStatus("admin"), nullValue());
}

private void addEventResource(String uid, String serverName, String message) {
Expand All @@ -204,16 +235,16 @@ private CoreV1Event createEventResource(String uid, String serverName, String me

@Test
public void whenStrandedResourcesExist_removeThem() {
V1Service service1 = createServerService(UID, NS, "admin");
V1Service service2 = createServerService(UID, NS, "ms1");
V1PersistentVolume volume = new V1PersistentVolume().metadata(createMetadata(UID, "volume1"));
V1Service service1 = createServerService(UID1, NS, "admin");
V1Service service2 = createServerService(UID1, NS, "ms1");
V1PersistentVolume volume = new V1PersistentVolume().metadata(createMetadata(UID1, "volume1"));
V1PersistentVolumeClaim claim =
new V1PersistentVolumeClaim().metadata(createMetadata(UID, NS, "claim1"));
new V1PersistentVolumeClaim().metadata(createMetadata(UID1, NS, "claim1"));
testSupport.defineResources(service1, service2, volume, claim);

testSupport.runSteps(domainNamespaces.readExistingResources(NS, dp));

assertThat(dp.isDeletingStrandedResources(UID), is(true));
assertThat(dp.isDeletingStrandedResources(UID1), is(true));
}

@Test
Expand All @@ -238,7 +269,7 @@ private void createDomains(int lastDomainNum) {
}

public abstract static class DomainProcessorStub implements DomainProcessor {
private final Map<String, DomainPresenceInfo> dpis = new HashMap<>();
private final Map<String, DomainPresenceInfo> dpis = new ConcurrentHashMap<>();
private final List<MakeRightDomainOperationStub> operationStubs = new ArrayList<>();

Map<String, DomainPresenceInfo> getDomainPresenceInfos() {
Expand All @@ -251,6 +282,11 @@ boolean isDeletingStrandedResources(String uid) {
.orElse(false);
}

@Override
public Stream<DomainPresenceInfo> findStrandedDomainPresenceInfos(String namespace, Set<String> domainUids) {
return dpis.entrySet().stream().filter(e -> !domainUids.contains(e.getKey())).map(Map.Entry::getValue);
}

private MakeRightDomainOperationStub getMakeRightOperations(String uid) {
return operationStubs.stream().filter(s -> uid.equals(s.getUid())).findFirst().orElse(null);
}
Expand Down Expand Up @@ -306,7 +342,11 @@ public MakeRightDomainOperation forDeletion() {

@Override
public void execute() {
dpis.put(info.getDomainUid(), info);
if (deleting) {
dpis.remove(info.getDomainUid());
} else {
dpis.put(info.getDomainUid(), info);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class NamespaceTest {
private final List<Memento> mementos = new ArrayList<>();
private final Set<String> currentNamespaces = new HashSet<>();
private final DomainNamespaces domainNamespaces = new DomainNamespaces();
private final DomainProcessorStub dp = Stub.createStub(DomainProcessorStub.class);
private final DomainProcessorStub dp = Stub.createNiceStub(DomainProcessorStub.class);
private final MainDelegateStub delegate = createStrictStub(MainDelegateStub.class, dp, domainNamespaces);

@BeforeEach
Expand Down Expand Up @@ -186,9 +186,6 @@ private void unspecifyDomainNamespace(String namespace) {
}

abstract static class DomainProcessorStub implements DomainProcessor {
@Override
public void reportSuspendedFibers() {
}
}

abstract static class MainDelegateStub implements MainDelegate {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.stream.Stream;

import com.meterware.simplestub.Memento;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
Expand Down Expand Up @@ -222,6 +224,11 @@ public MakeRightDomainOperation createMakeRightOperation(DomainPresenceInfo info
Optional.ofNullable(info).map(DomainPresenceInfo::getDomain).ifPresent(delegateStub.invocations::add);
return createStrictStub(MakeRightDomainOperationStub.class);
}

@Override
public Stream<DomainPresenceInfo> findStrandedDomainPresenceInfos(String namespace, Set<String> domainUids) {
return Stream.empty();
}
}

abstract static class MakeRightDomainOperationStub implements MakeRightDomainOperation {
Expand Down
Loading