Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
*/
package org.apache.pinot.query.context;

import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.routing.RoutingManager;
Expand Down Expand Up @@ -80,9 +84,16 @@ public PhysicalPlannerContext() {
_liteModeServerStageLimit = CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT;
}

public PhysicalPlannerContext(RoutingManager routingManager, String hostName, int port, long requestId,
String instanceId, Map<String, String> queryOptions) {
this(routingManager, hostName, port, requestId, instanceId, queryOptions,
CommonConstants.Broker.DEFAULT_USE_LITE_MODE, CommonConstants.Broker.DEFAULT_RUN_IN_BROKER,
CommonConstants.Broker.DEFAULT_USE_BROKER_PRUNING, CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT);
}

public PhysicalPlannerContext(RoutingManager routingManager, String hostName, int port, long requestId,
String instanceId, Map<String, String> queryOptions, boolean defaultUseLiteMode, boolean defaultRunInBroker,
boolean defaultUseBrokerPruning, int defaultLiteModeServerStageLimit) {
boolean defaultUseBrokerPruning, int defaultLiteModeLeafStageLimit) {
_routingManager = routingManager;
_hostName = hostName;
_port = port;
Expand All @@ -93,7 +104,7 @@ public PhysicalPlannerContext(RoutingManager routingManager, String hostName, in
_runInBroker = QueryOptionsUtils.isRunInBroker(_queryOptions, defaultRunInBroker);
_useBrokerPruning = QueryOptionsUtils.isUseBrokerPruning(_queryOptions, defaultUseBrokerPruning);
_liteModeServerStageLimit = QueryOptionsUtils.getLiteModeServerStageLimit(_queryOptions,
defaultLiteModeServerStageLimit);
defaultLiteModeLeafStageLimit);
_instanceIdToQueryServerInstance.put(instanceId, getBrokerQueryServerInstance());
}

Expand Down Expand Up @@ -146,6 +157,24 @@ public int getLiteModeServerStageLimit() {
return _liteModeServerStageLimit;
}

/**
* Gets a random instance id from the registered instances in the context.
* <p>
* <b>Important:</b> This method will always return a server instanceId, unless no server has yet been registered
* with the context, which could happen for queries which don't consist of any table-scans.
* </p>
*/
public String getRandomInstanceId() {
Preconditions.checkState(!_instanceIdToQueryServerInstance.isEmpty(), "No instances present in context");
if (_instanceIdToQueryServerInstance.size() == 1) {
return _instanceIdToQueryServerInstance.keySet().iterator().next();
}
int numCandidates = _instanceIdToQueryServerInstance.size() - 1;
Random random = ThreadLocalRandom.current();
return _instanceIdToQueryServerInstance.keySet().stream().filter(instanceId -> !_instanceId.equals(instanceId))
.collect(Collectors.toList()).get(numCandidates == 1 ? 0 : random.nextInt(numCandidates - 1));
}

private QueryServerInstance getBrokerQueryServerInstance() {
return new QueryServerInstance(_instanceId, _hostName, _port, _port);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,10 @@
*/
package org.apache.pinot.query.planner.physical.v2.opt.rules;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
Expand All @@ -50,7 +45,6 @@
* plan nodes.
*/
public class LiteModeWorkerAssignmentRule implements PRelNodeTransformer {
private static final Random RANDOM = new Random();
private final PhysicalPlannerContext _context;
private final boolean _runInBroker;

Expand All @@ -61,13 +55,11 @@ public LiteModeWorkerAssignmentRule(PhysicalPlannerContext context) {

@Override
public PRelNode execute(PRelNode currentNode) {
Set<String> workerSet = new HashSet<>();
List<String> workers;
if (_runInBroker) {
workers = List.of(String.format("0@%s", _context.getInstanceId()));
workers = List.of("0@" + _context.getInstanceId());
} else {
accumulateWorkers(currentNode, workerSet);
workers = List.of(sampleWorker(new ArrayList<>(workerSet)));
workers = List.of("0@" + _context.getRandomInstanceId());
}
return addExchangeAndWorkers(currentNode, null, workers);
}
Expand Down Expand Up @@ -98,36 +90,6 @@ public PRelNode addExchangeAndWorkers(PRelNode currentNode, @Nullable PRelNode p
return currentNode;
}

/**
* Stores workers assigned to the leaf stage nodes into the provided Set. Note that each worker has an integer prefix
* which denotes the "workerId". We remove that prefix before storing them in the set.
*/
@VisibleForTesting
static void accumulateWorkers(PRelNode currentNode, Set<String> workerSink) {
if (currentNode.isLeafStage()) {
workerSink.addAll(currentNode.getPinotDataDistributionOrThrow().getWorkers().stream()
.map(LiteModeWorkerAssignmentRule::stripIdPrefixFromWorker).collect(Collectors.toList()));
return;
}
for (PRelNode input : currentNode.getPRelInputs()) {
accumulateWorkers(input, workerSink);
}
}

/**
* Samples a worker from the given list.
*/
@VisibleForTesting
static String sampleWorker(List<String> instanceIds) {
Preconditions.checkState(!instanceIds.isEmpty(), "No workers in leaf stage");
return String.format("0@%s", instanceIds.get(RANDOM.nextInt(instanceIds.size())));
}

@VisibleForTesting
static String stripIdPrefixFromWorker(String worker) {
return worker.split("@")[1];
}

/**
* Infers Exchange to be added on top of the leaf stage.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.core.Values;
import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable;
import org.apache.pinot.calcite.rel.traits.PinotExecStrategyTrait;
Expand Down Expand Up @@ -394,12 +395,18 @@ private Optional<HashDistributionDesc> getLeftInputHashDistributionDesc(Join joi
* Computes the PinotDataDistribution of the given node from the input node. This assumes that all traits of the
* input node are already satisfied.
*/
private static PinotDataDistribution computeCurrentNodeDistribution(PRelNode currentNode, @Nullable PRelNode parent) {
private PinotDataDistribution computeCurrentNodeDistribution(PRelNode currentNode, @Nullable PRelNode parent) {
if (currentNode.getPinotDataDistribution() != null) {
Preconditions.checkState(isLeafStageBoundary(currentNode, parent),
"current node should not have assigned data distribution unless it's a boundary");
return currentNode.getPinotDataDistributionOrThrow();
}
if (currentNode.getPRelInputs().isEmpty()) {
Preconditions.checkState(currentNode.unwrap() instanceof Values, "Expected Values node. Found: %s",
currentNode.unwrap());
List<String> workers = List.of(String.format("0@%s", _physicalPlannerContext.getRandomInstanceId()));
return new PinotDataDistribution(RelDistribution.Type.SINGLETON, workers, workers.hashCode(), null, null);
}
PinotDataDistribution inputDistribution = currentNode.getPRelInput(0).getPinotDataDistributionOrThrow();
PinotDataDistribution newDistribution = inputDistribution.apply(DistMappingGenerator.compute(
currentNode.unwrap().getInput(0), currentNode.unwrap(), null),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.query.context;

import java.util.Map;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.query.routing.QueryServerInstance;
import org.testng.Assert;
import org.testng.annotations.Test;

import static org.mockito.Mockito.mock;

public class PhysicalPlannerContextTest {

@Test
public void testGetRandomInstanceIdWithNoInstances() {
// Test case: No instances present in context (should throw IllegalStateException)
PhysicalPlannerContext context = createPhysicalPlannerContext();

// Clear the instances map to simulate no instances
context.getInstanceIdToQueryServerInstance().clear();

try {
context.getRandomInstanceId();
Assert.fail("Expected IllegalStateException when no instances are present");
} catch (IllegalStateException e) {
Assert.assertEquals(e.getMessage(), "No instances present in context");
}
}

@Test
public void testGetRandomInstanceIdWithSingleInstance() {
// Test case: Only one instance present (should return that instance)
PhysicalPlannerContext context = createPhysicalPlannerContext();

// The constructor automatically adds the broker instance, so we should have exactly one
String randomInstanceId = context.getRandomInstanceId();
Assert.assertEquals(randomInstanceId, "broker_instance_1");
}

@Test
public void testGetRandomInstanceIdWithMultipleInstances() {
// Test case: Multiple instances present (should return one that's not the current instance)
PhysicalPlannerContext context = createPhysicalPlannerContext();

// Add additional server instances
QueryServerInstance serverInstance2 = new QueryServerInstance("server_instance_2", "host2", 8081, 8081);
QueryServerInstance serverInstance3 = new QueryServerInstance("server_instance_3", "host3", 8082, 8082);

context.getInstanceIdToQueryServerInstance().put("server_instance_2", serverInstance2);
context.getInstanceIdToQueryServerInstance().put("server_instance_3", serverInstance3);

// Call getRandomInstanceId multiple times to verify it returns different server instances
// but never the broker instance
for (int i = 0; i < 10; i++) {
String randomInstanceId = context.getRandomInstanceId();
Assert.assertNotEquals(randomInstanceId, "broker_instance_1",
"Random instance should not be the current broker instance");
Assert.assertTrue(randomInstanceId.equals("server_instance_2") || randomInstanceId.equals("server_instance_3"),
"Random instance should be one of the server instances");
}
}

@Test
public void testGetRandomInstanceIdWithTwoInstances() {
// Test case: Two instances (broker + one server) - should return the server
PhysicalPlannerContext context = createPhysicalPlannerContext();

// Add one server instance
QueryServerInstance serverInstance = new QueryServerInstance("server_instance_1", "host1", 8081, 8081);
context.getInstanceIdToQueryServerInstance().put("server_instance_1", serverInstance);

String randomInstanceId = context.getRandomInstanceId();
Assert.assertEquals(randomInstanceId, "server_instance_1",
"With two instances, should return the non-broker instance");
}

private PhysicalPlannerContext createPhysicalPlannerContext() {
RoutingManager mockRoutingManager = mock(RoutingManager.class);
return new PhysicalPlannerContext(mockRoutingManager, "localhost", 8080, 12345L, "broker_instance_1", Map.of());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,15 @@
*/
package org.apache.pinot.query.planner.physical.v2.opt.rules;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.pinot.query.planner.physical.v2.PRelNode;
import org.apache.pinot.query.planner.physical.v2.PinotDataDistribution;
import org.mockito.Mockito;
import org.testng.annotations.Test;

import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.*;


public class LiteModeWorkerAssignmentRuleTest {
@Test
public void testAccumulateWorkers() {
PRelNode leafOne = create(List.of(), true, List.of("0@server-1", "1@server-2"));
PRelNode leafTwo = create(List.of(), true, List.of("0@server-2", "1@server-1"));
PRelNode intermediateNode = create(List.of(leafOne, leafTwo), false, List.of("0@server-3", "1@server-4"));
Set<String> workers = new HashSet<>();
LiteModeWorkerAssignmentRule.accumulateWorkers(intermediateNode, workers);
assertEquals(workers, Set.of("server-1", "server-2"));
}

@Test
public void testSampleWorker() {
List<String> workers = List.of("worker-0", "worker-1", "worker-2");
Set<String> selectionCandidates = Set.of("0@worker-0", "0@worker-1", "0@worker-2");
Set<String> selectedWorkers = new HashSet<>();
for (int iteration = 0; iteration < 1000; iteration++) {
selectedWorkers.add(LiteModeWorkerAssignmentRule.sampleWorker(workers));
}
assertEquals(selectedWorkers, selectionCandidates);
}

private PRelNode create(List<PRelNode> inputs, boolean isLeafStage, List<String> workers) {
// Setup mock pinot data distribution.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,61 @@
{
"physical_opt_constant_queries": {
"queries": [
{
"description": "Select 1",
"sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT 1",
"output": [
"Execution Plan",
"\nPhysicalValues(tuples=[[{ 1 }]])",
"\n"
]
},
{
"description": "Constant only join query",
"sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR WITH tmp1(id, name) AS (VALUES(1, 'foo')), tmp2(user_id, nm2) AS (VALUES(1, 'bar')) SELECT * FROM tmp1 JOIN tmp2 ON 1=1",
"output": [
"Execution Plan",
"\nPhysicalJoin(condition=[true], joinType=[inner])",
"\n PhysicalValues(tuples=[[{ 1, _UTF-8'foo' }]])",
"\n PhysicalValues(tuples=[[{ 1, _UTF-8'bar' }]])",
"\n"
]
},
{
"description": "Query that gets optimized to a Values node",
"sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT * FROM a WHERE col1 IS NULL LIMIT 1",
"output": [
"Execution Plan",
"\nPhysicalSort(fetch=[1])",
"\n PhysicalValues(tuples=[[]])",
"\n"
]
},
{
"description": "Constant only join query",
"sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT COUNT(*) FROM a WHERE col1 IN (WITH tmp1(id, name) AS (VALUES(1, 'foo')), tmp2(user_id, nm) AS (VALUES(2, 'bar')) SELECT A.name FROM tmp1 AS A JOIN tmp2 AS B ON A.id = B.user_id)",
"output": [
"Execution Plan",
"\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalAggregate(group=[{}], agg#0=[COUNT($0)], aggType=[FINAL])",
"\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalAggregate(group=[{}], agg#0=[COUNT()], aggType=[LEAF])",
"\n PhysicalJoin(condition=[=($0, $1)], joinType=[semi])",
"\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
"\n PhysicalProject(col1=[$0])",
"\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
"\n PhysicalTableScan(table=[[default, a]])",
"\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
"\n PhysicalProject(name=[$1])",
"\n PhysicalJoin(condition=[=($0, $2)], joinType=[inner])",
"\n PhysicalValues(tuples=[[]])",
"\n PhysicalProject(EXPR$0=[$0])",
"\n PhysicalValues(tuples=[[]])",
"\n"
]
}
]
},
"physical_opt_chained_subqueries": {
"queries": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ private <E> void execute(long requestId, Set<DispatchablePlanFragment> stagePlan
serializePlanFragments(stagePlans, serverInstancesOut, deadline);

if (serverInstancesOut.isEmpty()) {
throw new RuntimeException("No server instances to dispatch query to");
return;
}

Map<String, String> requestMetadata =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
{
"description": "basic test with literal",
"sql": "SELECT 1 AS int, CAST(2 AS DOUBLE) AS double"
},
{
"description": "select 1 but alias to a reserved column name",
"sql": "SELECT 1 as \"timestamp\""
}
]
},
Expand Down
Loading