Skip to content

Changes for OWLS-83431 #1855

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import oracle.kubernetes.weblogic.domain.model.ServerSpec;
import oracle.kubernetes.weblogic.domain.model.Shutdown;

import static oracle.kubernetes.operator.LabelConstants.CLUSTERNAME_LABEL;
import static oracle.kubernetes.operator.ProcessingConstants.SERVERS_TO_ROLL;

public class PodHelper {
Expand Down Expand Up @@ -85,6 +86,63 @@ public static boolean isReady(V1Pod pod) {
return ready;
}

/**
* Get list of scheduled pods for a particular cluster or non-clustered servers.
* @param info Domain presence info
* @param clusterName cluster name of the pod server
* @return list containing scheduled pods
*/
public static List<String> getScheduledPods(DomainPresenceInfo info, String clusterName) {
// These are presently scheduled servers
List<String> scheduledServers = new ArrayList<>();
for (Map.Entry<String, ServerKubernetesObjects> entry : info.getServers().entrySet()) {
V1Pod pod = entry.getValue().getPod().get();
if (pod != null && !PodHelper.isDeleting(pod) && PodHelper.getScheduledStatus(pod)) {
String wlsClusterName = pod.getMetadata().getLabels().get(CLUSTERNAME_LABEL);
if ((wlsClusterName == null) || (wlsClusterName.contains(clusterName))) {
scheduledServers.add(entry.getKey());
}
}
}
return scheduledServers;
}

/**
* Get list of ready pods for a particular cluster or non-clustered servers.
* @param info Domain presence info
* @param clusterName cluster name of the pod server
* @return list containing ready pods
*/
public static List<String> getReadyPods(DomainPresenceInfo info, String clusterName) {
// These are presently Ready servers
List<String> readyServers = new ArrayList<>();
for (Map.Entry<String, ServerKubernetesObjects> entry : info.getServers().entrySet()) {
V1Pod pod = entry.getValue().getPod().get();
if (pod != null && !PodHelper.isDeleting(pod) && PodHelper.getReadyStatus(pod)) {
String wlsClusterName = pod.getMetadata().getLabels().get(CLUSTERNAME_LABEL);
if ((wlsClusterName == null) || (wlsClusterName.contains(clusterName))) {
readyServers.add(entry.getKey());
}
}
}
return readyServers;
}

/**
* get if pod is in scheduled state.
* @param pod pod
* @return true, if pod is scheduled
*/
public static boolean getScheduledStatus(V1Pod pod) {
V1PodSpec spec = pod.getSpec();
if (spec != null) {
if (spec.getNodeName() != null) {
return true;
}
}
return false;
}

/**
* get if pod is in ready state.
* @param pod pod
Expand Down Expand Up @@ -444,7 +502,7 @@ protected String getPodReplacedMessageKey() {
protected V1ObjectMeta createMetadata() {
V1ObjectMeta metadata = super.createMetadata();
if (getClusterName() != null) {
metadata.putLabelsItem(LabelConstants.CLUSTERNAME_LABEL, getClusterName());
metadata.putLabelsItem(CLUSTERNAME_LABEL, getClusterName());
}
return metadata;
}
Expand Down Expand Up @@ -502,7 +560,7 @@ public NextAction apply(Packet packet) {
if (oldPod != null) {
Map<String, String> labels = oldPod.getMetadata().getLabels();
if (labels != null) {
clusterName = labels.get(LabelConstants.CLUSTERNAME_LABEL);
clusterName = labels.get(CLUSTERNAME_LABEL);
}

ServerSpec serverSpec = info.getDomain().getServer(serverName, clusterName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@
package oracle.kubernetes.operator.steps;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import oracle.kubernetes.operator.DomainStatusUpdater;
import oracle.kubernetes.operator.ProcessingConstants;
Expand Down Expand Up @@ -43,9 +45,6 @@ public class ManagedServerUpIteratorStep extends Step {

private final Collection<ServerStartupInfo> startupInfos;

private static NextStepFactory NEXT_STEP_FACTORY =
(next) -> DomainStatusUpdater.createStatusUpdateStep(next);

public ManagedServerUpIteratorStep(Collection<ServerStartupInfo> startupInfos, Step next) {
super(next);
this.startupInfos = startupInfos;
Expand Down Expand Up @@ -89,12 +88,26 @@ public NextAction apply(Packet packet) {
.filter(ssi -> !isServerInCluster(ssi))
.map(ssi -> createManagedServerUpDetails(packet, ssi)).collect(Collectors.toList());

getStartClusteredServersStepFactories(startupInfos, packet).values()
.forEach(factory -> startDetails.addAll(factory.getServerStartsStepAndPackets()));
Collection<StepAndPacket> work = new ArrayList<>();
if (!startDetails.isEmpty()) {
work.add(
new StepAndPacket(
new StartManagedServersStep(null, 0, startDetails, null), packet));
}

for (Map.Entry<String, StartClusteredServersStepFactory> entry
: getStartClusteredServersStepFactories(startupInfos, packet).entrySet()) {
work.add(
new StepAndPacket(
new StartManagedServersStep(entry.getKey(), entry.getValue().getMaxConcurrency(),
entry.getValue().getServerStartsStepAndPackets(), null), packet.clone()));
}

if (!work.isEmpty()) {
return doForkJoin(DomainStatusUpdater.createStatusUpdateStep(getNext()), packet, work);
}

return doNext(
NEXT_STEP_FACTORY.createStatusUpdateStep(new StartManagedServersStep(startDetails, getNext())),
packet);
return doNext(DomainStatusUpdater.createStatusUpdateStep(getNext()), packet);
}


Expand Down Expand Up @@ -142,19 +155,52 @@ private boolean isServerInCluster(ServerStartupInfo ssi) {

static class StartManagedServersStep extends Step {
final Collection<StepAndPacket> startDetails;
final Queue<StepAndPacket> startDetailsQueue = new ConcurrentLinkedQueue<>();
final String clusterName;
final int maxConcurrency;
final AtomicInteger numStarted = new AtomicInteger(0);

StartManagedServersStep(Collection<StepAndPacket> startDetails, Step next) {
StartManagedServersStep(String clusterName, int maxConcurrency, Collection<StepAndPacket> startDetails, Step next) {
super(next);
this.clusterName = clusterName;
this.startDetails = startDetails;
this.maxConcurrency = maxConcurrency;
startDetails.forEach(this::add);
}

Collection<StepAndPacket> getStartDetails() {
return startDetails;
void add(StepAndPacket serverToStart) {
startDetailsQueue.add(new StepAndPacket(serverToStart.step, serverToStart.packet));
}

@Override
public NextAction apply(Packet packet) {
return doForkJoin(new ManagedServerUpAfterStep(getNext()), packet, startDetails);

if (startDetailsQueue.isEmpty()) {
return doNext(new ManagedServerUpAfterStep(getNext()), packet);
} else if (isServiceOnlyOrShuttingDown()) {
Collection<StepAndPacket> servers = Collections.singletonList(startDetailsQueue.poll());
return doForkJoin(this, packet, servers);
} else if (serverAvailableToStart(packet.getSpi(DomainPresenceInfo.class))) {
numStarted.getAndIncrement();
return doForkJoin(this, packet, Collections.singletonList(startDetailsQueue.poll()));
} else {
return doDelay(this, packet, 100, TimeUnit.MILLISECONDS);
}
}

private boolean isServiceOnlyOrShuttingDown() {
return Optional.ofNullable(startDetailsQueue.peek().step)
.map(step -> step.getNext() instanceof ServerDownStep).orElse(false);
}

private boolean serverAvailableToStart(DomainPresenceInfo info) {
return ((numStarted.get() < PodHelper.getScheduledPods(info, clusterName).size())
&& (canStartConcurrently(PodHelper.getReadyPods(info, clusterName).size())));
}

private boolean canStartConcurrently(int numReady) {
return ((this.maxConcurrency > 0) && (numStarted.get() < (this.maxConcurrency + numReady - 1)))
|| (this.maxConcurrency == 0);
}
}

Expand All @@ -171,58 +217,17 @@ private static class StartClusteredServersStepFactory {
this.maxConcurrency = maxConcurrency;
}

public int getMaxConcurrency() {
return this.maxConcurrency;
}

void add(StepAndPacket serverToStart) {
serversToStart.add(serverToStart);
}

Collection<StepAndPacket> getServerStartsStepAndPackets() {
if (maxConcurrency == 0 || serversToStart.size() <= maxConcurrency) {
return serversToStart;
}
ArrayList<StepAndPacket> steps = new ArrayList<>(maxConcurrency);
IntStream.range(0, maxConcurrency)
.forEach(i -> steps.add(StartClusteredServersStep.createStepAndPacket(serversToStart)));
return steps;
}

}

static class StartClusteredServersStep extends Step {

private final Queue<StepAndPacket> serversToStart;

static StepAndPacket createStepAndPacket(Queue<StepAndPacket> serversToStart) {
return new StepAndPacket(new StartClusteredServersStep(serversToStart), null);
}

StartClusteredServersStep(Queue<StepAndPacket> serversToStart) {
super(null);
this.serversToStart = serversToStart;
serversToStart.forEach(stepAndPacket -> setupSequentialStartPacket(stepAndPacket.packet));
}

Collection<StepAndPacket> getServersToStart() {
return serversToStart;
}

private void setupSequentialStartPacket(Packet packet) {
packet.put(ProcessingConstants.WAIT_FOR_POD_READY, true);
}

@Override
public NextAction apply(Packet packet) {
Collection<StepAndPacket> servers = Arrays.asList(serversToStart.poll());
if (servers.isEmpty()) {
return doNext(packet);
} else {
return doForkJoin(this, packet, servers);
}
}
}

// an interface to provide a hook for unit testing.
interface NextStepFactory {
Step createStatusUpdateStep(Step next);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public String getServiceAccountName() {
return serverPod.getServiceAccountName();
}

void setNodeName(String nodeName) {
public void setNodeName(String nodeName) {
serverPod.setNodeName(nodeName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class DomainProcessorTestSetup {
public static final String NS = "namespace";
public static final String SECRET_NAME = "secret-name";
public static final String KUBERNETES_UID = "12345";
public static final String NODE_NAME = "Node1";

private static final String INTROSPECTION_JOB = LegalNames.toJobIntrospectorName(UID);
private static final String INTROSPECT_RESULT =
Expand Down Expand Up @@ -88,13 +89,14 @@ private static V1ObjectMeta withTimestamps(V1ObjectMeta meta) {
* @return a domain
*/
public static Domain createTestDomain() {
DomainSpec ds = new DomainSpec()
.withWebLogicCredentialsSecret(new V1SecretReference().name(SECRET_NAME).namespace(NS));
ds.setNodeName(NODE_NAME);
return new Domain()
.withApiVersion(KubernetesConstants.DOMAIN_GROUP + "/" + KubernetesConstants.DOMAIN_VERSION)
.withKind(KubernetesConstants.DOMAIN)
.withMetadata(withTimestamps(new V1ObjectMeta().name(UID).namespace(NS).uid(KUBERNETES_UID)))
.withSpec(
new DomainSpec()
.withWebLogicCredentialsSecret(new V1SecretReference().name(SECRET_NAME).namespace(NS)));
.withSpec(ds);
}

/**
Expand Down
Loading