From b4303290d97778ada1fe5b563e6d73b650b69ee0 Mon Sep 17 00:00:00 2001 From: Alpar Torok Date: Mon, 1 Jul 2019 11:29:54 +0300 Subject: [PATCH] Testclusters: improove timeout handling (#43440) --- .../testclusters/ElasticsearchCluster.java | 4 +- .../testclusters/ElasticsearchNode.java | 60 ++++++++-- .../TestClusterCleanupOnShutdown.java | 59 +++++++++ .../TestClustersCleanupExtension.java | 74 ++++++++++++ .../testclusters/TestClustersPlugin.java | 112 ++++++------------ gradle.properties | 2 +- 6 files changed, 224 insertions(+), 87 deletions(-) create mode 100644 buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClusterCleanupOnShutdown.java create mode 100644 buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersCleanupExtension.java diff --git a/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/ElasticsearchCluster.java b/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/ElasticsearchCluster.java index 9cc03dd7e371d..af9d3cd4da9a4 100644 --- a/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/ElasticsearchCluster.java +++ b/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/ElasticsearchCluster.java @@ -229,6 +229,7 @@ public void start() { if (Version.fromString(node.getVersion()).getMajor() >= 7) { node.defaultConfig.put("cluster.initial_master_nodes", "[" + nodeNames + "]"); node.defaultConfig.put("discovery.seed_providers", "file"); + node.defaultConfig.put("discovery.seed_hosts", "[]"); } } node.start(); @@ -286,14 +287,13 @@ public List getAllTransportPortURI() { } public void waitForAllConditions() { - long startedAt = System.currentTimeMillis(); LOGGER.info("Waiting for nodes"); nodes.forEach(ElasticsearchNode::waitForAllConditions); writeUnicastHostsFiles(); LOGGER.info("Starting to wait for cluster to form"); - waitForConditions(waitConditions, startedAt, CLUSTER_UP_TIMEOUT, CLUSTER_UP_TIMEOUT_UNIT, this); + waitForConditions(waitConditions, System.currentTimeMillis(), 
CLUSTER_UP_TIMEOUT, CLUSTER_UP_TIMEOUT_UNIT, this); } @Override diff --git a/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/ElasticsearchNode.java b/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/ElasticsearchNode.java index 1641ef3dac4a3..8c13c66e0f12b 100644 --- a/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/ElasticsearchNode.java +++ b/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/ElasticsearchNode.java @@ -37,6 +37,8 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -65,8 +67,10 @@ public class ElasticsearchNode implements TestClusterConfiguration { private static final Logger LOGGER = Logging.getLogger(ElasticsearchNode.class); private static final int ES_DESTROY_TIMEOUT = 20; private static final TimeUnit ES_DESTROY_TIMEOUT_UNIT = TimeUnit.SECONDS; - private static final int NODE_UP_TIMEOUT = 60; - private static final TimeUnit NODE_UP_TIMEOUT_UNIT = TimeUnit.SECONDS; + private static final int NODE_UP_TIMEOUT = 2; + private static final TimeUnit NODE_UP_TIMEOUT_UNIT = TimeUnit.MINUTES; + private static final int ADDITIONAL_CONFIG_TIMEOUT = 15; + private static final TimeUnit ADDITIONAL_CONFIG_TIMEOUT_UNIT = TimeUnit.SECONDS; private static final List OVERRIDABLE_SETTINGS = Arrays.asList( "path.repo", "discovery.seed_providers" @@ -310,6 +314,7 @@ public synchronized void start() { try { if (isWorkingDirConfigured == false) { + logToProcessStdout("Configuring working directory: " + workingDir); // Only configure working dir once so we don't loose data on restarts isWorkingDirConfigured = true; createWorkingDir(distroArtifact); @@ -319,12 +324,16 @@ public synchronized void start() { } createConfiguration(); - plugins.forEach(plugin -> runElaticsearchBinScript( - "elasticsearch-plugin", - "install", 
"--batch", plugin.toString()) - ); + if(plugins.isEmpty() == false) { + logToProcessStdout("Installing " + plugins.size() + " plugins"); + plugins.forEach(plugin -> runElaticsearchBinScript( + "elasticsearch-plugin", + "install", "--batch", plugin.toString()) + ); + } if (keystoreSettings.isEmpty() == false || keystoreFiles.isEmpty() == false) { + logToProcessStdout("Adding " + keystoreSettings.size() + " keystore settings and " + keystoreFiles.size() + " keystore files"); runElaticsearchBinScript("elasticsearch-keystore", "create"); checkSuppliers("Keystore", keystoreSettings.values()); @@ -347,6 +356,7 @@ public synchronized void start() { copyExtraConfigFiles(); if (isSettingMissingOrTrue("xpack.security.enabled")) { + logToProcessStdout("Setting up " + credentials.size() + " users"); if (credentials.isEmpty()) { user(Collections.emptyMap()); } @@ -358,9 +368,25 @@ public synchronized void start() { )); } + logToProcessStdout("Starting Elasticsearch process"); startElasticsearchProcess(); } + private void logToProcessStdout(String message) { + try { + if (Files.exists(esStdoutFile.getParent()) == false) { + Files.createDirectories(esStdoutFile.getParent()); + } + Files.write( + esStdoutFile, + ("[" + Instant.now().toString() + "] [BUILD] " + message + "\n").getBytes(StandardCharsets.UTF_8), + StandardOpenOption.CREATE, StandardOpenOption.APPEND + ); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + @Override public void restart() { LOGGER.info("Restarting {}", this); @@ -380,6 +406,9 @@ private boolean isSettingMissingOrTrue(String name) { } private void copyExtraConfigFiles() { + if (extraConfigFiles.isEmpty() == false) { + logToProcessStdout("Setting up " + extraConfigFiles.size() + " additional config files"); + } extraConfigFiles.forEach((destination, from) -> { if (Files.exists(from.toPath()) == false) { throw new TestClustersException("Can't create extra config file from " + from + " for " + this + @@ -398,6 +427,7 @@ private void 
copyExtraConfigFiles() {
 
     private void installModules() {
         if (distribution == Distribution.INTEG_TEST) {
+            logToProcessStdout("Installing " + modules.size() + " modules");
             for (File module : modules) {
                 Path destination = workingDir.resolve("modules").resolve(module.getName().replace(".zip", "").replace("-" + version, ""));
 
@@ -843,7 +873,23 @@ public boolean isProcessAlive() {
     }
 
     void waitForAllConditions() {
-        waitForConditions(waitConditions, System.currentTimeMillis(), NODE_UP_TIMEOUT, NODE_UP_TIMEOUT_UNIT, this);
+        waitForConditions(
+            waitConditions,
+            System.currentTimeMillis(),
+            NODE_UP_TIMEOUT_UNIT.toMillis(NODE_UP_TIMEOUT) +
+                // Installing plugins at config time and loading them when nodes start requires additional time we need to
+                // account for
+                ADDITIONAL_CONFIG_TIMEOUT_UNIT.toMillis(ADDITIONAL_CONFIG_TIMEOUT *
+                    (
+                        plugins.size() +
+                        keystoreFiles.size() +
+                        keystoreSettings.size() +
+                        credentials.size()
+                    )
+                ),
+            TimeUnit.MILLISECONDS,
+            this
+        );
     }
 
     @Override
diff --git a/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClusterCleanupOnShutdown.java b/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClusterCleanupOnShutdown.java
new file mode 100644
index 0000000000000..0381cece108e2
--- /dev/null
+++ b/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClusterCleanupOnShutdown.java
@@ -0,0 +1,59 @@
+package org.elasticsearch.gradle.testclusters;
+
+import org.gradle.api.logging.Logger;
+import org.gradle.api.logging.Logging;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * Keep an inventory of all running Clusters and stop them when interrupted
+ *
+ * This takes advantage of the fact that Gradle interrupts all the threads in the daemon when the build completes.
+ */
+public class TestClusterCleanupOnShutdown implements Runnable {
+
+    private final Logger logger = Logging.getLogger(TestClusterCleanupOnShutdown.class);
+
+    private Set<ElasticsearchCluster> clustersToWatch = new HashSet<>();
+
+    public void watch(Collection<ElasticsearchCluster> cluster) {
+        synchronized (clustersToWatch) {
+            clustersToWatch.addAll(cluster); // register the clusters handed in by the caller
+        }
+    }
+
+    public void unWatch(Collection<ElasticsearchCluster> cluster) {
+        synchronized (clustersToWatch) {
+            clustersToWatch.removeAll(cluster); // forget only the clusters handed in by the caller
+        }
+    }
+
+    @Override
+    public void run() {
+        try {
+            while (true) {
+                Thread.sleep(Long.MAX_VALUE);
+            }
+        } catch (InterruptedException interrupted) {
+            synchronized (clustersToWatch) {
+                if (clustersToWatch.isEmpty()) {
+                    return;
+                }
+                logger.info("Cleanup thread was interrupted, shutting down all clusters");
+                Iterator<ElasticsearchCluster> iterator = clustersToWatch.iterator();
+                while (iterator.hasNext()) {
+                    ElasticsearchCluster cluster = iterator.next();
+                    iterator.remove();
+                    try {
+                        cluster.stop(false);
+                    } catch (Exception e) {
+                        logger.warn("Could not shut down {}", cluster, e);
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersCleanupExtension.java b/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersCleanupExtension.java
new file mode 100644
index 0000000000000..14bdfa952db0f
--- /dev/null
+++ b/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersCleanupExtension.java
@@ -0,0 +1,74 @@
+package org.elasticsearch.gradle.testclusters;
+
+import org.gradle.api.Project;
+import org.gradle.api.logging.Logger;
+import org.gradle.api.logging.Logging;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This extension is meant to be used internally by testclusters
+ *
+ * It holds synchronization primitives needed to implement the rate limiting.
+ * This is tricky because we can't use Gradle workers as there's no way to make sure that tests and their clusters are
+ * allocated atomically, so we could be in a situation where all workers are tests waiting for clusters to start up.
+ *
+ * Also auto configures cleanup of executors to make sure we don't leak threads in the daemon.
+ */
+public class TestClustersCleanupExtension {
+
+    private static final int EXECUTOR_SHUTDOWN_TIMEOUT = 1;
+    private static final TimeUnit EXECUTOR_SHUTDOWN_TIMEOUT_UNIT = TimeUnit.MINUTES;
+
+    private static final Logger logger = Logging.getLogger(TestClustersCleanupExtension.class);
+
+    private final ExecutorService executorService;
+    private final TestClusterCleanupOnShutdown cleanupThread;
+
+    public TestClustersCleanupExtension() {
+        executorService = Executors.newSingleThreadExecutor();
+        cleanupThread = new TestClusterCleanupOnShutdown();
+        executorService.submit(cleanupThread);
+    }
+
+
+    public static void createExtension(Project project) {
+        if (project.getRootProject().getExtensions().findByType(TestClustersCleanupExtension.class) != null) {
+            return;
+        }
+        // Configure the extension on the root project so we have a single instance per run
+        TestClustersCleanupExtension ext = project.getRootProject().getExtensions().create(
+            "__testclusters_rate_limit",
+            TestClustersCleanupExtension.class
+        );
+        Thread shutdownHook = new Thread(() -> { Thread.currentThread().interrupt(); ext.cleanupThread.run(); });
+        Runtime.getRuntime().addShutdownHook(shutdownHook); // hook pre-interrupts itself: run() only cleans up once interrupted
+        project.getGradle().buildFinished(buildResult -> {
+            ext.executorService.shutdownNow();
+            try {
+                if (ext.executorService.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, EXECUTOR_SHUTDOWN_TIMEOUT_UNIT) == false) {
+                    throw new IllegalStateException(
+                        "Failed to shut down executor service after " +
+                            EXECUTOR_SHUTDOWN_TIMEOUT + " " + EXECUTOR_SHUTDOWN_TIMEOUT_UNIT
+                    );
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+            try {
+                if (false == 
Runtime.getRuntime().removeShutdownHook(shutdownHook)) { + logger.warn("Trying to deregister shutdown hook when it was not registered."); + } + } catch (IllegalStateException ese) { + // Thrown when shutdown is in progress + logger.warn("Can't remove shutdown hook", ese); + } + }); + } + + public TestClusterCleanupOnShutdown getCleanupThread() { + return cleanupThread; + } +} diff --git a/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersPlugin.java b/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersPlugin.java index 3f2a7b4dcc744..47f2eb675b151 100644 --- a/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersPlugin.java +++ b/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersPlugin.java @@ -43,13 +43,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class TestClustersPlugin implements Plugin { @@ -58,18 +54,14 @@ public class TestClustersPlugin implements Plugin { public static final String EXTENSION_NAME = "testClusters"; private static final String HELPER_CONFIGURATION_PREFIX = "testclusters"; private static final String SYNC_ARTIFACTS_TASK_NAME = "syncTestClustersArtifacts"; - private static final int EXECUTOR_SHUTDOWN_TIMEOUT = 1; - private static final TimeUnit EXECUTOR_SHUTDOWN_TIMEOUT_UNIT = TimeUnit.MINUTES; private static final Logger logger = Logging.getLogger(TestClustersPlugin.class); private static final String TESTCLUSTERS_INSPECT_FAILURE = "testclusters.inspect.failure"; private final Map> usedClusters = new HashMap<>(); private final Map claimsInventory = new HashMap<>(); - private final Set runningClusters =new HashSet<>(); - private final Thread shutdownHook = 
new Thread(this::shutDownAllClusters); + private final Set runningClusters = new HashSet<>(); private final Boolean allowClusterToSurvive = Boolean.valueOf(System.getProperty(TESTCLUSTERS_INSPECT_FAILURE, "false")); - private ExecutorService executorService = Executors.newSingleThreadExecutor(); public static String getHelperConfigurationName(String version) { return HELPER_CONFIGURATION_PREFIX + "-" + version; @@ -82,6 +74,8 @@ public void apply(Project project) { // enable the DSL to describe clusters NamedDomainObjectContainer container = createTestClustersContainerExtension(project); + TestClustersCleanupExtension.createExtension(project); + // provide a task to be able to list defined clusters. createListClustersTask(project, container); @@ -100,9 +94,6 @@ public void apply(Project project) { // After each task we determine if there are clusters that are no longer needed. configureStopClustersHook(project); - // configure hooks to make sure no test cluster processes survive the build - configureCleanupHooks(project); - // Since we have everything modeled in the DSL, add all the required dependencies e.x. the distribution to the // configuration so the user doesn't have to repeat this. 
autoConfigureClusterDependencies(project, rootProject, container); @@ -196,8 +187,19 @@ private void configureStartClustersHook(Project project) { @Override public void beforeActions(Task task) { // we only start the cluster before the actions, so we'll not start it if the task is up-to-date - usedClusters.getOrDefault(task, Collections.emptyList()).stream() + List neededButNotRunning = usedClusters.getOrDefault( + task, + Collections.emptyList() + ) + .stream() .filter(cluster -> runningClusters.contains(cluster) == false) + .collect(Collectors.toList()); + + project.getRootProject().getExtensions() + .getByType(TestClustersCleanupExtension.class) + .getCleanupThread() + .watch(neededButNotRunning); + neededButNotRunning .forEach(elasticsearchCluster -> { elasticsearchCluster.start(); runningClusters.add(elasticsearchCluster); @@ -220,22 +222,36 @@ public void afterExecute(Task task, TaskState state) { task, Collections.emptyList() ); + if (clustersUsedByTask.isEmpty()) { + return; + } + logger.info("Clusters were used, stopping and releasing permits"); + final int permitsToRelease; if (state.getFailure() != null) { // If the task fails, and other tasks use this cluster, the other task will likely never be - // executed at all, so we will never get to un-claim and terminate it. + // executed at all, so we will never be called again to un-claim and terminate it. 
clustersUsedByTask.forEach(cluster -> stopCluster(cluster, true)); + permitsToRelease = clustersUsedByTask.stream() + .map(cluster -> cluster.getNumberOfNodes()) + .reduce(Integer::sum).get(); } else { clustersUsedByTask.forEach( cluster -> claimsInventory.put(cluster, claimsInventory.getOrDefault(cluster, 0) - 1) ); - claimsInventory.entrySet().stream() + List stoppingClusers = claimsInventory.entrySet().stream() .filter(entry -> entry.getValue() == 0) .filter(entry -> runningClusters.contains(entry.getKey())) .map(Map.Entry::getKey) - .forEach(cluster -> { - stopCluster(cluster, false); - runningClusters.remove(cluster); - }); + .collect(Collectors.toList()); + stoppingClusers.forEach(cluster -> { + stopCluster(cluster, false); + runningClusters.remove(cluster); + }); + + project.getRootProject().getExtensions() + .getByType(TestClustersCleanupExtension.class) + .getCleanupThread() + .unWatch(stoppingClusers); } } @Override @@ -406,62 +422,4 @@ public void execute(Task task) { }))); } - private void configureCleanupHooks(Project project) { - // When the Gradle daemon is used, it will interrupt all threads when the build concludes. - // This is our signal to clean up - executorService.submit(() -> { - while (true) { - try { - Thread.sleep(Long.MAX_VALUE); - } catch (InterruptedException interrupted) { - shutDownAllClusters(); - Thread.currentThread().interrupt(); - return; - } - } - }); - - // When the Daemon is not used, or runs into issues, rely on a shutdown hook - // When the daemon is used, but does not work correctly and eventually dies off (e.x. due to non interruptible - // thread in the build) process will be stopped eventually when the daemon dies. 
- Runtime.getRuntime().addShutdownHook(shutdownHook); - - // When we don't run into anything out of the ordinary, and the build completes, makes sure to clean up - project.getGradle().buildFinished(buildResult -> { - shutdownExecutorService(); - if (false == Runtime.getRuntime().removeShutdownHook(shutdownHook)) { - logger.info("Trying to deregister shutdown hook when it was not registered."); - } - }); - } - - private void shutdownExecutorService() { - executorService.shutdownNow(); - try { - if (executorService.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, EXECUTOR_SHUTDOWN_TIMEOUT_UNIT) == false) { - throw new IllegalStateException( - "Failed to shut down executor service after " + - EXECUTOR_SHUTDOWN_TIMEOUT + " " + EXECUTOR_SHUTDOWN_TIMEOUT_UNIT - ); - } - } catch (InterruptedException e) { - logger.info("Wait for testclusters shutdown interrupted", e); - Thread.currentThread().interrupt(); - } - } - - private void shutDownAllClusters() { - synchronized (runningClusters) { - if (runningClusters.isEmpty()) { - return; - } - Iterator iterator = runningClusters.iterator(); - while (iterator.hasNext()) { - ElasticsearchCluster next = iterator.next(); - iterator.remove(); - next.stop(false); - } - } - } - } diff --git a/gradle.properties b/gradle.properties index 491770edd7c52..63b1dc3cd7288 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ org.gradle.daemon=true -org.gradle.jvmargs=-Xmx2g -XX:+HeapDumpOnOutOfMemoryError -Xss2m +org.gradle.jvmargs=-Xmx3g -XX:+HeapDumpOnOutOfMemoryError -Xss2m options.forkOptions.memoryMaximumSize=2g