Skip to content

Commit

Permalink
Testclusters: improove timeout handling (#43440)
Browse files Browse the repository at this point in the history
  • Loading branch information
alpar-t committed Jul 1, 2019
1 parent a58d231 commit b430329
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ public void start() {
if (Version.fromString(node.getVersion()).getMajor() >= 7) {
node.defaultConfig.put("cluster.initial_master_nodes", "[" + nodeNames + "]");
node.defaultConfig.put("discovery.seed_providers", "file");
node.defaultConfig.put("discovery.seed_hosts", "[]");
}
}
node.start();
Expand Down Expand Up @@ -286,14 +287,13 @@ public List<String> getAllTransportPortURI() {
}

public void waitForAllConditions() {
long startedAt = System.currentTimeMillis();
LOGGER.info("Waiting for nodes");
nodes.forEach(ElasticsearchNode::waitForAllConditions);

writeUnicastHostsFiles();

LOGGER.info("Starting to wait for cluster to form");
waitForConditions(waitConditions, startedAt, CLUSTER_UP_TIMEOUT, CLUSTER_UP_TIMEOUT_UNIT, this);
waitForConditions(waitConditions, System.currentTimeMillis(), CLUSTER_UP_TIMEOUT, CLUSTER_UP_TIMEOUT_UNIT, this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -65,8 +67,10 @@ public class ElasticsearchNode implements TestClusterConfiguration {
private static final Logger LOGGER = Logging.getLogger(ElasticsearchNode.class);
private static final int ES_DESTROY_TIMEOUT = 20;
private static final TimeUnit ES_DESTROY_TIMEOUT_UNIT = TimeUnit.SECONDS;
private static final int NODE_UP_TIMEOUT = 60;
private static final TimeUnit NODE_UP_TIMEOUT_UNIT = TimeUnit.SECONDS;
private static final int NODE_UP_TIMEOUT = 2;
private static final TimeUnit NODE_UP_TIMEOUT_UNIT = TimeUnit.MINUTES;
private static final int ADDITIONAL_CONFIG_TIMEOUT = 15;
private static final TimeUnit ADDITIONAL_CONFIG_TIMEOUT_UNIT = TimeUnit.SECONDS;
private static final List<String> OVERRIDABLE_SETTINGS = Arrays.asList(
"path.repo",
"discovery.seed_providers"
Expand Down Expand Up @@ -310,6 +314,7 @@ public synchronized void start() {

try {
if (isWorkingDirConfigured == false) {
logToProcessStdout("Configuring working directory: " + workingDir);
// Only configure working dir once so we don't lose data on restarts
isWorkingDirConfigured = true;
createWorkingDir(distroArtifact);
Expand All @@ -319,12 +324,16 @@ public synchronized void start() {
}
createConfiguration();

plugins.forEach(plugin -> runElaticsearchBinScript(
"elasticsearch-plugin",
"install", "--batch", plugin.toString())
);
if(plugins.isEmpty() == false) {
logToProcessStdout("Installing " + plugins.size() + " plugins");
plugins.forEach(plugin -> runElaticsearchBinScript(
"elasticsearch-plugin",
"install", "--batch", plugin.toString())
);
}

if (keystoreSettings.isEmpty() == false || keystoreFiles.isEmpty() == false) {
logToProcessStdout("Adding " + keystoreSettings.size() + " keystore settings and " + keystoreFiles.size() + " keystore files");
runElaticsearchBinScript("elasticsearch-keystore", "create");

checkSuppliers("Keystore", keystoreSettings.values());
Expand All @@ -347,6 +356,7 @@ public synchronized void start() {
copyExtraConfigFiles();

if (isSettingMissingOrTrue("xpack.security.enabled")) {
logToProcessStdout("Setting up " + credentials.size() + " users");
if (credentials.isEmpty()) {
user(Collections.emptyMap());
}
Expand All @@ -358,9 +368,25 @@ public synchronized void start() {
));
}

logToProcessStdout("Starting Elasticsearch process");
startElasticsearchProcess();
}

/**
 * Appends a single timestamped line, tagged {@code [BUILD]}, to the file referenced by {@code esStdoutFile}.
 *
 * The parent directory of that file is created first when it does not exist yet, and the file itself is
 * created on first use; later calls append to it.
 *
 * @param message text to record after the timestamp and {@code [BUILD]} tag
 * @throws UncheckedIOException if the directory cannot be created or the file cannot be written
 */
private void logToProcessStdout(String message) {
    final Path stdoutDir = esStdoutFile.getParent();
    try {
        if (Files.exists(stdoutDir) == false) {
            Files.createDirectories(stdoutDir);
        }
        // Build the line only after the directory exists so the timestamp reflects the moment of writing
        final String line = "[" + Instant.now().toString() + "] [BUILD] " + message + "\n";
        final byte[] payload = line.getBytes(StandardCharsets.UTF_8);
        Files.write(esStdoutFile, payload, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    } catch (IOException ioFailure) {
        throw new UncheckedIOException(ioFailure);
    }
}

@Override
public void restart() {
LOGGER.info("Restarting {}", this);
Expand All @@ -380,6 +406,9 @@ private boolean isSettingMissingOrTrue(String name) {
}

private void copyExtraConfigFiles() {
if (extraConfigFiles.isEmpty() == false) {
logToProcessStdout("Setting up " + extraConfigFiles.size() + " additional config files");
}
extraConfigFiles.forEach((destination, from) -> {
if (Files.exists(from.toPath()) == false) {
throw new TestClustersException("Can't create extra config file from " + from + " for " + this +
Expand All @@ -398,6 +427,7 @@ private void copyExtraConfigFiles() {

private void installModules() {
if (distribution == Distribution.INTEG_TEST) {
logToProcessStdout("Installing " + modules.size() + "modules");
for (File module : modules) {
Path destination = workingDir.resolve("modules").resolve(module.getName().replace(".zip", "").replace("-" + version, ""));

Expand Down Expand Up @@ -843,7 +873,23 @@ public boolean isProcessAlive() {
}

void waitForAllConditions() {
waitForConditions(waitConditions, System.currentTimeMillis(), NODE_UP_TIMEOUT, NODE_UP_TIMEOUT_UNIT, this);
waitForConditions(
waitConditions,
System.currentTimeMillis(),
NODE_UP_TIMEOUT_UNIT.toMillis(NODE_UP_TIMEOUT) +
// Installing plugins at config time and loading them when nodes start requires additional time we need to
// account for
ADDITIONAL_CONFIG_TIMEOUT_UNIT.toMillis(ADDITIONAL_CONFIG_TIMEOUT *
(
plugins.size() +
keystoreFiles.size() +
keystoreSettings.size() +
credentials.size()
)
),
TimeUnit.MILLISECONDS,
this
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.elasticsearch.gradle.testclusters;

import org.gradle.api.logging.Logger;
import org.gradle.api.logging.Logging;

import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

/**
 * Keeps an inventory of all running clusters and stops them when interrupted.
 *
 * This takes advantage of the fact that Gradle interrupts all the threads in the daemon when the build completes.
 *
 * All access to the inventory is synchronized on the inventory itself, so {@link #watch} and {@link #unWatch}
 * may be called while {@link #run()} is blocked on another thread.
 */
public class TestClusterCleanupOnShutdown implements Runnable {

    private final Logger logger = Logging.getLogger(TestClusterCleanupOnShutdown.class);

    // Also used as the lock object, hence final: the monitor must never be swapped out
    private final Set<ElasticsearchCluster> clustersToWatch = new HashSet<>();

    /**
     * Registers clusters so that they are stopped when this runnable is interrupted.
     *
     * @param cluster the clusters to add to the inventory
     */
    public void watch(Collection<ElasticsearchCluster> cluster) {
        synchronized (clustersToWatch) {
            // Add the argument; adding the inventory to itself would never register anything
            clustersToWatch.addAll(cluster);
        }
    }

    /**
     * Removes clusters from the inventory so they are no longer stopped on interruption.
     *
     * @param cluster the clusters to remove from the inventory
     */
    public void unWatch(Collection<ElasticsearchCluster> cluster) {
        synchronized (clustersToWatch) {
            // Remove only the argument; removing the inventory from itself would drop every watched cluster
            clustersToWatch.removeAll(cluster);
        }
    }

    /**
     * Blocks until the executing thread is interrupted, then stops every cluster still in the inventory.
     *
     * Each cluster is removed from the inventory before it is stopped, and a failure to stop one cluster is
     * logged without preventing the remaining clusters from being stopped.
     */
    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(Long.MAX_VALUE);
            }
        } catch (InterruptedException interrupted) {
            synchronized (clustersToWatch) {
                if (clustersToWatch.isEmpty()) {
                    return;
                }
                logger.info("Cleanup thread was interrupted, shutting down all clusters");
                Iterator<ElasticsearchCluster> iterator = clustersToWatch.iterator();
                while (iterator.hasNext()) {
                    ElasticsearchCluster cluster = iterator.next();
                    iterator.remove();
                    try {
                        cluster.stop(false);
                    } catch (Exception e) {
                        logger.warn("Could not shut down {}", cluster, e);
                    }
                }
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package org.elasticsearch.gradle.testclusters;

import org.gradle.api.Project;
import org.gradle.api.logging.Logger;
import org.gradle.api.logging.Logging;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * This extension is meant to be used internally by testclusters.
 *
 * It owns a single-thread executor that runs a {@link TestClusterCleanupOnShutdown} task for the duration of the
 * build, and registers that same task as a JVM shutdown hook.
 *
 * It also auto configures cleanup of the executor when the build finishes to make sure we don't leak threads in
 * the daemon.
 */
public class TestClustersCleanupExtension {

    private static final int EXECUTOR_SHUTDOWN_TIMEOUT = 1;
    private static final TimeUnit EXECUTOR_SHUTDOWN_TIMEOUT_UNIT = TimeUnit.MINUTES;

    private static final Logger logger = Logging.getLogger(TestClustersCleanupExtension.class);

    private final ExecutorService executorService;
    private final TestClusterCleanupOnShutdown cleanupThread;

    /**
     * Creates the executor and immediately submits the cleanup task to it.
     */
    public TestClustersCleanupExtension() {
        executorService = Executors.newSingleThreadExecutor();
        cleanupThread = new TestClusterCleanupOnShutdown();
        executorService.submit(cleanupThread);
    }

    /**
     * Registers the extension on the root project unless one is already present, and wires up its lifecycle:
     * a JVM shutdown hook that runs the cleanup task, and a build-finished listener that shuts the executor
     * down and deregisters the hook again.
     *
     * @param project any project of the build; the extension is always attached to its root project
     */
    public static void createExtension(Project project) {
        if (project.getRootProject().getExtensions().findByType(TestClustersCleanupExtension.class) != null) {
            return;
        }
        // Configure the extension on the root project so we have a single instance per run
        TestClustersCleanupExtension ext = project.getRootProject().getExtensions().create(
            "__testclusters_rate_limit",
            TestClustersCleanupExtension.class
        );
        Thread shutdownHook = new Thread(() -> {
            // The cleanup task sleeps until its thread is interrupted and only then stops the clusters.
            // Nothing interrupts a shutdown hook thread, so flag the interrupt up front: the sleep then
            // throws right away and the clusters are stopped instead of blocking JVM exit.
            Thread.currentThread().interrupt();
            ext.cleanupThread.run();
        });
        Runtime.getRuntime().addShutdownHook(shutdownHook);
        project.getGradle().buildFinished(buildResult -> {
            try {
                ext.executorService.shutdownNow();
                try {
                    if (ext.executorService.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, EXECUTOR_SHUTDOWN_TIMEOUT_UNIT) == false) {
                        throw new IllegalStateException(
                            "Failed to shut down executor service after " +
                                EXECUTOR_SHUTDOWN_TIMEOUT + " " + EXECUTOR_SHUTDOWN_TIMEOUT_UNIT
                        );
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            } finally {
                // Always deregister the hook, even when the executor failed to terminate, so that hooks
                // do not pile up across builds
                try {
                    if (false == Runtime.getRuntime().removeShutdownHook(shutdownHook)) {
                        logger.warn("Trying to deregister shutdown hook when it was not registered.");
                    }
                } catch (IllegalStateException ese) {
                    // Thrown when shutdown is in progress
                    logger.warn("Can't remove shutdown hook", ese);
                }
            }
        });
    }

    /**
     * @return the cleanup task that tracks the clusters to stop
     */
    public TestClusterCleanupOnShutdown getCleanupThread() {
        return cleanupThread;
    }
}
Loading

0 comments on commit b430329

Please sign in to comment.