Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ public void start()
Tracing.ThreadAccountantOps.initializeThreadAccountant(
_brokerConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId,
org.apache.pinot.spi.config.instance.InstanceType.BROKER);
Tracing.ThreadAccountantOps.startThreadAccountant();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason not to move this closer to the bottom as well? I imagine brokers are typically not doing as expensive work as servers, but who knows with large tables with 100k+ segments for example

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on my code reading, we cannot assume broker starts taking queries after service status check is done. Controller will start redirecting queries to broker whenever the table's routing is built on the broker. Broker doesn't have the same mechanism as server to prevent queries coming in, so we need to start the thread accountant earlier.
Routing table build is usually cheap and broker startup is usually fast, so I feel this should be okay. We can revisit this if this is causing issues.


String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL);
if (controllerUrl != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public void testCPUtimeProvider()
PinotConfiguration pinotCfg = new PinotConfiguration(configs);
Tracing.ThreadAccountantOps.initializeThreadAccountant(pinotCfg, "testCPUtimeProvider",
InstanceType.SERVER);
Tracing.ThreadAccountantOps.startThreadAccountant();

for (int k = 0; k < 30; k++) {
int finalK = k;
Expand Down Expand Up @@ -167,6 +168,7 @@ public void testThreadMemoryAccounting()
PinotConfiguration pinotCfg = new PinotConfiguration(configs);
Tracing.ThreadAccountantOps.initializeThreadAccountant(pinotCfg, "testCPUtimeProvider",
InstanceType.SERVER);
Tracing.ThreadAccountantOps.startThreadAccountant();

for (int k = 0; k < 30; k++) {
int finalK = k;
Expand Down Expand Up @@ -241,6 +243,7 @@ public void testWorkloadLevelThreadMemoryAccounting()
PinotConfiguration pinotCfg = new PinotConfiguration(configs);
Tracing.ThreadAccountantOps.initializeThreadAccountant(pinotCfg, "testWorkloadMemoryAccounting",
InstanceType.SERVER);
Tracing.ThreadAccountantOps.startThreadAccountant();
WorkloadBudgetManager workloadBudgetManager =
Tracing.ThreadAccountantOps.getWorkloadBudgetManager();
workloadBudgetManager.addOrUpdateWorkload(workloadName, 88_000_000, 27_000_000);
Expand Down Expand Up @@ -379,6 +382,7 @@ public void testGetDataTableOOMSelect(String accountantFactoryClass)
ResourceManager rm = getResourceManager(20, 2, 1, 1, configs);
// init accountant and start watcher task
Tracing.ThreadAccountantOps.initializeThreadAccountant(config, "testSelect", InstanceType.SERVER);
Tracing.ThreadAccountantOps.startThreadAccountant();

CountDownLatch latch = new CountDownLatch(100);
AtomicBoolean earlyTerminationOccurred = new AtomicBoolean(false);
Expand Down Expand Up @@ -449,6 +453,7 @@ public void testGetDataTableOOMGroupBy(String accountantFactoryClass)
ResourceManager rm = getResourceManager(20, 2, 1, 1, configs);
// init accountant and start watcher task
Tracing.ThreadAccountantOps.initializeThreadAccountant(config, "testGroupBy", InstanceType.SERVER);
Tracing.ThreadAccountantOps.startThreadAccountant();

CountDownLatch latch = new CountDownLatch(100);
AtomicBoolean earlyTerminationOccurred = new AtomicBoolean(false);
Expand Down Expand Up @@ -506,6 +511,7 @@ public void testJsonIndexExtractMapOOM(String accountantFactoryClass)
// init accountant and start watcher task
Tracing.ThreadAccountantOps.initializeThreadAccountant(config, "testJsonIndexExtractMapOOM",
InstanceType.SERVER);
Tracing.ThreadAccountantOps.startThreadAccountant();

Supplier<String> randomJsonValue = () -> {
Random random = new Random();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public static void setUpClass() {
// init accountant and start watcher task
Tracing.ThreadAccountantOps.initializeThreadAccountant(new PinotConfiguration(configs), "testGroupBy",
InstanceType.SERVER);
Tracing.ThreadAccountantOps.startThreadAccountant();

// Setup Thread Context
Tracing.ThreadAccountantOps.setupRunner("MultiStageAccountingTest", ThreadExecutionContext.TaskType.MSE, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public static void setUpClass() {
// init accountant and start watcher task
PinotConfiguration pinotCfg = new PinotConfiguration(configs);
Tracing.ThreadAccountantOps.initializeThreadAccountant(pinotCfg, "testGroupBy", InstanceType.SERVER);
Tracing.ThreadAccountantOps.startThreadAccountant();

// Setup Thread Context
Tracing.ThreadAccountantOps.setupRunner("MultiStageAccountingTest", ThreadExecutionContext.TaskType.MSE, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
import org.apache.pinot.server.starter.ServerInstance;
import org.apache.pinot.server.starter.ServerQueriesDisabledTracker;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
import org.apache.pinot.spi.crypt.PinotCrypterFactory;
Expand Down Expand Up @@ -663,19 +664,16 @@ public void start()
segmentDownloadThrottler, segmentMultiColTextIndexPreprocessThrottler);
}

// initialize the thread accountant for query killing
// Initialize the thread accountant for query killing
Tracing.ThreadAccountantOps.initializeThreadAccountant(
_serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId,
org.apache.pinot.spi.config.instance.InstanceType.SERVER);
if (Tracing.getThreadAccountant().getClusterConfigChangeListener() != null) {
_clusterConfigChangeHandler.registerClusterConfigChangeListener(
Tracing.getThreadAccountant().getClusterConfigChangeListener());
}
ThreadResourceUsageAccountant threadAccountant = Tracing.getThreadAccountant();

SendStatsPredicate sendStatsPredicate = SendStatsPredicate.create(_serverConf, _helixManager);
ServerConf serverConf = new ServerConf(_serverConf);
_serverInstance = new ServerInstance(serverConf, _helixManager, _accessControlFactory, _segmentOperationsThrottler,
sendStatsPredicate, Tracing.getThreadAccountant());
sendStatsPredicate, threadAccountant);
ServerMetrics serverMetrics = _serverInstance.getServerMetrics();

InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
Expand Down Expand Up @@ -778,6 +776,13 @@ public void start()
new ServerRateLimitConfigChangeListener(serverMetrics);
_clusterConfigChangeHandler.registerClusterConfigChangeListener(serverRateLimitConfigChangeListener);

// Start the thread accountant
Tracing.ThreadAccountantOps.startThreadAccountant();
PinotClusterConfigChangeListener threadAccountantListener = threadAccountant.getClusterConfigChangeListener();
if (threadAccountantListener != null) {
_clusterConfigChangeHandler.registerClusterConfigChangeListener(threadAccountantListener);
}

// Start the query server after finishing the service status check. If the query server is started before all the
// segments are loaded, broker might not have finished processing the callback of routing table update, and start
// querying the server pre-maturely.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,9 @@ public static void initializeThreadAccountant(PinotConfiguration config, String
+ "due to invalid thread accountant factory {} provided, exception:", factoryName, exception);
}
}
}

public static void startThreadAccountant() {
Tracing.getThreadAccountant().startWatcherTask();
}

Expand Down
Loading