Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ public void start()
MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null;
QueryDispatcher queryDispatcher = null;
if (_brokerConf.getProperty(Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, Helix.DEFAULT_MULTI_STAGE_ENGINE_ENABLED)) {
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler = new MultiStageQueryThrottler(_brokerConf);
_multiStageQueryThrottler.init(_spectatorHelixManager);
// multi-stage request handler uses both Netty and GRPC ports.
// worker requires both the "Netty port" for protocol transport; and "GRPC port" for mailbox transport.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.pinot.broker.broker.helix.ClusterChangeHandler;
import org.apache.pinot.common.concurrency.AdjustableSemaphore;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -51,31 +52,34 @@ public class MultiStageQueryThrottler implements ClusterChangeHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(MultiStageQueryThrottler.class);

private HelixManager _helixManager;
private HelixAdmin _helixAdmin;
private HelixConfigScope _helixConfigScope;
private int _numBrokers;
private int _numServers;
private final int _maxServerQueryThreadsFromBrokerConfig;
private final AtomicInteger _currentQueryServerThreads = new AtomicInteger();
/**
* If _maxServerQueryThreads is <= 0, it means that the cluster is not configured to limit the number of multi-stage
* queries that can be executed concurrently. In this case, we should not block the query.
*/
private int _maxServerQueryThreads;
private int _numBrokers;
private int _numServers;
private AdjustableSemaphore _semaphore;
private final AtomicInteger _currentQueryServerThreads = new AtomicInteger();

private HelixManager _helixManager;
private HelixAdmin _helixAdmin;
private HelixConfigScope _helixConfigScope;

/**
 * Creates a throttler and captures the broker-level maximum server query threads setting.
 *
 * <p>The value is read once from {@code brokerConf} under
 * {@link CommonConstants.Broker#CONFIG_OF_MSE_MAX_SERVER_QUERY_THREADS}, with
 * {@link CommonConstants.Broker#DEFAULT_MSE_MAX_SERVER_QUERY_THREADS} supplied as the default, and is
 * stored in a final field.
 *
 * @param brokerConf broker configuration to read the broker-level limit from
 */
public MultiStageQueryThrottler(PinotConfiguration brokerConf) {
  int brokerLevelLimit = brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_MSE_MAX_SERVER_QUERY_THREADS,
      CommonConstants.Broker.DEFAULT_MSE_MAX_SERVER_QUERY_THREADS);
  _maxServerQueryThreadsFromBrokerConfig = brokerLevelLimit;
}

@Override
public void init(HelixManager helixManager) {
_helixManager = helixManager;
_helixAdmin = _helixManager.getClusterManagmentTool();
_helixConfigScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
_helixManager.getClusterName()).build();

_maxServerQueryThreads = Integer.parseInt(
_helixAdmin.getConfig(_helixConfigScope,
Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS))
.getOrDefault(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS,
CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS));
_maxServerQueryThreads = calculateMaxServerQueryThreads();

List<String> clusterInstances = _helixAdmin.getInstancesInCluster(_helixManager.getClusterName());
_numBrokers = Math.max(1, (int) clusterInstances.stream()
Expand All @@ -86,7 +90,8 @@ public void init(HelixManager helixManager) {
.count());

if (_maxServerQueryThreads > 0) {
_semaphore = new AdjustableSemaphore(Math.max(1, _maxServerQueryThreads * _numServers / _numBrokers), true);
int semaphoreLimit = calculateSemaphoreLimit();
_semaphore = new AdjustableSemaphore(semaphoreLimit, true);
}
}

Expand Down Expand Up @@ -114,9 +119,11 @@ public boolean tryAcquire(int numQueryThreads, long timeout, TimeUnit unit)

if (numQueryThreads > _semaphore.getTotalPermits()) {
throw new RuntimeException(
"Can't dispatch query because the estimated number of server threads for this query is too large for the "
+ "configured value of '" + CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS
+ "'. Consider increasing the value of this configuration");
String.format("Can't dispatch query because the estimated number of server threads for this query is too "
+ "large for the configured value of '%s' or '%s'. estimatedThreads=%d configuredLimit=%d",
CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS,
CommonConstants.Broker.CONFIG_OF_MSE_MAX_SERVER_QUERY_THREADS,
numQueryThreads, _semaphore.getTotalPermits()));
}

boolean result = _semaphore.tryAcquire(numQueryThreads, timeout, unit);
Expand Down Expand Up @@ -156,14 +163,12 @@ public void processClusterChange(HelixConstants.ChangeType changeType) {
_numBrokers = numBrokers;
_numServers = numServers;
if (_maxServerQueryThreads > 0) {
_semaphore.setPermits(Math.max(1, _maxServerQueryThreads * _numServers / _numBrokers));
int semaphoreLimit = calculateSemaphoreLimit();
_semaphore.setPermits(semaphoreLimit);
}
}
} else {
int maxServerQueryThreads = Integer.parseInt(_helixAdmin.getConfig(_helixConfigScope,
Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS))
.getOrDefault(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS,
CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS));
int maxServerQueryThreads = calculateMaxServerQueryThreads();

if (_maxServerQueryThreads == maxServerQueryThreads) {
return;
Expand All @@ -179,9 +184,10 @@ public void processClusterChange(HelixConstants.ChangeType changeType) {
}

if (maxServerQueryThreads > 0) {
_semaphore.setPermits(Math.max(1, maxServerQueryThreads * _numServers / _numBrokers));
_maxServerQueryThreads = maxServerQueryThreads;
int semaphoreLimit = calculateSemaphoreLimit();
_semaphore.setPermits(semaphoreLimit);
}
_maxServerQueryThreads = maxServerQueryThreads;
}
}

Expand All @@ -193,4 +199,23 @@ public int currentQueryServerThreads() {
int availablePermits() {
// Delegates directly to the semaphore and returns whatever it reports as currently available.
// NOTE(review): init() appears to create _semaphore only when _maxServerQueryThreads > 0, so this
// presumably fails with a NullPointerException when throttling is disabled - confirm against callers.
return _semaphore.availablePermits();
}

/**
 * Resolves the maximum server query threads value to use.
 *
 * <p>A positive value captured from the broker configuration takes precedence. Otherwise the value is
 * looked up in the Helix config for the configured scope, using the Helix default when the key is absent,
 * and parsed as an integer.
 *
 * @return the broker-level value when it is positive, otherwise the parsed Helix config value
 * @throws NumberFormatException if the Helix config value is not a valid integer
 */
@VisibleForTesting
int calculateMaxServerQueryThreads() {
  if (_maxServerQueryThreadsFromBrokerConfig <= 0) {
    // No usable broker-level value: consult the Helix config instead.
    String clusterLevelValue = _helixAdmin.getConfig(_helixConfigScope,
            Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS))
        .getOrDefault(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS,
            CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS);
    return Integer.parseInt(clusterLevelValue);
  }
  return _maxServerQueryThreadsFromBrokerConfig;
}

/**
 * Computes the number of permits for the throttling semaphore from the current state.
 *
 * <p>The limit is {@code _maxServerQueryThreads * _numServers / _numBrokers} (integer division), with a
 * lower bound of 1. The product is evaluated in 64-bit arithmetic and the result is capped at
 * {@link Integer#MAX_VALUE}. With plain {@code int} arithmetic, a very large configured limit (for example
 * one chosen to mean "effectively unlimited") could wrap to a negative number and silently collapse the
 * limit to a single permit.
 *
 * @return the semaphore limit, always in the range {@code [1, Integer.MAX_VALUE]}
 */
private int calculateSemaphoreLimit() {
  // Widen before multiplying so the intermediate product cannot overflow.
  long scaledThreads = (long) _maxServerQueryThreads * _numServers / _numBrokers;
  int semaphoreLimit = (int) Math.max(1L, Math.min(Integer.MAX_VALUE, scaledThreads));
  LOGGER.info("Calculating estimated server query threads limit: {} for maxServerQueryThreads: {}, "
          + "numBrokers: {}, and numServers: {}",
      semaphoreLimit, _maxServerQueryThreads, _numBrokers, _numServers);
  return semaphoreLimit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
package org.apache.pinot.broker.requesthandler;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixManager;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
Expand Down Expand Up @@ -68,7 +70,7 @@ public void tearDown()
@Test
public void testBasicAcquireRelease()
throws Exception {
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler = new MultiStageQueryThrottler(new PinotConfiguration());
_multiStageQueryThrottler.init(_helixManager);

Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(1, 100, TimeUnit.MILLISECONDS));
Expand All @@ -83,7 +85,7 @@ public void testAcquireTimeout()
when(_helixAdmin.getConfig(any(),
eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS)))
).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS, "2"));
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler = new MultiStageQueryThrottler(new PinotConfiguration());
_multiStageQueryThrottler.init(_helixManager);

Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(1, 100, TimeUnit.MILLISECONDS));
Expand All @@ -99,7 +101,7 @@ public void testDisabledThrottling()
when(_helixAdmin.getConfig(any(),
eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS)))
).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS, "-1"));
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler = new MultiStageQueryThrottler(new PinotConfiguration());
_multiStageQueryThrottler.init(_helixManager);

// If maxServerQueryThreads is <= 0, the throttling mechanism should be "disabled" and any attempt to acquire should
Expand All @@ -112,7 +114,7 @@ public void testDisabledThrottling()
@Test
public void testIncreaseNumBrokers()
throws Exception {
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler = new MultiStageQueryThrottler(new PinotConfiguration());
_multiStageQueryThrottler.init(_helixManager);

for (int i = 0; i < 2; i++) {
Expand Down Expand Up @@ -140,7 +142,7 @@ public void testIncreaseNumBrokers()
@Test
public void testDecreaseNumBrokers()
throws Exception {
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler = new MultiStageQueryThrottler(new PinotConfiguration());
_multiStageQueryThrottler.init(_helixManager);

for (int i = 0; i < 2; i++) {
Expand All @@ -162,7 +164,7 @@ public void testDecreaseNumBrokers()
@Test
public void testIncreaseNumServers()
throws Exception {
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler = new MultiStageQueryThrottler(new PinotConfiguration());
_multiStageQueryThrottler.init(_helixManager);

for (int i = 0; i < 2; i++) {
Expand All @@ -185,7 +187,7 @@ public void testIncreaseNumServers()
@Test
public void testDecreaseNumServers()
throws Exception {
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler = new MultiStageQueryThrottler(new PinotConfiguration());
_multiStageQueryThrottler.init(_helixManager);

for (int i = 0; i < 2; i++) {
Expand All @@ -212,7 +214,7 @@ public void testDecreaseNumServers()
@Test
public void testIncreaseMaxServerQueryThreads()
throws Exception {
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler = new MultiStageQueryThrottler(new PinotConfiguration());
_multiStageQueryThrottler.init(_helixManager);

for (int i = 0; i < 2; i++) {
Expand All @@ -233,7 +235,7 @@ public void testIncreaseMaxServerQueryThreads()
@Test
public void testDecreaseMaxServerQueryThreads()
throws Exception {
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler = new MultiStageQueryThrottler(new PinotConfiguration());
_multiStageQueryThrottler.init(_helixManager);

for (int i = 0; i < 2; i++) {
Expand All @@ -260,7 +262,7 @@ public void testDecreaseMaxServerQueryThreads()
@Test
public void testEnabledToDisabledTransitionDisallowed()
throws Exception {
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler = new MultiStageQueryThrottler(new PinotConfiguration());
_multiStageQueryThrottler.init(_helixManager);

Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 4);
Expand All @@ -286,7 +288,7 @@ public void testDisabledToEnabledTransitionDisallowed()
when(_helixAdmin.getConfig(any(),
eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS)))
).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS, "-1"));
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler = new MultiStageQueryThrottler(new PinotConfiguration());
_multiStageQueryThrottler.init(_helixManager);

// If maxServerQueryThreads is <= 0, the throttling mechanism should be "disabled" and any attempt to acquire should
Expand All @@ -307,9 +309,75 @@ public void testDisabledToEnabledTransitionDisallowed()
}
}

/**
 * Verifies how the effective max server query threads value is resolved from the broker config and the
 * Helix cluster config: cluster value is used when the broker value is unset, and a broker value that is
 * set wins regardless of whether it is higher or lower than the cluster value.
 */
@Test
public void testCalculateMaxServerQueryThreads() {
  String clusterKey = CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS;
  String brokerKey = CommonConstants.Broker.CONFIG_OF_MSE_MAX_SERVER_QUERY_THREADS;
  List<String> clusterKeys = Collections.singletonList(clusterKey);

  // Scenario 1: neither config is set, so both fall back to their defaults.
  when(_helixAdmin.getConfig(any(), eq(clusterKeys))).thenReturn(Map.of());

  PinotConfiguration noBrokerLimit = new PinotConfiguration(); // broker key intentionally absent
  _multiStageQueryThrottler = new MultiStageQueryThrottler(noBrokerLimit);
  _multiStageQueryThrottler.init(_helixManager);

  Assert.assertEquals(_multiStageQueryThrottler.calculateMaxServerQueryThreads(), -1);

  // Scenario 2: only the cluster config is set.
  when(_helixAdmin.getConfig(any(), eq(clusterKeys))).thenReturn(Map.of(clusterKey, "10"));

  _multiStageQueryThrottler = new MultiStageQueryThrottler(noBrokerLimit);
  _multiStageQueryThrottler.init(_helixManager);

  Assert.assertEquals(_multiStageQueryThrottler.calculateMaxServerQueryThreads(), 10);

  // Scenario 3: only the broker config is set (cluster config explicitly disabled).
  when(_helixAdmin.getConfig(any(), eq(clusterKeys))).thenReturn(Map.of(clusterKey, "-1"));

  Map<String, Object> brokerOnlyProps = new HashMap<>();
  brokerOnlyProps.put(brokerKey, "20");
  PinotConfiguration brokerOnlyConfig = new PinotConfiguration(brokerOnlyProps);

  _multiStageQueryThrottler = new MultiStageQueryThrottler(brokerOnlyConfig);
  _multiStageQueryThrottler.init(_helixManager);

  Assert.assertEquals(_multiStageQueryThrottler.calculateMaxServerQueryThreads(), 20);

  // Scenario 4: both set, cluster value lower than broker value. Broker value is expected to win.
  when(_helixAdmin.getConfig(any(), eq(clusterKeys))).thenReturn(Map.of(clusterKey, "15"));

  Map<String, Object> higherBrokerProps = new HashMap<>();
  higherBrokerProps.put(brokerKey, "25");
  PinotConfiguration higherBrokerConfig = new PinotConfiguration(higherBrokerProps);

  _multiStageQueryThrottler = new MultiStageQueryThrottler(higherBrokerConfig);
  _multiStageQueryThrottler.init(_helixManager);

  Assert.assertEquals(_multiStageQueryThrottler.calculateMaxServerQueryThreads(), 25);

  // Scenario 5: both set, broker value lower than cluster value. Broker value is still expected to win.
  when(_helixAdmin.getConfig(any(), eq(clusterKeys))).thenReturn(Map.of(clusterKey, "30"));

  Map<String, Object> lowerBrokerProps = new HashMap<>();
  lowerBrokerProps.put(brokerKey, "5");
  PinotConfiguration lowerBrokerConfig = new PinotConfiguration(lowerBrokerProps);

  _multiStageQueryThrottler = new MultiStageQueryThrottler(lowerBrokerConfig);
  _multiStageQueryThrottler.init(_helixManager);

  Assert.assertEquals(_multiStageQueryThrottler.calculateMaxServerQueryThreads(), 5);
}

@Test
public void testLowMaxServerQueryThreads() {
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler = new MultiStageQueryThrottler(new PinotConfiguration());
_multiStageQueryThrottler.init(_helixManager);

Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 4);
Expand All @@ -321,7 +389,7 @@ public void testLowMaxServerQueryThreads() {
@Test
public void testAcquireReleaseWithDifferentQuerySizes()
throws Exception {
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler = new MultiStageQueryThrottler(new PinotConfiguration());
_multiStageQueryThrottler.init(_helixManager);

Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 4);
Expand Down
Loading
Loading