Replace expensive TaskDriver.getWorkflows() with lightweight getChildNames() #17779
KKcorps wants to merge 3 commits into apache:master
…Names() TaskDriver.getWorkflows() internally calls getChildValuesMap() which downloads the full content of every resource config ZNode from ZooKeeper. All callers in PinotHelixTaskResourceManager only need the workflow names (keys), not the actual WorkflowConfig values. This replaces those calls with HelixDataAccessor.getChildNames(resourceConfigs()) which fetches only ZNode names, filtered to the TaskQueue_ prefix. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
...c/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #17779 +/- ##
============================================
- Coverage 63.24% 63.20% -0.04%
Complexity 1454 1454
============================================
Files 3176 3182 +6
Lines 190984 191367 +383
Branches 29202 29268 +66
============================================
+ Hits 120780 120963 +183
- Misses 60784 60964 +180
- Partials 9420 9440 +20
Resource configs also contain individual job entries like TaskQueue_Type_Task_Type_12345. Filter these out by excluding names containing _Task_ separator, keeping only the task queue workflow entries. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Use doReturn().when(spy) instead of when(spy.method()).thenReturn() to avoid calling the real getTaskTypes() during stub setup, which now accesses HelixDataAccessor via the (unmocked) resource manager. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
   * contain {@link #TASK_PREFIX} e.g. {@code TaskQueue_Type_Task_Type_12345}). This avoids the expensive
   * {@link TaskDriver#getWorkflows()} call which reads all resource config values.
   */
  private List<String> getTaskQueueNames() {
Let's have this method return a Set? That would avoid changing all the clients.
    HelixDataAccessor accessor = _helixResourceManager.getHelixZkManager().getHelixDataAccessor();
    List<String> resourceConfigNames = accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
    return resourceConfigNames.stream()
        .filter(name -> name.startsWith(TASK_QUEUE_PREFIX) && !name.contains(TASK_NAME_SEPARATOR + TASK_PREFIX))
This seems like a hack. I am not sure if this is going to be easy to maintain & verify.
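To make the concern easier to verify, here is a minimal standalone sketch of the name filter from the hunk above. It runs against a plain list of strings instead of ZooKeeper. The constant values are assumptions inferred from the example name `TaskQueue_Type_Task_Type_12345` and the `_Task_` separator mentioned in the commit message; the class and method names are hypothetical, and the sketch returns a Set, as suggested in the earlier comment, rather than the List used in the PR.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

class TaskQueueNameFilterSketch {
  // Assumed values, inferred from the example names quoted in this PR.
  static final String TASK_NAME_SEPARATOR = "_";
  static final String TASK_QUEUE_PREFIX = "TaskQueue" + TASK_NAME_SEPARATOR;
  static final String TASK_PREFIX = "Task" + TASK_NAME_SEPARATOR;

  // Keeps names that start with "TaskQueue_" and do not contain "_Task_".
  static Set<String> filterTaskQueueNames(List<String> resourceConfigNames) {
    return resourceConfigNames.stream()
        .filter(name -> name.startsWith(TASK_QUEUE_PREFIX)
            && !name.contains(TASK_NAME_SEPARATOR + TASK_PREFIX))
        .collect(Collectors.toCollection(TreeSet::new));
  }

  public static void main(String[] args) {
    List<String> names = List.of(
        "myTable_OFFLINE",                  // ordinary resource config: no TaskQueue_ prefix
        "TaskQueue_Type",                   // task queue workflow: kept
        "TaskQueue_Type_Task_Type_12345");  // individual job entry: contains _Task_
    System.out.println(filterTaskQueueNames(names));  // prints [TaskQueue_Type]
  }
}
```

Under these assumed constants, a hypothetical task type whose own name contains `_Task_` (for example a queue named `TaskQueue_My_Task_Type`) would also be dropped, which is one concrete case a test for the filter could cover.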
  PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
- when(spyMgr.getTaskTypes()).thenReturn(Collections.emptySet());
+ Mockito.doReturn(Collections.emptySet()).when(spyMgr).getTaskTypes();
Why did we have to change this? Is it just a syntactical thing, or is there something which was wrong with the older code?
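The commit message's explanation (the real `getTaskTypes()` runs during stub setup) follows from ordinary Java argument evaluation, which can be shown without Mockito. The sketch below is a plain-Java illustration only: `Manager` and this `when` are hypothetical stand-ins, not Mockito's API. It shows that any `when(obj.method())`-style call must invoke `obj.method()` before `when` itself runs, whereas a `doReturn(...).when(spy).method()` form never passes a real method call as an argument.

```java
import java.util.Collections;
import java.util.Set;

class SpyStubbingOrderSketch {
  static int realCalls = 0;

  // Hypothetical stand-in for the spied manager; the counter makes the real call observable.
  static class Manager {
    Set<String> getTaskTypes() {
      realCalls++;  // in the PR, this is where the unmocked data accessor would be reached
      return Collections.emptySet();
    }
  }

  // Hypothetical stand-in for a when(...)-style entry point: it only receives the computed value.
  static <T> T when(T alreadyComputedValue) {
    return alreadyComputedValue;
  }

  public static void main(String[] args) {
    Manager mgr = new Manager();
    when(mgr.getTaskTypes());  // the argument is evaluated first, so the real method runs
    System.out.println("real calls: " + realCalls);  // prints "real calls: 1"
  }
}
```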
  // Create workflow context with jobs for the task queue
  WorkflowContext workflowContext = mock(WorkflowContext.class);
- when(mockTaskDriver.getWorkflowContext("TestWorkflow1")).thenReturn(workflowContext);
+ when(mockTaskDriver.getWorkflowContext(TEST_TASK_QUEUE)).thenReturn(workflowContext);
Why is the name being changed from TestWorkflow1 to TaskQueue_TestType?
  /**
   * Creates a resource config ZNode in ZK so that getChildNames(resourceConfigs()) returns the task queue name.
   */
  private void createTaskQueueResourceConfig() {
I am not fully clear on why this test itself needed to be modified. The change in the main class is an internal one. Why should the tests be affected?
Ah, OK, got it. It's because TaskDriver was mocked earlier, but now we need ZNodes.
   */
  private void createTaskQueueResourceConfig() {
    HelixDataAccessor accessor = _helixResourceManager.getHelixZkManager().getHelixDataAccessor();
    accessor.setProperty(accessor.keyBuilder().resourceConfig(TEST_TASK_QUEUE),
We should set more properties here, similar to what the actual ZooKeeper data will contain. That will help make the test more comprehensive.
Summary
TaskDriver.getWorkflows() internally calls getChildValuesMap() on Helix resource configs, which downloads the full content of every resource config ZNode from ZooKeeper. This is extremely expensive when there are many resources (tables, workflows, jobs) and has been found to cause OOMs on controller.

All three callers in PinotHelixTaskResourceManager only needed the workflow names (keys), not the actual WorkflowConfig values:
- getTaskTypes(): uses .keySet()
- getTaskQueues(): uses .keySet()
- getRunningTaskCountsPerMinion(): uses .keySet()

This PR replaces those calls with HelixDataAccessor.getChildNames(keyBuilder().resourceConfigs()), which fetches only ZNode names (a lightweight metadata-only ZK call), filtered to names with the TaskQueue_ prefix.

Changes
- … getTaskQueueNames() helper in PinotHelixTaskResourceManager that uses getChildNames() + prefix filter
- … _taskDriver.getWorkflows().keySet() with the new helper
- … (getChildNames reads from ZK, not the mocked TaskDriver)

Test plan
- … PinotHelixResourceManagerMinionStatusTest pass