diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/HiveQueryRunner.java b/presto-hive/src/test/java/com/facebook/presto/hive/HiveQueryRunner.java index 5ca9b785b15af..be60a5b3701ef 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/HiveQueryRunner.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/HiveQueryRunner.java @@ -36,9 +36,11 @@ import org.joda.time.DateTimeZone; import java.io.File; +import java.net.URI; import java.nio.file.Path; import java.util.Map; import java.util.Optional; +import java.util.function.BiFunction; import static com.facebook.airlift.log.Level.ERROR; import static com.facebook.airlift.log.Level.WARN; @@ -91,13 +93,13 @@ public static DistributedQueryRunner createQueryRunner( Optional baseDataDir) throws Exception { - return createQueryRunner(tables, extraProperties, extraCoordinatorProperties, "sql-standard", ImmutableMap.of(), baseDataDir); + return createQueryRunner(tables, extraProperties, extraCoordinatorProperties, "sql-standard", ImmutableMap.of(), Optional.empty(), baseDataDir, Optional.empty()); } public static DistributedQueryRunner createQueryRunner(Iterable> tables, Map extraProperties, Optional baseDataDir) throws Exception { - return createQueryRunner(tables, extraProperties, ImmutableMap.of(), "sql-standard", ImmutableMap.of(), baseDataDir); + return createQueryRunner(tables, extraProperties, ImmutableMap.of(), "sql-standard", ImmutableMap.of(), Optional.empty(), baseDataDir, Optional.empty()); } public static DistributedQueryRunner createQueryRunner( @@ -108,7 +110,7 @@ public static DistributedQueryRunner createQueryRunner( Optional baseDataDir) throws Exception { - return createQueryRunner(tables, extraProperties, ImmutableMap.of(), security, extraHiveProperties, baseDataDir); + return createQueryRunner(tables, extraProperties, ImmutableMap.of(), security, extraHiveProperties, Optional.empty(), baseDataDir, Optional.empty()); } public static DistributedQueryRunner createQueryRunner( @@ -117,7 +119,9 @@ public static DistributedQueryRunner createQueryRunner( Map extraCoordinatorProperties, String security, Map extraHiveProperties, - Optional baseDataDir) + Optional workerCount, + Optional baseDataDir, + Optional> externalWorkerLauncher) throws Exception { assertEquals(DateTimeZone.getDefault(), TIME_ZONE, "Timezone not configured correctly. Add -Duser.timezone=America/Bahia_Banderas to your JVM arguments"); @@ -131,10 +135,11 @@ public static DistributedQueryRunner createQueryRunner( DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(createSession(Optional.of(new SelectedRole(ROLE, Optional.of("admin"))))) - .setNodeCount(4) + .setNodeCount(workerCount.orElse(4)) .setExtraProperties(systemProperties) .setCoordinatorProperties(extraCoordinatorProperties) .setBaseDataDir(baseDataDir) + .setExternalWorkerLauncher(externalWorkerLauncher) .build(); try { queryRunner.installPlugin(new TpchPlugin()); diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveExternalWorkersQueries.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveExternalWorkersQueries.java new file mode 100644 index 0000000000000..66f7f8b510830 --- /dev/null +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveExternalWorkersQueries.java @@ -0,0 +1,127 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.hive; + +import com.facebook.presto.spi.api.Experimental; +import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.AbstractTestQueryFramework; +import com.facebook.presto.tests.DistributedQueryRunner; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.tpch.TpchTable.NATION; +import static java.lang.String.format; + +@Experimental +public class TestHiveExternalWorkersQueries + extends AbstractTestQueryFramework +{ + protected TestHiveExternalWorkersQueries() + { + super(TestHiveExternalWorkersQueries::createQueryRunner); + } + + private static QueryRunner createQueryRunner() + throws Exception + { + String prestoServerPath = System.getenv("PRESTO_SERVER"); + String baseDataDir = System.getenv("DATA_DIR"); + + return createQueryRunner(Optional.ofNullable(prestoServerPath), Optional.ofNullable(baseDataDir).map(Paths::get)); + } + + private static QueryRunner createQueryRunner(Optional prestoServerPath, Optional baseDataDir) + throws Exception + { + if (!prestoServerPath.isPresent()) { + return HiveQueryRunner.createQueryRunner( + ImmutableList.of(NATION), + ImmutableMap.of(), + "sql-standard", + ImmutableMap.of("hive.storage-format", "DWRF"), + baseDataDir); + } + + checkArgument(baseDataDir.isPresent(), "Path to data files must be specified when testing external workers"); + + // Make TPC-H tables in DWRF format using Java-based workers + HiveQueryRunner.createQueryRunner( + ImmutableList.of(NATION), + ImmutableMap.of(), + "sql-standard", + ImmutableMap.of("hive.storage-format", "DWRF"), + baseDataDir).close(); + + Path tempDirectoryPath = Files.createTempDirectory(TestHiveExternalWorkersQueries.class.getSimpleName()); + + // Make query runner with external workers for tests + DistributedQueryRunner queryRunner = HiveQueryRunner.createQueryRunner(ImmutableList.of(NATION), + ImmutableMap.of("optimizer.optimize-hash-generation", "false"), + ImmutableMap.of(), + "sql-standard", + ImmutableMap.of(), + Optional.of(1), + baseDataDir, + Optional.of((workerIndex, discoveryUri) -> { + try { + if (workerIndex == 0) { + // Write discovery URL to /tmp/config.properties + Files.write(tempDirectoryPath.resolve("config.properties"), + format("discovery.uri=%s\n", discoveryUri).getBytes()); + } + return new ProcessBuilder(prestoServerPath.get(), "--logtostderr=1", "--v=1") + .directory(tempDirectoryPath.toFile()) + .redirectErrorStream(true) + .redirectOutput(ProcessBuilder.Redirect.INHERIT) + .redirectError(ProcessBuilder.Redirect.INHERIT) + .start(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + })); + + return queryRunner; + } + + @Test + public void testFiltersAndProjections() + { + assertQuery("SELECT * FROM nation"); + assertQuery("SELECT * FROM nation WHERE nationkey = 4"); + assertQuery("SELECT * FROM nation WHERE nationkey <> 4"); + assertQuery("SELECT * FROM nation WHERE nationkey < 4"); + assertQuery("SELECT * FROM nation WHERE nationkey <= 4"); + assertQuery("SELECT * FROM nation WHERE nationkey > 4"); + assertQuery("SELECT * FROM nation WHERE nationkey >= 4"); + assertQuery("SELECT nationkey * 10, nationkey % 5, -nationkey, nationkey / 3 FROM nation"); + assertQuery("SELECT *, nationkey / 3 FROM nation"); + } + + @Test + public void testAggregations() + { + assertQuery("SELECT count(*) FROM nation"); + assertQuery("SELECT regionkey, count(*) FROM nation GROUP BY regionkey"); + } +} diff --git a/presto-tests/src/main/java/com/facebook/presto/tests/DistributedQueryRunner.java b/presto-tests/src/main/java/com/facebook/presto/tests/DistributedQueryRunner.java index ebb2535b0ffc1..47bd542026fad 100644 --- a/presto-tests/src/main/java/com/facebook/presto/tests/DistributedQueryRunner.java +++ b/presto-tests/src/main/java/com/facebook/presto/tests/DistributedQueryRunner.java @@ -66,6 +66,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiFunction; import java.util.function.Function; import static com.facebook.presto.testing.TestingSession.TESTING_CATALOG; @@ -91,6 +92,7 @@ public class DistributedQueryRunner private final TestingDiscoveryServer discoveryServer; private final TestingPrestoServer coordinator; private final List servers; + private final List externalWorkers; private final Closer closer = Closer.create(); @@ -111,7 +113,7 @@ public DistributedQueryRunner(Session defaultSession, int nodeCount) public DistributedQueryRunner(Session defaultSession, int nodeCount, Map extraProperties) throws Exception { - this(defaultSession, nodeCount, extraProperties, ImmutableMap.of(), DEFAULT_SQL_PARSER_OPTIONS, ENVIRONMENT, Optional.empty()); + this(defaultSession, nodeCount, extraProperties, ImmutableMap.of(), DEFAULT_SQL_PARSER_OPTIONS, ENVIRONMENT, Optional.empty(), Optional.empty()); } public static Builder builder(Session defaultSession) @@ -126,7 +128,8 @@ private DistributedQueryRunner( Map coordinatorProperties, SqlParserOptions parserOptions, String environment, - Optional baseDataDir) + Optional baseDataDir, + Optional> externalWorkerLauncher) throws Exception { requireNonNull(defaultSession, "defaultSession is null"); @@ -136,19 +139,40 @@ private DistributedQueryRunner( discoveryServer = new TestingDiscoveryServer(environment); closer.register(() -> closeUnchecked(discoveryServer)); log.info("Created TestingDiscoveryServer in %s", nanosSince(start).convertToMostSuccinctTimeUnit()); + URI discoveryUrl = discoveryServer.getBaseUrl(); + log.info("Discovery URL %s", discoveryUrl); ImmutableList.Builder servers = ImmutableList.builder(); + Map extraCoordinatorProperties = new HashMap<>(); - for (int i = 1; i < nodeCount; i++) { - TestingPrestoServer worker = closer.register(createTestingPrestoServer(discoveryServer.getBaseUrl(), false, extraProperties, parserOptions, environment, baseDataDir)); - servers.add(worker); + if (externalWorkerLauncher.isPresent()) { + ImmutableList.Builder externalWorkersBuilder = ImmutableList.builder(); + for (int i = 0; i < nodeCount; i++) { + externalWorkersBuilder.add(externalWorkerLauncher.get().apply(i, discoveryUrl)); + } + externalWorkers = externalWorkersBuilder.build(); + closer.register(() -> { + for (Process worker : externalWorkers) { + worker.destroyForcibly(); + } + }); + + // Don't use coordinator as worker + extraCoordinatorProperties.put("node-scheduler.include-coordinator", "false"); + } + else { + externalWorkers = ImmutableList.of(); + + for (int i = 1; i < nodeCount; i++) { + TestingPrestoServer worker = closer.register(createTestingPrestoServer(discoveryUrl, false, extraProperties, parserOptions, environment, baseDataDir)); + servers.add(worker); + } } - Map extraCoordinatorProperties = new HashMap<>(); extraCoordinatorProperties.put("experimental.iterative-optimizer-enabled", "true"); extraCoordinatorProperties.putAll(extraProperties); extraCoordinatorProperties.putAll(coordinatorProperties); - coordinator = closer.register(createTestingPrestoServer(discoveryServer.getBaseUrl(), true, extraCoordinatorProperties, parserOptions, environment, baseDataDir)); + coordinator = closer.register(createTestingPrestoServer(discoveryUrl, true, extraCoordinatorProperties, parserOptions, environment, baseDataDir)); servers.add(coordinator); this.servers = servers.build(); @@ -217,10 +241,11 @@ private static TestingPrestoServer createTestingPrestoServer(URI discoveryUri, b private boolean allNodesGloballyVisible() { + int expectedActiveNodes = externalWorkers.size() + servers.size(); for (TestingPrestoServer server : servers) { AllNodes allNodes = server.refreshNodes(); if (!allNodes.getInactiveNodes().isEmpty() || - (allNodes.getActiveNodes().size() != servers.size())) { + (allNodes.getActiveNodes().size() != expectedActiveNodes)) { return false; } } @@ -336,7 +361,7 @@ public void createCatalog(String catalogName, String connectorName, Map activeNodesWithConnector = server.getActiveNodesWithConnector(connectorId); @@ -540,6 +569,7 @@ public static class Builder private SqlParserOptions parserOptions = DEFAULT_SQL_PARSER_OPTIONS; private String environment = ENVIRONMENT; private Optional baseDataDir = Optional.empty(); + private Optional> externalWorkerLauncher = Optional.empty(); protected Builder(Session defaultSession) { @@ -609,10 +639,16 @@ public Builder setBaseDataDir(Optional baseDataDir) return this; } + public Builder setExternalWorkerLauncher(Optional> externalWorkerLauncher) + { + this.externalWorkerLauncher = requireNonNull(externalWorkerLauncher, "externalWorkerLauncher is null"); + return this; + } + public DistributedQueryRunner build() throws Exception { - return new DistributedQueryRunner(defaultSession, nodeCount, extraProperties, coordinatorProperties, parserOptions, environment, baseDataDir); + return new DistributedQueryRunner(defaultSession, nodeCount, extraProperties, coordinatorProperties, parserOptions, environment, baseDataDir, externalWorkerLauncher); } } }