Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,11 @@ public void sampleUsageMSE() {
}
}

@Override
public boolean throttleQuerySubmission() {
return getWatcherTask().getHeapUsageBytes() > getWatcherTask().getQueryMonitorConfig().getAlarmingLevel();
}

@Override
public boolean isAnchorThreadInterrupted() {
ThreadExecutionContext context = _threadLocalEntry.get().getCurrentThreadTaskStatus();
Expand Down Expand Up @@ -632,6 +637,10 @@ public QueryMonitorConfig getQueryMonitorConfig() {
return _queryMonitorConfig.get();
}

public long getHeapUsageBytes() {
return _usedBytes;
}

@Override
public synchronized void onChange(Set<String> changedConfigs, Map<String, String> clusterConfigs) {
// Filter configs that have CommonConstants.PREFIX_SCHEDULER_PREFIX
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.resources.BinaryWorkloadResourceManager;
import org.apache.pinot.core.query.scheduler.resources.QueryExecutorService;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.query.QueryThreadContext;
import org.slf4j.Logger;
Expand Down Expand Up @@ -73,8 +74,9 @@ public class BinaryWorkloadScheduler extends QueryScheduler {
Thread _scheduler;

public BinaryWorkloadScheduler(PinotConfiguration config, QueryExecutor queryExecutor, ServerMetrics metrics,
LongAccumulator latestQueryTime) {
super(config, queryExecutor, new BinaryWorkloadResourceManager(config), metrics, latestQueryTime);
LongAccumulator latestQueryTime, ThreadResourceUsageAccountant resourceUsageAccountant) {
super(config, queryExecutor, new BinaryWorkloadResourceManager(config, resourceUsageAccountant), metrics,
latestQueryTime);

_secondaryQueryQ = new SecondaryWorkloadQueue(config, _resourceManager);
_numSecondaryRunners = config.getProperty(MAX_SECONDARY_QUERIES, DEFAULT_MAX_SECONDARY_QUERIES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.pinot.core.query.scheduler.fcfs.BoundedFCFSScheduler;
import org.apache.pinot.core.query.scheduler.fcfs.FCFSQueryScheduler;
import org.apache.pinot.core.query.scheduler.tokenbucket.TokenPriorityScheduler;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.plugin.PluginManager;
import org.slf4j.Logger;
Expand Down Expand Up @@ -58,24 +59,29 @@ private QuerySchedulerFactory() {
* @return returns an instance of query scheduler
*/
public static QueryScheduler create(PinotConfiguration schedulerConfig, QueryExecutor queryExecutor,
ServerMetrics serverMetrics, LongAccumulator latestQueryTime) {
ServerMetrics serverMetrics, LongAccumulator latestQueryTime,
ThreadResourceUsageAccountant resourceUsageAccountant) {
Preconditions.checkNotNull(schedulerConfig);
Preconditions.checkNotNull(queryExecutor);

String schedulerName = schedulerConfig.getProperty(ALGORITHM_NAME_CONFIG_KEY, DEFAULT_QUERY_SCHEDULER_ALGORITHM);
QueryScheduler scheduler;
switch (schedulerName.toLowerCase()) {
case FCFS_ALGORITHM:
scheduler = new FCFSQueryScheduler(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime);
scheduler = new FCFSQueryScheduler(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime,
resourceUsageAccountant);
break;
case TOKEN_BUCKET_ALGORITHM:
scheduler = TokenPriorityScheduler.create(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime);
scheduler = TokenPriorityScheduler.create(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime,
resourceUsageAccountant);
break;
case BOUNDED_FCFS_ALGORITHM:
scheduler = BoundedFCFSScheduler.create(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime);
scheduler = BoundedFCFSScheduler.create(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime,
resourceUsageAccountant);
break;
case BINARY_WORKLOAD_ALGORITHM:
scheduler = new BinaryWorkloadScheduler(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime);
scheduler = new BinaryWorkloadScheduler(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime,
resourceUsageAccountant);
break;
default:
scheduler =
Expand All @@ -93,7 +99,8 @@ public static QueryScheduler create(PinotConfiguration schedulerConfig, QueryExe
// Failure on bad configuration will cause outage vs an inferior algorithm that
// will provide degraded service
LOGGER.warn("Scheduler {} not found. Using default FCFS query scheduler", schedulerName);
return new FCFSQueryScheduler(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime);
return new FCFSQueryScheduler(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime,
resourceUsageAccountant);
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pinot.core.query.scheduler.TableBasedGroupMapper;
import org.apache.pinot.core.query.scheduler.resources.PolicyBasedResourceManager;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;


Expand All @@ -39,8 +40,9 @@
*/
public class BoundedFCFSScheduler extends PriorityScheduler {
public static BoundedFCFSScheduler create(PinotConfiguration config, QueryExecutor queryExecutor,
ServerMetrics serverMetrics, LongAccumulator latestQueryTime) {
final ResourceManager rm = new PolicyBasedResourceManager(config);
ServerMetrics serverMetrics, LongAccumulator latestQueryTime,
ThreadResourceUsageAccountant resourceUsageAccountant) {
final ResourceManager rm = new PolicyBasedResourceManager(config, resourceUsageAccountant);
final SchedulerGroupFactory groupFactory = new SchedulerGroupFactory() {
@Override
public SchedulerGroup create(PinotConfiguration config, String groupName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pinot.core.query.scheduler.QueryScheduler;
import org.apache.pinot.core.query.scheduler.resources.QueryExecutorService;
import org.apache.pinot.core.query.scheduler.resources.UnboundedResourceManager;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.query.QueryThreadContext;

Expand All @@ -41,8 +42,9 @@
public class FCFSQueryScheduler extends QueryScheduler {

public FCFSQueryScheduler(PinotConfiguration config, QueryExecutor queryExecutor, ServerMetrics serverMetrics,
LongAccumulator latestQueryTime) {
super(config, queryExecutor, new UnboundedResourceManager(config), serverMetrics, latestQueryTime);
LongAccumulator latestQueryTime, ThreadResourceUsageAccountant resourceUsageAccountant) {
super(config, queryExecutor, new UnboundedResourceManager(config, resourceUsageAccountant), serverMetrics,
latestQueryTime);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -34,8 +35,9 @@ public class BinaryWorkloadResourceManager extends ResourceManager {
private static final Logger LOGGER = LoggerFactory.getLogger(BinaryWorkloadResourceManager.class);
private final ResourceLimitPolicy _secondaryWorkloadPolicy;

public BinaryWorkloadResourceManager(PinotConfiguration config) {
super(config);
public BinaryWorkloadResourceManager(PinotConfiguration config,
ThreadResourceUsageAccountant resourceUsageAccountant) {
super(config, resourceUsageAccountant);
_secondaryWorkloadPolicy = new ResourceLimitPolicy(config, _numQueryWorkerThreads);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.base.Preconditions;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -34,8 +35,8 @@ public class PolicyBasedResourceManager extends ResourceManager {

private final ResourceLimitPolicy _resourcePolicy;

public PolicyBasedResourceManager(PinotConfiguration config) {
super(config);
public PolicyBasedResourceManager(PinotConfiguration config, ThreadResourceUsageAccountant resourceUsageAccountant) {
super(config, resourceUsageAccountant);
_resourcePolicy = new ResourceLimitPolicy(config, _numQueryWorkerThreads);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
import org.apache.pinot.core.util.trace.TracedThreadFactory;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.executor.ThrottleOnCriticalHeapUsageExecutor;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -76,7 +78,7 @@ public abstract class ResourceManager {
/**
* @param config configuration for initializing resource manager
*/
public ResourceManager(PinotConfiguration config) {
public ResourceManager(PinotConfiguration config, ThreadResourceUsageAccountant resourceUsageAccountant) {
_numQueryRunnerThreads = config.getProperty(QUERY_RUNNER_CONFIG_KEY, DEFAULT_QUERY_RUNNER_THREADS);
_numQueryWorkerThreads = config.getProperty(QUERY_WORKER_CONFIG_KEY, DEFAULT_QUERY_WORKER_THREADS);

Expand All @@ -85,14 +87,23 @@ public ResourceManager(PinotConfiguration config) {
// pqr -> pinot query runner (to give short names)
ThreadFactory queryRunnerFactory = new TracedThreadFactory(QUERY_RUNNER_THREAD_PRIORITY, false,
CommonConstants.ExecutorService.PINOT_QUERY_RUNNER_NAME_FORMAT);
_queryRunners =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(_numQueryRunnerThreads, queryRunnerFactory));

ExecutorService runnerService = Executors.newFixedThreadPool(_numQueryRunnerThreads, queryRunnerFactory);
if (config.getProperty(CommonConstants.Server.CONFIG_OF_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE,
CommonConstants.Server.DEFAULT_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE)) {
runnerService = new ThrottleOnCriticalHeapUsageExecutor(runnerService, resourceUsageAccountant);
}
_queryRunners = MoreExecutors.listeningDecorator(runnerService);

// pqw -> pinot query workers
ThreadFactory queryWorkersFactory = new TracedThreadFactory(Thread.NORM_PRIORITY, false,
CommonConstants.ExecutorService.PINOT_QUERY_WORKER_NAME_FORMAT);
_queryWorkers =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(_numQueryWorkerThreads, queryWorkersFactory));
ExecutorService workerService = Executors.newFixedThreadPool(_numQueryWorkerThreads, queryWorkersFactory);
if (config.getProperty(CommonConstants.Server.CONFIG_OF_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE,
CommonConstants.Server.DEFAULT_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE)) {
workerService = new ThrottleOnCriticalHeapUsageExecutor(workerService, resourceUsageAccountant);
}
_queryWorkers = MoreExecutors.listeningDecorator(workerService);
}

public void stop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;


Expand All @@ -30,8 +31,8 @@
*/
public class UnboundedResourceManager extends ResourceManager {

public UnboundedResourceManager(PinotConfiguration config) {
super(config);
public UnboundedResourceManager(PinotConfiguration config, ThreadResourceUsageAccountant resourceUsageAccountant) {
super(config, resourceUsageAccountant);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pinot.core.query.scheduler.TableBasedGroupMapper;
import org.apache.pinot.core.query.scheduler.resources.PolicyBasedResourceManager;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;


Expand All @@ -42,8 +43,8 @@ public class TokenPriorityScheduler extends PriorityScheduler {
private static final int DEFAULT_TOKEN_LIFETIME_MS = 100;

public static TokenPriorityScheduler create(PinotConfiguration config, QueryExecutor queryExecutor,
ServerMetrics metrics, LongAccumulator latestQueryTime) {
final ResourceManager rm = new PolicyBasedResourceManager(config);
ServerMetrics metrics, LongAccumulator latestQueryTime, ThreadResourceUsageAccountant resourceUsageAccountant) {
final ResourceManager rm = new PolicyBasedResourceManager(config, resourceUsageAccountant);
final SchedulerGroupFactory groupFactory = new SchedulerGroupFactory() {
@Override
public SchedulerGroup create(PinotConfiguration config, String groupName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ public void testThreadMemory()
private ResourceManager getResourceManager(int runners, int workers, final int softLimit, final int hardLimit,
Map<String, Object> map) {

return new ResourceManager(getConfig(runners, workers, map)) {
return new ResourceManager(getConfig(runners, workers, map), new Tracing.DefaultThreadResourceUsageAccountant()) {

@Override
public QueryExecutorService getExecutorService(ServerQueryRequest query, SchedulerGroupAccountant accountant) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.pinot.core.query.scheduler.resources.UnboundedResourceManager;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.trace.Tracing;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -81,7 +82,8 @@ public void testPutOutOfCapacity()

PinotConfiguration configuration = new PinotConfiguration(properties);

ResourceManager rm = new UnboundedResourceManager(configuration);
ResourceManager rm =
new UnboundedResourceManager(configuration, new Tracing.DefaultThreadResourceUsageAccountant());
MultiLevelPriorityQueue queue = createQueue(configuration, rm);
queue.put(createQueryRequest(GROUP_ONE, METRICS));
GROUP_FACTORY._groupMap.get(GROUP_ONE).addReservedThreads(rm.getTableThreadsHardLimit());
Expand Down Expand Up @@ -129,7 +131,8 @@ public void testTakeWithLimits()

PinotConfiguration configuration = new PinotConfiguration(properties);

PolicyBasedResourceManager rm = new PolicyBasedResourceManager(configuration);
PolicyBasedResourceManager rm =
new PolicyBasedResourceManager(configuration, new Tracing.DefaultThreadResourceUsageAccountant());
MultiLevelPriorityQueue queue = createQueue(configuration, rm);

queue.put(createQueryRequest(GROUP_ONE, METRICS));
Expand Down Expand Up @@ -214,7 +217,7 @@ public void testNoPendingAfterTrim()

private MultiLevelPriorityQueue createQueue() {
PinotConfiguration conf = new PinotConfiguration();
return createQueue(conf, new UnboundedResourceManager(conf));
return createQueue(conf, new UnboundedResourceManager(conf, new Tracing.DefaultThreadResourceUsageAccountant()));
}

private MultiLevelPriorityQueue createQueue(PinotConfiguration config, ResourceManager rm) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.trace.Tracing;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -241,7 +242,7 @@ public TestPriorityScheduler(PinotConfiguration config, ResourceManager resource
}

public static TestPriorityScheduler create(PinotConfiguration config) {
ResourceManager rm = new PolicyBasedResourceManager(config);
ResourceManager rm = new PolicyBasedResourceManager(config, new Tracing.DefaultThreadResourceUsageAccountant());
QueryExecutor qe = new TestQueryExecutor();
_groupFactory = new TestSchedulerGroupFactory();
MultiLevelPriorityQueue queue =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import org.apache.pinot.core.query.scheduler.fcfs.FCFSQueryScheduler;
import org.apache.pinot.core.query.scheduler.resources.UnboundedResourceManager;
import org.apache.pinot.core.query.scheduler.tokenbucket.TokenPriorityScheduler;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.trace.Tracing;
import org.testng.annotations.Test;

import static org.mockito.Mockito.mock;
Expand All @@ -43,33 +45,37 @@ public void testQuerySchedulerFactory() {
LongAccumulator latestQueryTime = mock(LongAccumulator.class);

PinotConfiguration config = new PinotConfiguration();
ThreadResourceUsageAccountant accountant = new Tracing.DefaultThreadResourceUsageAccountant();
config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY, QuerySchedulerFactory.FCFS_ALGORITHM);
QueryScheduler queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, serverMetrics, latestQueryTime);
QueryScheduler queryScheduler =
QuerySchedulerFactory.create(config, queryExecutor, serverMetrics, latestQueryTime, accountant);
assertTrue(queryScheduler instanceof FCFSQueryScheduler);

config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY, QuerySchedulerFactory.TOKEN_BUCKET_ALGORITHM);
queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, serverMetrics, latestQueryTime);
queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, serverMetrics, latestQueryTime, accountant);
assertTrue(queryScheduler instanceof TokenPriorityScheduler);

config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY, QuerySchedulerFactory.BOUNDED_FCFS_ALGORITHM);
queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, serverMetrics, latestQueryTime);
queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, serverMetrics, latestQueryTime, accountant);
assertTrue(queryScheduler instanceof BoundedFCFSScheduler);

config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY,
QuerySchedulerFactory.BINARY_WORKLOAD_ALGORITHM);
queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, serverMetrics, latestQueryTime);
queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, serverMetrics, latestQueryTime, accountant);
assertTrue(queryScheduler instanceof BinaryWorkloadScheduler);

config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY, TestQueryScheduler.class.getName());
queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, serverMetrics, latestQueryTime);
queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, serverMetrics, latestQueryTime, accountant);
assertTrue(queryScheduler instanceof TestQueryScheduler);
}

public static final class TestQueryScheduler extends QueryScheduler {

public TestQueryScheduler(PinotConfiguration config, QueryExecutor queryExecutor, ServerMetrics serverMetrics,
LongAccumulator latestQueryTime) {
super(config, queryExecutor, new UnboundedResourceManager(config), serverMetrics, latestQueryTime);
super(config, queryExecutor,
new UnboundedResourceManager(config, new Tracing.DefaultThreadResourceUsageAccountant()), serverMetrics,
latestQueryTime);
}

@Override
Expand Down
Loading
Loading