Skip to content

Commit 1a7ca60

Browse files
ankediarjeberhard
andauthored
OWLS-88571 - Potential fixes for domain startup issues in large k8s cluster when watch events are not delivered. (#2305)
* OWLS-88571 - Potential fix for GBU CNE issue to start a new fiber for a recreated domain when previous fiber is stuck. * Change for the ProtocolException and NPE issue to discard the API client instance and create a new one. * Change to check if the creationTimestamp of the live domain is newer compared to the cached domain creation timestamp. * remove stranded dpi's that are no longer present in recheck domain list (#2314) * Clean-up stranded domain presence info when watches aren't occurring Co-authored-by: Ryan Eberhard <ryan.eberhard@oracle.com>
1 parent a186d14 commit 1a7ca60

14 files changed

+214
-53
lines changed

operator/src/main/java/oracle/kubernetes/operator/DomainProcessor.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33

44
package oracle.kubernetes.operator;
55

6+
import java.util.Set;
7+
import java.util.stream.Stream;
8+
69
import io.kubernetes.client.openapi.models.CoreV1Event;
710
import io.kubernetes.client.openapi.models.V1ConfigMap;
811
import io.kubernetes.client.openapi.models.V1Pod;
@@ -64,5 +67,17 @@ public interface DomainProcessor {
6467
/**
6568
* If the logging level is high enough, reports on any fibers which may currently be suspended.
6669
*/
67-
void reportSuspendedFibers();
70+
default void reportSuspendedFibers() {
71+
// no-op
72+
}
73+
74+
/**
75+
* Finds stranded cached domain presence infos that are not identified by the key set.
76+
* @param namespace namespace
77+
* @param domainUids domain UID key set
78+
* @return stream of cached domain presence infos.
79+
*/
80+
default Stream<DomainPresenceInfo> findStrandedDomainPresenceInfos(String namespace, Set<String> domainUids) {
81+
return Stream.empty();
82+
}
6883
}

operator/src/main/java/oracle/kubernetes/operator/DomainProcessorImpl.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,22 @@
33

44
package oracle.kubernetes.operator;
55

6+
import java.time.OffsetDateTime;
67
import java.util.ArrayList;
78
import java.util.Collection;
89
import java.util.Collections;
910
import java.util.List;
1011
import java.util.Map;
1112
import java.util.Objects;
1213
import java.util.Optional;
14+
import java.util.Set;
1315
import java.util.concurrent.ConcurrentHashMap;
1416
import java.util.concurrent.ScheduledFuture;
1517
import java.util.concurrent.TimeUnit;
1618
import java.util.concurrent.atomic.AtomicInteger;
1719
import java.util.function.BiConsumer;
1820
import java.util.function.Consumer;
21+
import java.util.stream.Stream;
1922

2023
import io.kubernetes.client.openapi.models.CoreV1Event;
2124
import io.kubernetes.client.openapi.models.V1ConfigMap;
@@ -346,6 +349,12 @@ public void reportSuspendedFibers() {
346349
}
347350
}
348351

352+
@Override
353+
public Stream<DomainPresenceInfo> findStrandedDomainPresenceInfos(String namespace, Set<String> domainUids) {
354+
return Optional.ofNullable(DOMAINS.get(namespace)).orElse(Collections.emptyMap())
355+
.entrySet().stream().filter(e -> !domainUids.contains(e.getKey())).map(Map.Entry::getValue);
356+
}
357+
349358
private String getDomainUid(Fiber fiber) {
350359
return Optional.ofNullable(fiber)
351360
.map(Fiber::getPacket)
@@ -730,6 +739,26 @@ class MakeRightDomainOperationImpl implements MakeRightDomainOperation {
730739
*/
731740
MakeRightDomainOperationImpl(DomainPresenceInfo liveInfo) {
732741
this.liveInfo = liveInfo;
742+
DomainPresenceInfo cachedInfo = getExistingDomainPresenceInfo(getNamespace(), getDomainUid());
743+
if (!isNewDomain(cachedInfo)
744+
&& isAfter(getCreationTimestamp(liveInfo), getCreationTimestamp(cachedInfo))) {
745+
willInterrupt = true;
746+
}
747+
}
748+
749+
private OffsetDateTime getCreationTimestamp(DomainPresenceInfo dpi) {
750+
return Optional.ofNullable(dpi.getDomain())
751+
.map(Domain::getMetadata).map(V1ObjectMeta::getCreationTimestamp).orElse(null);
752+
}
753+
754+
private boolean isAfter(OffsetDateTime one, OffsetDateTime two) {
755+
if (two == null) {
756+
return true;
757+
}
758+
if (one == null) {
759+
return false;
760+
}
761+
return one.isAfter(two);
733762
}
734763

735764
/**
@@ -777,7 +806,7 @@ private MakeRightDomainOperation withDeleting(boolean deleting) {
777806
* Modifies the factory to indicate that it should interrupt any current make-right thread.
778807
* @return the updated factory
779808
*/
780-
MakeRightDomainOperation interrupt() {
809+
public MakeRightDomainOperation interrupt() {
781810
willInterrupt = true;
782811
return this;
783812
}
@@ -1091,6 +1120,7 @@ public void onThrowable(Packet packet, Throwable throwable) {
10911120
}
10921121
};
10931122

1123+
LOGGER.fine("Starting fiber for domainUid -> " + domainUid + ", isWillInterrupt -> " + isWillInterrupt);
10941124
if (isWillInterrupt) {
10951125
gate.startFiber(domainUid, plan.step, plan.packet, cc);
10961126
} else {

operator/src/main/java/oracle/kubernetes/operator/DomainResourcesValidation.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,9 @@
55

66
import java.util.Map;
77
import java.util.Optional;
8-
import java.util.Set;
98
import java.util.concurrent.ConcurrentHashMap;
109
import java.util.function.Consumer;
11-
import java.util.stream.Collectors;
10+
import java.util.stream.Stream;
1211

1312
import io.kubernetes.client.openapi.models.CoreV1Event;
1413
import io.kubernetes.client.openapi.models.CoreV1EventList;
@@ -71,7 +70,7 @@ Consumer<DomainList> getDomainListProcessing() {
7170
@Override
7271
void completeProcessing(Packet packet) {
7372
DomainProcessor dp = Optional.ofNullable(packet.getSpi(DomainProcessor.class)).orElse(processor);
74-
getStrandedDomainPresenceInfos().forEach(info -> removeStrandedDomainPresenceInfo(dp, info));
73+
getStrandedDomainPresenceInfos(dp).forEach(info -> removeStrandedDomainPresenceInfo(dp, info));
7574
getActiveDomainPresenceInfos().forEach(info -> activateDomain(dp, info));
7675
}
7776
};
@@ -131,8 +130,9 @@ private void addDomain(Domain domain) {
131130
getDomainPresenceInfo(domain.getDomainUid()).setDomain(domain);
132131
}
133132

134-
private Set<DomainPresenceInfo> getStrandedDomainPresenceInfos() {
135-
return domainPresenceInfoMap.values().stream().filter(this::isStranded).collect(Collectors.toSet());
133+
private Stream<DomainPresenceInfo> getStrandedDomainPresenceInfos(DomainProcessor dp) {
134+
return Stream.concat(domainPresenceInfoMap.values().stream().filter(this::isStranded),
135+
dp.findStrandedDomainPresenceInfos(namespace, domainPresenceInfoMap.keySet()));
136136
}
137137

138138
private boolean isStranded(DomainPresenceInfo dpi) {
@@ -145,8 +145,8 @@ private static void removeStrandedDomainPresenceInfo(DomainProcessor dp, DomainP
145145
dp.createMakeRightOperation(info).withExplicitRecheck().forDeletion().execute();
146146
}
147147

148-
private Set<DomainPresenceInfo> getActiveDomainPresenceInfos() {
149-
return domainPresenceInfoMap.values().stream().filter(this::isActive).collect(Collectors.toSet());
148+
private Stream<DomainPresenceInfo> getActiveDomainPresenceInfos() {
149+
return domainPresenceInfoMap.values().stream().filter(this::isActive);
150150
}
151151

152152
private boolean isActive(DomainPresenceInfo dpi) {
@@ -155,7 +155,11 @@ private boolean isActive(DomainPresenceInfo dpi) {
155155

156156
private static void activateDomain(DomainProcessor dp, DomainPresenceInfo info) {
157157
info.setPopulated(true);
158-
dp.createMakeRightOperation(info).withExplicitRecheck().execute();
158+
MakeRightDomainOperation makeRight = dp.createMakeRightOperation(info).withExplicitRecheck();
159+
if (info.getDomain().getStatus() == null) {
160+
makeRight = makeRight.interrupt();
161+
}
162+
makeRight.execute();
159163
}
160164

161165
}

operator/src/main/java/oracle/kubernetes/operator/MakeRightDomainOperation.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ public interface MakeRightDomainOperation {
2828

2929
MakeRightDomainOperation withEventData(EventItem eventItem, String message);
3030

31+
MakeRightDomainOperation interrupt();
32+
3133
void execute();
3234

3335
Step createSteps();

operator/src/main/java/oracle/kubernetes/operator/calls/AsyncRequestStep.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,12 @@ void onFailure(AsyncFiber fiber, ApiException ae, int statusCode, Map<String, Li
197197
logFailure(ae, statusCode, responseHeaders);
198198
}
199199

200-
helper.recycle(client);
200+
if (ae.getCause() instanceof java.net.ProtocolException) {
201+
helper.discard(client);
202+
} else {
203+
helper.recycle(client);
204+
}
205+
201206
addResponseComponent(Component.createFor(
202207
RetryStrategy.class, retryStrategy,
203208
createFailure(requestParams, ae, statusCode).withResponseHeaders(responseHeaders)));

operator/src/main/java/oracle/kubernetes/operator/helpers/ClientPool.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,12 @@ protected ApiClient create() {
6464
return instance.updateAndGet(prev -> prev != null ? prev : getApiClient());
6565
}
6666

67+
@Override
68+
public void discard(ApiClient client) {
69+
client = null;
70+
instance.updateAndGet(newClient -> getApiClient());
71+
}
72+
6773
private ApiClient getApiClient() {
6874
LOGGER.entering();
6975

operator/src/main/java/oracle/kubernetes/operator/helpers/Pool.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,11 @@ protected T onRecycle(T instance) {
6363
* @return Created instance
6464
*/
6565
protected abstract T create();
66+
67+
/**
68+
* Discards the object instance. This method will cause {@link #take()
69+
* take} to return a different object from pool.
70+
*
71+
*/
72+
protected abstract void discard(T client);
6673
}

0 commit comments

Comments
 (0)