Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.segment.local.function.GroovyFunctionEvaluator;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.cursors.ResponseStoreService;
import org.apache.pinot.spi.env.PinotConfiguration;
Expand Down Expand Up @@ -153,6 +154,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
protected AbstractResponseStore _responseStore;
protected BrokerGrpcServer _brokerGrpcServer;
protected FailureDetector _failureDetector;
protected ThreadResourceUsageAccountant _resourceUsageAccountant;

@Override
public void init(PinotConfiguration brokerConf)
Expand Down Expand Up @@ -415,9 +417,10 @@ public void start()
ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(
_brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
CommonConstants.Broker.DEFAULT_THREAD_ALLOCATED_BYTES_MEASUREMENT));
Tracing.ThreadAccountantOps.initializeThreadAccountant(
_resourceUsageAccountant = Tracing.ThreadAccountantOps.createThreadAccountant(
_brokerConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId,
org.apache.pinot.spi.config.instance.InstanceType.BROKER);
Preconditions.checkNotNull(_resourceUsageAccountant);
Tracing.ThreadAccountantOps.startThreadAccountant();

String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,4 +473,25 @@ protected static String getServerTenantRequestPayload(String tenantName, int num
return new Tenant(TenantRole.SERVER, tenantName, numOfflineServers + numRealtimeServers, numOfflineServers,
numRealtimeServers).toJsonString();
}

/**
 * Posts the given cluster config entries to the URL produced by
 * {@code _controllerRequestURLBuilder.forClusterConfigUpdate()}, with the map serialized via
 * {@code JsonUtils.objectToString} as the request body.
 *
 * @param newConfigs cluster config key/value pairs to send
 * @throws IOException wrapping any {@code HttpErrorStatusException} or {@code URISyntaxException} raised while
 *     building or sending the request (the original exception is kept as the cause)
 */
public void updateClusterConfig(Map<String, String> newConfigs)
    throws IOException {
  try {
    URI endpoint = new URI(_controllerRequestURLBuilder.forClusterConfigUpdate());
    String payload = JsonUtils.objectToString(newConfigs);
    HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPostRequest(endpoint, payload, _headers));
  } catch (HttpErrorStatusException | URISyntaxException e) {
    throw new IOException(e);
  }
}

/**
 * Sends a DELETE request to the URL produced by
 * {@code _controllerRequestURLBuilder.forClusterConfigDelete(config)}.
 *
 * @param config the cluster config identifier passed to the URL builder
 * @throws IOException wrapping any {@code HttpErrorStatusException} or {@code URISyntaxException} raised while
 *     building or sending the request (the original exception is kept as the cause)
 */
public void deleteClusterConfig(String config)
    throws IOException {
  try {
    URI endpoint = new URI(_controllerRequestURLBuilder.forClusterConfigDelete(config));
    HttpClient.wrapAndThrowHttpException(_httpClient.sendDeleteRequest(endpoint, _headers));
  } catch (HttpErrorStatusException | URISyntaxException e) {
    throw new IOException(e);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,16 @@ public void runPeriodicTask(String taskName, String tableName, TableType tableTy
sendGetRequest(getControllerRequestURLBuilder().forPeriodTaskRun(taskName, tableName, tableType));
}

/**
 * Updates cluster configs by delegating to {@code getControllerRequestClient().updateClusterConfig(...)}.
 *
 * @param clusterConfig cluster config key/value pairs to send
 * @throws IOException if the underlying client call fails
 */
public void updateClusterConfig(Map<String, String> clusterConfig)
throws IOException {
getControllerRequestClient().updateClusterConfig(clusterConfig);
}

/**
 * Deletes a cluster config by delegating to {@code getControllerRequestClient().deleteClusterConfig(...)}.
 *
 * @param clusterConfig the cluster config identifier to delete (presumably the config key; confirm against the
 *     controller endpoint)
 * @throws IOException if the underlying client call fails
 */
public void deleteClusterConfig(String clusterConfig)
throws IOException {
getControllerRequestClient().deleteClusterConfig(clusterConfig);
}

/**
* Trigger a task on a table and wait for completion
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public void setToIdle() {
_currentThreadCPUTimeSampleMS = 0;
// clear memory usage
_currentThreadMemoryAllocationSampleBytes = 0;
_errorStatus.set(null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public static class PerQueryCPUMemResourceUsageAccountant implements ThreadResou
*/
private static final String ACCOUNTANT_TASK_NAME = "CPUMemThreadAccountant";
private static final int ACCOUNTANT_PRIORITY = 4;
private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1, r -> {
private final ExecutorService _executorService = Executors.newFixedThreadPool(1, r -> {
Thread thread = new Thread(r);
thread.setPriority(ACCOUNTANT_PRIORITY);
thread.setDaemon(true);
Expand Down Expand Up @@ -213,6 +213,10 @@ protected WatcherTask createWatcherTask() {
return new WatcherTask();
}

/**
 * Returns the {@code QueryMonitorConfig} currently held by this accountant's watcher task.
 */
public QueryMonitorConfig getQueryMonitorConfig() {
return _watcherTask.getQueryMonitorConfig();
}

@Override
public Collection<? extends ThreadResourceTracker> getThreadResources() {
return _threadEntriesMap.values();
Expand Down Expand Up @@ -431,7 +435,12 @@ public WatcherTask getWatcherTask() {

@Override
public void startWatcherTask() {
EXECUTOR_SERVICE.submit(_watcherTask);
_executorService.submit(_watcherTask);
}

/**
 * Stops the watcher task by calling {@code shutdownNow()} on this accountant's executor service.
 * NOTE(review): {@code shutdownNow()} typically cancels running tasks by interrupting their thread, which is
 * what the watcher loop's {@code isInterrupted()} check relies on; the executor cannot be reused afterwards.
 */
@Override
public void stopWatcherTask() {
_executorService.shutdownNow();
}

@Override
Expand Down Expand Up @@ -756,7 +765,7 @@ private void logQueryMonitorConfig() {

@Override
public void run() {
while (true) {
while (!Thread.currentThread().isInterrupted()) {
Copy link

Copilot AI Jul 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The while loop should also check the interrupted flag at the beginning of each iteration to ensure a prompt response to interruption. Consider adding a Thread.interrupted() check after the sleep/wait operations.

Copilot uses AI. Check for mistakes.
try {
runOnce();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public static class ResourceUsageAccountant implements ThreadResourceUsageAccoun
private static final String ACCOUNTANT_TASK_NAME = "ResourceUsageAccountant";
private static final int ACCOUNTANT_PRIORITY = 4;

private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1, r -> {
private final ExecutorService _executorService = Executors.newFixedThreadPool(1, r -> {
Thread thread = new Thread(r);
thread.setPriority(ACCOUNTANT_PRIORITY);
thread.setDaemon(true);
Expand Down Expand Up @@ -286,7 +286,12 @@ public void clear() {

@Override
public void startWatcherTask() {
EXECUTOR_SERVICE.submit(_watcherTask);
_executorService.submit(_watcherTask);
}

/**
 * Stops the watcher task by calling {@code shutdownNow()} on this accountant's executor service.
 * NOTE(review): {@code shutdownNow()} typically cancels running tasks by interrupting their thread, which is
 * what the watcher loop's {@code isInterrupted()} check relies on; the executor cannot be reused afterwards.
 */
@Override
public void stopWatcherTask() {
_executorService.shutdownNow();
}

@Override
Expand Down Expand Up @@ -315,7 +320,7 @@ class WatcherTask implements Runnable {
@Override
public void run() {
LOGGER.debug("Running timed task for {}", this.getClass().getName());
while (true) {
while (!Thread.currentThread().isInterrupted()) {
Copy link

Copilot AI Jul 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The while loop should also check the interrupted flag at the beginning of each iteration to ensure a prompt response to interruption. Consider adding a Thread.interrupted() check after the sleep/wait operations.

Copilot uses AI. Check for mistakes.
try {
// Preaggregation.
runPreAggregation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.config.table.JsonIndexConfig;
Expand Down Expand Up @@ -99,14 +100,15 @@ public void testCPUtimeProvider()
"org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, false);
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, true);
ResourceManager rm = getResourceManager(20, 40, 1, 1, configs);
Future[] futures = new Future[2000];
AtomicInteger atomicInteger = new AtomicInteger();
PinotConfiguration pinotCfg = new PinotConfiguration(configs);
Tracing.ThreadAccountantOps.initializeThreadAccountant(pinotCfg, "testCPUtimeProvider",
InstanceType.SERVER);
ThreadResourceUsageAccountant accountant = Tracing.ThreadAccountantOps.createThreadAccountant(pinotCfg,
"testCPUtimeProvider", InstanceType.SERVER);
Tracing.ThreadAccountantOps.startThreadAccountant();

ResourceManager rm = getResourceManager(20, 40, 1, 1, configs, accountant);
Future[] futures = new Future[2000];
AtomicInteger atomicInteger = new AtomicInteger();

for (int k = 0; k < 30; k++) {
int finalK = k;
rm.getQueryRunners().submit(() -> {
Expand Down Expand Up @@ -164,12 +166,13 @@ public void testThreadMemoryAccounting()
"org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, true);
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, false);
ResourceManager rm = getResourceManager(20, 40, 1, 1, configs);
PinotConfiguration pinotCfg = new PinotConfiguration(configs);
Tracing.ThreadAccountantOps.initializeThreadAccountant(pinotCfg, "testCPUtimeProvider",
InstanceType.SERVER);
ThreadResourceUsageAccountant accountant = Tracing.ThreadAccountantOps.createThreadAccountant(pinotCfg,
"testCPUtimeProvider", InstanceType.SERVER);
Tracing.ThreadAccountantOps.startThreadAccountant();

ResourceManager rm = getResourceManager(20, 40, 1, 1, configs, accountant);

for (int k = 0; k < 30; k++) {
int finalK = k;
rm.getQueryRunners().submit(() -> {
Expand Down Expand Up @@ -241,13 +244,13 @@ public void testWorkloadLevelThreadMemoryAccounting()

String workloadName = CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME;
PinotConfiguration pinotCfg = new PinotConfiguration(configs);
Tracing.ThreadAccountantOps.initializeThreadAccountant(pinotCfg, "testWorkloadMemoryAccounting",
InstanceType.SERVER);
ThreadResourceUsageAccountant accountant = Tracing.ThreadAccountantOps.createThreadAccountant(pinotCfg,
"testWorkloadMemoryAccounting", InstanceType.SERVER);
Tracing.ThreadAccountantOps.startThreadAccountant();
WorkloadBudgetManager workloadBudgetManager =
Tracing.ThreadAccountantOps.getWorkloadBudgetManager();
workloadBudgetManager.addOrUpdateWorkload(workloadName, 88_000_000, 27_000_000);
ResourceManager rm = getResourceManager(20, 40, 1, 1, configs);
ResourceManager rm = getResourceManager(20, 40, 1, 1, configs, accountant);

for (int k = 0; k < 30; k++) {
int finalK = k;
Expand Down Expand Up @@ -301,7 +304,8 @@ public void testWorkloadLevelThreadMemoryAccounting()
@Test
public void testWorkerThreadInterruption()
throws Exception {
ResourceManager rm = getResourceManager(2, 5, 1, 3, Collections.emptyMap());
ResourceManager rm = getResourceManager(2, 5, 1, 3, Collections.emptyMap(),
new Tracing.DefaultThreadResourceUsageAccountant());
AtomicReference<Future>[] futures = new AtomicReference[5];
for (int i = 0; i < 5; i++) {
futures[i] = new AtomicReference<>();
Expand Down Expand Up @@ -379,10 +383,12 @@ public void testGetDataTableOOMSelect(String accountantFactoryClass)
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, false);
configs.put(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY, true);
PinotConfiguration config = getConfig(20, 2, configs);
ResourceManager rm = getResourceManager(20, 2, 1, 1, configs);
// init accountant and start watcher task
Tracing.ThreadAccountantOps.initializeThreadAccountant(config, "testSelect", InstanceType.SERVER);
Tracing.unregisterThreadAccountant();
ThreadResourceUsageAccountant accountant = Tracing.ThreadAccountantOps.createThreadAccountant(config,
"testSelect", InstanceType.SERVER);
Tracing.ThreadAccountantOps.startThreadAccountant();
ResourceManager rm = getResourceManager(20, 2, 1, 1, configs, accountant);

CountDownLatch latch = new CountDownLatch(100);
AtomicBoolean earlyTerminationOccurred = new AtomicBoolean(false);
Expand All @@ -404,7 +410,7 @@ public void testGetDataTableOOMSelect(String accountantFactoryClass)
}
});
}
latch.await();
latch.await(1, java.util.concurrent.TimeUnit.MINUTES);
// assert that EarlyTerminationException was thrown in at least one runner thread
Assert.assertTrue(earlyTerminationOccurred.get());
}
Expand Down Expand Up @@ -450,11 +456,14 @@ public void testGetDataTableOOMGroupBy(String accountantFactoryClass)
configs.put(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY, true);
PinotConfiguration config = getConfig(20, 2, configs);

ResourceManager rm = getResourceManager(20, 2, 1, 1, configs);
// init accountant and start watcher task
Tracing.ThreadAccountantOps.initializeThreadAccountant(config, "testGroupBy", InstanceType.SERVER);
Tracing.unregisterThreadAccountant();
ThreadResourceUsageAccountant accountant = Tracing.ThreadAccountantOps.createThreadAccountant(config,
"testGroupBy", InstanceType.SERVER);
Tracing.ThreadAccountantOps.startThreadAccountant();

ResourceManager rm = getResourceManager(20, 2, 1, 1, configs, accountant);

CountDownLatch latch = new CountDownLatch(100);
AtomicBoolean earlyTerminationOccurred = new AtomicBoolean(false);

Expand All @@ -475,7 +484,7 @@ public void testGetDataTableOOMGroupBy(String accountantFactoryClass)
}
});
}
latch.await();
latch.await(1, java.util.concurrent.TimeUnit.MINUTES);
// assert that EarlyTerminationException was thrown in at least one runner thread
Assert.assertTrue(earlyTerminationOccurred.get());
}
Expand Down Expand Up @@ -507,12 +516,14 @@ public void testJsonIndexExtractMapOOM(String accountantFactoryClass)
configs.put(CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO, 0.00f);

PinotConfiguration config = getConfig(2, 2, configs);
ResourceManager rm = getResourceManager(2, 2, 1, 1, configs);
// init accountant and start watcher task
Tracing.ThreadAccountantOps.initializeThreadAccountant(config, "testJsonIndexExtractMapOOM",
InstanceType.SERVER);
Tracing.unregisterThreadAccountant();
ThreadResourceUsageAccountant accountant = Tracing.ThreadAccountantOps.createThreadAccountant(config,
"testJsonIndexExtractMapOOM", InstanceType.SERVER);
Tracing.ThreadAccountantOps.startThreadAccountant();

ResourceManager rm = getResourceManager(2, 2, 1, 1, configs, accountant);

Supplier<String> randomJsonValue = () -> {
Random random = new Random();
int length = random.nextInt(1000);
Expand Down Expand Up @@ -575,7 +586,7 @@ public void testJsonIndexExtractMapOOM(String accountantFactoryClass)
}
});

latch.await();
latch.await(1, java.util.concurrent.TimeUnit.MINUTES);
Assert.assertTrue(mutableEarlyTerminationOccurred.get(),
"Expected early termination reading the mutable index");
Assert.assertTrue(immutableEarlyTerminationOccurred.get(),
Expand All @@ -602,7 +613,7 @@ public void testThreadMemory()
"org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, true);
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, false);
ResourceManager rm = getResourceManager(20, 40, 1, 1, configs);
ResourceManager rm = getResourceManager(20, 40, 1, 1, configs, new Tracing.DefaultThreadResourceUsageAccountant());
Future[] futures = new Future[30];

for (int k = 0; k < 4; k++) {
Expand Down Expand Up @@ -651,9 +662,9 @@ public void testThreadMemory()
}

private ResourceManager getResourceManager(int runners, int workers, final int softLimit, final int hardLimit,
Map<String, Object> map) {
Map<String, Object> map, ThreadResourceUsageAccountant accountant) {

return new ResourceManager(getConfig(runners, workers, map), new Tracing.DefaultThreadResourceUsageAccountant()) {
return new ResourceManager(getConfig(runners, workers, map), accountant) {

@Override
public QueryExecutorService getExecutorService(ServerQueryRequest query, SchedulerGroupAccountant accountant) {
Expand Down
Loading
Loading