Skip to content

OWLS-88571 - Potential fixes for domain startup issues in large k8s cluster when watch events are not delivered. #2305

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

package oracle.kubernetes.operator;

import java.util.Set;
import java.util.stream.Stream;

import io.kubernetes.client.openapi.models.CoreV1Event;
import io.kubernetes.client.openapi.models.V1ConfigMap;
import io.kubernetes.client.openapi.models.V1Pod;
Expand Down Expand Up @@ -64,5 +67,17 @@ public interface DomainProcessor {
/**
* If the logging level is high enough, reports on any fibers which may currently be suspended.
*/
void reportSuspendedFibers();
default void reportSuspendedFibers() {
// no-op
}

/**
* Finds stranded cached domain presence infos that are not identified by the key set.
* @param namespace namespace
* @param domainUids domain UID key set
* @return stream of cached domain presence infos.
*/
default Stream<DomainPresenceInfo> findStrandedDomainPresenceInfos(String namespace, Set<String> domainUids) {
return Stream.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@

package oracle.kubernetes.operator;

import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Stream;

import io.kubernetes.client.openapi.models.CoreV1Event;
import io.kubernetes.client.openapi.models.V1ConfigMap;
Expand Down Expand Up @@ -346,6 +349,12 @@ public void reportSuspendedFibers() {
}
}

@Override
public Stream<DomainPresenceInfo> findStrandedDomainPresenceInfos(String namespace, Set<String> domainUids) {
return Optional.ofNullable(DOMAINS.get(namespace)).orElse(Collections.emptyMap())
.entrySet().stream().filter(e -> !domainUids.contains(e.getKey())).map(Map.Entry::getValue);
}

private String getDomainUid(Fiber fiber) {
return Optional.ofNullable(fiber)
.map(Fiber::getPacket)
Expand Down Expand Up @@ -730,6 +739,26 @@ class MakeRightDomainOperationImpl implements MakeRightDomainOperation {
*/
MakeRightDomainOperationImpl(DomainPresenceInfo liveInfo) {
this.liveInfo = liveInfo;
DomainPresenceInfo cachedInfo = getExistingDomainPresenceInfo(getNamespace(), getDomainUid());
if (!isNewDomain(cachedInfo)
&& isAfter(getCreationTimestamp(liveInfo), getCreationTimestamp(cachedInfo))) {
willInterrupt = true;
}
}

private OffsetDateTime getCreationTimestamp(DomainPresenceInfo dpi) {
return Optional.ofNullable(dpi.getDomain())
.map(Domain::getMetadata).map(V1ObjectMeta::getCreationTimestamp).orElse(null);
}

private boolean isAfter(OffsetDateTime one, OffsetDateTime two) {
if (two == null) {
return true;
}
if (one == null) {
return false;
}
return one.isAfter(two);
}

/**
Expand Down Expand Up @@ -777,7 +806,7 @@ private MakeRightDomainOperation withDeleting(boolean deleting) {
* Modifies the factory to indicate that it should interrupt any current make-right thread.
* @return the updated factory
*/
MakeRightDomainOperation interrupt() {
public MakeRightDomainOperation interrupt() {
willInterrupt = true;
return this;
}
Expand Down Expand Up @@ -1091,6 +1120,7 @@ public void onThrowable(Packet packet, Throwable throwable) {
}
};

LOGGER.fine("Starting fiber for domainUid -> " + domainUid + ", isWillInterrupt -> " + isWillInterrupt);
if (isWillInterrupt) {
gate.startFiber(domainUid, plan.step, plan.packet, cc);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import io.kubernetes.client.openapi.models.CoreV1Event;
import io.kubernetes.client.openapi.models.CoreV1EventList;
Expand Down Expand Up @@ -71,7 +70,7 @@ Consumer<DomainList> getDomainListProcessing() {
@Override
void completeProcessing(Packet packet) {
DomainProcessor dp = Optional.ofNullable(packet.getSpi(DomainProcessor.class)).orElse(processor);
getStrandedDomainPresenceInfos().forEach(info -> removeStrandedDomainPresenceInfo(dp, info));
getStrandedDomainPresenceInfos(dp).forEach(info -> removeStrandedDomainPresenceInfo(dp, info));
getActiveDomainPresenceInfos().forEach(info -> activateDomain(dp, info));
}
};
Expand Down Expand Up @@ -131,8 +130,9 @@ private void addDomain(Domain domain) {
getDomainPresenceInfo(domain.getDomainUid()).setDomain(domain);
}

private Set<DomainPresenceInfo> getStrandedDomainPresenceInfos() {
return domainPresenceInfoMap.values().stream().filter(this::isStranded).collect(Collectors.toSet());
private Stream<DomainPresenceInfo> getStrandedDomainPresenceInfos(DomainProcessor dp) {
return Stream.concat(domainPresenceInfoMap.values().stream().filter(this::isStranded),
dp.findStrandedDomainPresenceInfos(namespace, domainPresenceInfoMap.keySet()));
}

private boolean isStranded(DomainPresenceInfo dpi) {
Expand All @@ -145,8 +145,8 @@ private static void removeStrandedDomainPresenceInfo(DomainProcessor dp, DomainP
dp.createMakeRightOperation(info).withExplicitRecheck().forDeletion().execute();
}

private Set<DomainPresenceInfo> getActiveDomainPresenceInfos() {
return domainPresenceInfoMap.values().stream().filter(this::isActive).collect(Collectors.toSet());
private Stream<DomainPresenceInfo> getActiveDomainPresenceInfos() {
return domainPresenceInfoMap.values().stream().filter(this::isActive);
}

private boolean isActive(DomainPresenceInfo dpi) {
Expand All @@ -155,7 +155,11 @@ private boolean isActive(DomainPresenceInfo dpi) {

private static void activateDomain(DomainProcessor dp, DomainPresenceInfo info) {
info.setPopulated(true);
dp.createMakeRightOperation(info).withExplicitRecheck().execute();
MakeRightDomainOperation makeRight = dp.createMakeRightOperation(info).withExplicitRecheck();
if (info.getDomain().getStatus() == null) {
makeRight = makeRight.interrupt();
}
makeRight.execute();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public interface MakeRightDomainOperation {

MakeRightDomainOperation withEventData(EventItem eventItem, String message);

MakeRightDomainOperation interrupt();

void execute();

Step createSteps();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,12 @@ void onFailure(AsyncFiber fiber, ApiException ae, int statusCode, Map<String, Li
logFailure(ae, statusCode, responseHeaders);
}

helper.recycle(client);
if (ae.getCause() instanceof java.net.ProtocolException) {
helper.discard(client);
} else {
helper.recycle(client);
}

addResponseComponent(Component.createFor(
RetryStrategy.class, retryStrategy,
createFailure(requestParams, ae, statusCode).withResponseHeaders(responseHeaders)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ protected ApiClient create() {
return instance.updateAndGet(prev -> prev != null ? prev : getApiClient());
}

@Override
public void discard(ApiClient client) {
client = null;
instance.updateAndGet(newClient -> getApiClient());
}

private ApiClient getApiClient() {
LOGGER.entering();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,11 @@ protected T onRecycle(T instance) {
* @return Created instance
*/
protected abstract T create();

/**
* Discards the object instance. This method will cause {@link #take()
* take} to return a different object from pool.
*
*/
protected abstract void discard(T client);
}
Loading