-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Replace expensive TaskDriver.getWorkflows() with lightweight getChildNames() #17779
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,6 +42,7 @@ | |
| import org.apache.commons.lang3.StringUtils; | ||
| import org.apache.commons.lang3.tuple.Pair; | ||
| import org.apache.hc.client5.http.io.HttpClientConnectionManager; | ||
| import org.apache.helix.HelixDataAccessor; | ||
| import org.apache.helix.model.InstanceConfig; | ||
| import org.apache.helix.store.zk.ZkHelixPropertyStore; | ||
| import org.apache.helix.task.JobConfig; | ||
|
|
@@ -106,12 +107,10 @@ public PinotHelixTaskResourceManager(PinotHelixResourceManager helixResourceMana | |
|
|
||
| /** | ||
| * Get all task types. | ||
| * @note: It reads all resource config back and check which are workflows and which are jobs, so it can take some time | ||
| * if there are a lot of tasks. | ||
| * @return Set of all task types | ||
| */ | ||
| public synchronized Set<String> getTaskTypes() { | ||
| Set<String> helixJobQueues = _taskDriver.getWorkflows().keySet(); | ||
| List<String> helixJobQueues = getTaskQueueNames(); | ||
| Set<String> taskTypes = new HashSet<>(helixJobQueues.size()); | ||
| for (String helixJobQueue : helixJobQueues) { | ||
| taskTypes.add(getTaskType(helixJobQueue)); | ||
|
|
@@ -217,7 +216,7 @@ public synchronized void deleteTask(String taskName, boolean forceDelete) { | |
| * @return Set of task queue names | ||
| */ | ||
| public synchronized Set<String> getTaskQueues() { | ||
| return _taskDriver.getWorkflows().keySet(); | ||
| return new HashSet<>(getTaskQueueNames()); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -1263,6 +1262,20 @@ private synchronized TaskDebugInfo getTaskDebugInfo(WorkflowContext workflowCont | |
| return taskDebugInfo; | ||
| } | ||
|
|
||
| /** | ||
| * Returns the names of all task queues by fetching only the resource config child names from ZooKeeper, | ||
| * filtered to those with the {@link #TASK_QUEUE_PREFIX} but excluding individual job entries (which | ||
| * contain {@link #TASK_PREFIX} e.g. {@code TaskQueue_Type_Task_Type_12345}). This avoids the expensive | ||
| * {@link TaskDriver#getWorkflows()} call which reads all resource config values. | ||
| */ | ||
| private List<String> getTaskQueueNames() { | ||
| HelixDataAccessor accessor = _helixResourceManager.getHelixZkManager().getHelixDataAccessor(); | ||
| List<String> resourceConfigNames = accessor.getChildNames(accessor.keyBuilder().resourceConfigs()); | ||
| return resourceConfigNames.stream() | ||
| .filter(name -> name.startsWith(TASK_QUEUE_PREFIX) && !name.contains(TASK_NAME_SEPARATOR + TASK_PREFIX)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems like a hack. I am not sure if this is going to be easy to maintain & verify. |
||
| .collect(Collectors.toList()); | ||
KKcorps marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| /** | ||
| * Helper method to convert task type to Helix JobQueue name. | ||
| * <p>E.g. DummyTask -> TaskQueue_DummyTask | ||
|
|
@@ -1431,7 +1444,7 @@ public synchronized Map<String, Integer> getRunningTaskCountsPerMinion() { | |
| Map<String, Integer> runningTaskCounts = new HashMap<>(); | ||
|
|
||
| // Get all workflows (task queues) | ||
| Set<String> workflows = _taskDriver.getWorkflows().keySet(); | ||
| List<String> workflows = getTaskQueueNames(); | ||
|
|
||
| for (String workflow : workflows) { | ||
| WorkflowContext workflowContext = _taskDriver.getWorkflowContext(workflow); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,11 +24,12 @@ | |
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.stream.Collectors; | ||
| import org.apache.helix.HelixDataAccessor; | ||
| import org.apache.helix.model.ResourceConfig; | ||
| import org.apache.helix.task.JobContext; | ||
| import org.apache.helix.task.TaskDriver; | ||
| import org.apache.helix.task.TaskPartitionState; | ||
| import org.apache.helix.task.TaskState; | ||
| import org.apache.helix.task.WorkflowConfig; | ||
| import org.apache.helix.task.WorkflowContext; | ||
| import org.apache.pinot.controller.api.resources.MinionStatusResponse; | ||
| import org.apache.pinot.controller.helix.ControllerTest; | ||
|
|
@@ -54,6 +55,7 @@ | |
| */ | ||
| public class PinotHelixResourceManagerMinionStatusTest extends ControllerTest { | ||
|
|
||
| private static final String TEST_TASK_QUEUE = "TaskQueue_TestType"; | ||
| private PinotHelixTaskResourceManager _pinotHelixTaskResourceManager; | ||
|
|
||
| @BeforeClass | ||
|
|
@@ -80,24 +82,44 @@ public void cleanupAfterTest() { | |
| } catch (Exception e) { | ||
| // Ignore errors getting all instances | ||
| } | ||
| // Clean up the test task queue resource config if it was created | ||
| try { | ||
| removeTaskQueueResourceConfig(); | ||
| } catch (Exception e) { | ||
| // Ignore if not present | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Creates a resource config ZNode in ZK so that getChildNames(resourceConfigs()) returns the task queue name. | ||
| */ | ||
| private void createTaskQueueResourceConfig() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not fully clear why this test itself needed to be modified. The change in the main class is an internal change. Why should it have the tests affected?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. aah ok got it. its because TaskDriver is mocked earlier. but now we need Znodes |
||
| HelixDataAccessor accessor = _helixResourceManager.getHelixZkManager().getHelixDataAccessor(); | ||
| accessor.setProperty(accessor.keyBuilder().resourceConfig(TEST_TASK_QUEUE), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should set more properties here, similar to how the actual Zookeeper data will be. That will just help in increasing the comprehensiveness of the test. |
||
| new ResourceConfig(TEST_TASK_QUEUE)); | ||
| } | ||
|
|
||
| /** | ||
| * Removes the resource config ZNode created for the test task queue. | ||
| */ | ||
| private void removeTaskQueueResourceConfig() { | ||
| HelixDataAccessor accessor = _helixResourceManager.getHelixZkManager().getHelixDataAccessor(); | ||
| accessor.removeProperty(accessor.keyBuilder().resourceConfig(TEST_TASK_QUEUE)); | ||
| } | ||
|
|
||
| /** | ||
| * Helper method to create a mock TaskDriver with workflows and jobs containing tasks assigned to minions | ||
| * Helper method to create a mock TaskDriver with workflows and jobs containing tasks assigned to minions. | ||
| * Also creates the task queue resource config in ZK so that it is discoverable via getChildNames. | ||
| * @param taskAssignments Map of minion instance ID to number of RUNNING tasks | ||
| * @return Mocked TaskDriver | ||
| */ | ||
| private TaskDriver createMockTaskDriverWithRunningTasks(Map<String, Integer> taskAssignments) { | ||
| createTaskQueueResourceConfig(); | ||
| TaskDriver mockTaskDriver = mock(TaskDriver.class); | ||
|
|
||
| // Create a mock workflows - getWorkflows() returns a Map<String, WorkflowConfig> | ||
| Map<String, WorkflowConfig> workflows = new HashMap<>(); | ||
| workflows.put("TestWorkflow1", mock(WorkflowConfig.class)); | ||
| when(mockTaskDriver.getWorkflows()).thenReturn(workflows); | ||
|
|
||
| // Create workflow context with jobs | ||
| // Create workflow context with jobs for the task queue | ||
| WorkflowContext workflowContext = mock(WorkflowContext.class); | ||
| when(mockTaskDriver.getWorkflowContext("TestWorkflow1")).thenReturn(workflowContext); | ||
| when(mockTaskDriver.getWorkflowContext(TEST_TASK_QUEUE)).thenReturn(workflowContext); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is the name getting changed from TestWorkflow1 to TaskQueue_TestType ? |
||
|
|
||
| Map<String, TaskState> jobStates = new HashMap<>(); | ||
| jobStates.put("TestJob1", TaskState.IN_PROGRESS); | ||
|
|
@@ -181,12 +203,6 @@ public void testGetMinionStatusWithOnlineMinions() { | |
| assertEquals(status.getRunningTaskCount(), 0); | ||
| } | ||
| } | ||
|
|
||
| // Cleanup | ||
| for (int i = 0; i < 3; i++) { | ||
| String minionInstanceId = "Minion_minion-test-" + i + ".example.com_" + (9514); | ||
| _helixResourceManager.dropInstance(minionInstanceId); | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
|
|
@@ -614,10 +630,11 @@ public void testGetMinionStatusRunningTaskCountException() { | |
| Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0, 0, 0, false); | ||
| _helixResourceManager.addInstance(minion, false); | ||
|
|
||
| // Create a mock TaskDriver that throws an exception when accessing workflows | ||
| // Create a task queue resource config and a mock TaskDriver that throws when getting workflow context | ||
| createTaskQueueResourceConfig(); | ||
| TaskDriver mockTaskDriver = mock(TaskDriver.class); | ||
| Mockito.doThrow(new RuntimeException("Simulated task driver failure")) | ||
| .when(mockTaskDriver).getWorkflows(); | ||
| .when(mockTaskDriver).getWorkflowContext(TEST_TASK_QUEUE); | ||
|
|
||
| PinotHelixTaskResourceManager taskResourceManager = new PinotHelixTaskResourceManager( | ||
| _helixResourceManager, mockTaskDriver); | ||
|
|
@@ -628,9 +645,6 @@ public void testGetMinionStatusRunningTaskCountException() { | |
| assertNotNull(response); | ||
| assertEquals(response.getCurrentMinionCount(), 1); | ||
| assertEquals(response.getMinionStatus().get(0).getRunningTaskCount(), 0); | ||
|
|
||
| // Cleanup | ||
| _helixResourceManager.dropInstance(minionId); | ||
| } | ||
|
|
||
| @Test | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1285,7 +1285,7 @@ public void testGetTasksSummaryWithEmptyTaskTypes() { | |
| PinotHelixTaskResourceManager mgr = new PinotHelixTaskResourceManager(helixResourceManager, taskDriver); | ||
|
|
||
| PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr); | ||
| when(spyMgr.getTaskTypes()).thenReturn(Collections.emptySet()); | ||
| Mockito.doReturn(Collections.emptySet()).when(spyMgr).getTaskTypes(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why did we have to change this? Is it just a syntactical thing, or is there something which was wrong with the older code? |
||
|
|
||
| PinotHelixTaskResourceManager.TaskSummaryResponse response = spyMgr.getTasksSummary(null); | ||
|
|
||
|
|
@@ -1302,7 +1302,7 @@ public void testGetTasksSummaryWithNullTaskTypes() { | |
| PinotHelixTaskResourceManager mgr = new PinotHelixTaskResourceManager(helixResourceManager, taskDriver); | ||
|
|
||
| PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr); | ||
| when(spyMgr.getTaskTypes()).thenReturn(null); | ||
| Mockito.doReturn(null).when(spyMgr).getTaskTypes(); | ||
|
|
||
| PinotHelixTaskResourceManager.TaskSummaryResponse response = spyMgr.getTasksSummary(null); | ||
|
|
||
|
|
@@ -1322,7 +1322,7 @@ public void testGetTasksSummaryWithNoActiveTasks() { | |
| String taskName = "Task_TestTask_12345"; | ||
|
|
||
| PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr); | ||
| when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType)); | ||
| Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes(); | ||
|
|
||
| // Task with no running/waiting tasks (all completed) | ||
| PinotHelixTaskResourceManager.TaskCount taskCount = new PinotHelixTaskResourceManager.TaskCount(); | ||
|
|
@@ -1352,7 +1352,7 @@ public void testGetTasksSummaryWithSingleTenant() { | |
| String tenant = "defaultTenant"; | ||
|
|
||
| PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr); | ||
| when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType)); | ||
| Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes(); | ||
|
|
||
| // Task with running and waiting tasks | ||
| PinotHelixTaskResourceManager.TaskCount taskCount = new PinotHelixTaskResourceManager.TaskCount(); | ||
|
|
@@ -1410,7 +1410,7 @@ public void testGetTasksSummaryWithMultipleTenants() { | |
| String tenant2 = "tenant2"; | ||
|
|
||
| PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr); | ||
| when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType)); | ||
| Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes(); | ||
|
|
||
| // Task 1: tenant1 with running tasks | ||
| PinotHelixTaskResourceManager.TaskCount taskCount1 = new PinotHelixTaskResourceManager.TaskCount(); | ||
|
|
@@ -1486,7 +1486,7 @@ public void testGetTasksSummaryWithTenantFilter() { | |
| String tenant2 = "tenant2"; | ||
|
|
||
| PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr); | ||
| when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType)); | ||
| Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes(); | ||
|
|
||
| // Both tasks have running tasks | ||
| PinotHelixTaskResourceManager.TaskCount taskCount1 = new PinotHelixTaskResourceManager.TaskCount(); | ||
|
|
@@ -1546,7 +1546,7 @@ public void testGetTasksSummaryWithUnknownTableName() { | |
| String taskName = "Task_TestTask_12345"; | ||
|
|
||
| PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr); | ||
| when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType)); | ||
| Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes(); | ||
|
|
||
| // Task with running tasks | ||
| PinotHelixTaskResourceManager.TaskCount taskCount = new PinotHelixTaskResourceManager.TaskCount(); | ||
|
|
@@ -1579,7 +1579,7 @@ public void testGetTasksSummaryWithEmptySubtaskConfigs() { | |
| String taskName = "Task_TestTask_12345"; | ||
|
|
||
| PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr); | ||
| when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType)); | ||
| Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes(); | ||
|
|
||
| // Task with running tasks | ||
| PinotHelixTaskResourceManager.TaskCount taskCount = new PinotHelixTaskResourceManager.TaskCount(); | ||
|
|
@@ -1616,7 +1616,7 @@ public void testGetTasksSummaryWithMultipleTaskTypes() { | |
| Set<String> taskTypes = new HashSet<>(); | ||
| taskTypes.add(taskType1); | ||
| taskTypes.add(taskType2); | ||
| when(spyMgr.getTaskTypes()).thenReturn(taskTypes); | ||
| Mockito.doReturn(taskTypes).when(spyMgr).getTaskTypes(); | ||
|
|
||
| // Task 1: TaskType1 with running tasks | ||
| PinotHelixTaskResourceManager.TaskCount taskCount1 = new PinotHelixTaskResourceManager.TaskCount(); | ||
|
|
@@ -1682,7 +1682,7 @@ public void testGetTasksSummaryWithNullTableName() { | |
| String taskName = "Task_TestTask_12345"; | ||
|
|
||
| PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr); | ||
| when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType)); | ||
| Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes(); | ||
|
|
||
| // Task with running tasks | ||
| PinotHelixTaskResourceManager.TaskCount taskCount = new PinotHelixTaskResourceManager.TaskCount(); | ||
|
|
@@ -1716,7 +1716,7 @@ public void testGetTasksSummaryWithTenantException() { | |
| String tableName = "testTable_OFFLINE"; | ||
|
|
||
| PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr); | ||
| when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType)); | ||
| Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes(); | ||
|
|
||
| // Task with running tasks | ||
| PinotHelixTaskResourceManager.TaskCount taskCount = new PinotHelixTaskResourceManager.TaskCount(); | ||
|
|
@@ -1759,7 +1759,7 @@ public void testGetTasksSummaryWithNullTableConfig() { | |
| String tableName = "testTable_OFFLINE"; | ||
|
|
||
| PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr); | ||
| when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType)); | ||
| Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes(); | ||
|
|
||
| // Task with running tasks | ||
| PinotHelixTaskResourceManager.TaskCount taskCount = new PinotHelixTaskResourceManager.TaskCount(); | ||
|
|
@@ -1802,7 +1802,7 @@ public void testGetTasksSummaryWithNullTenantConfig() { | |
| String tableName = "testTable_OFFLINE"; | ||
|
|
||
| PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr); | ||
| when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType)); | ||
| Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes(); | ||
|
|
||
| // Task with running tasks | ||
| PinotHelixTaskResourceManager.TaskCount taskCount = new PinotHelixTaskResourceManager.TaskCount(); | ||
|
|
@@ -1847,7 +1847,7 @@ public void testGetTasksSummaryWithNullServerTenant() { | |
| String tableName = "testTable_OFFLINE"; | ||
|
|
||
| PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr); | ||
| when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType)); | ||
| Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes(); | ||
|
|
||
| // Task with running tasks | ||
| PinotHelixTaskResourceManager.TaskCount taskCount = new PinotHelixTaskResourceManager.TaskCount(); | ||
|
|
@@ -1892,7 +1892,7 @@ public void testGetTasksSummaryWithEmptyTaskCounts() { | |
| String taskType = "TestTask"; | ||
|
|
||
| PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr); | ||
| when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType)); | ||
| Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes(); | ||
|
|
||
| // Empty task counts | ||
| when(spyMgr.getTaskCounts(taskType)).thenReturn(Collections.emptyMap()); | ||
|
|
@@ -1914,7 +1914,7 @@ public void testGetTasksSummaryWithNullTaskCounts() { | |
| String taskType = "TestTask"; | ||
|
|
||
| PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr); | ||
| when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType)); | ||
| Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes(); | ||
|
|
||
| // Null task counts | ||
| when(spyMgr.getTaskCounts(taskType)).thenReturn(null); | ||
|
|
@@ -1937,7 +1937,7 @@ public void testGetTasksSummaryWithNullConfigs() { | |
| String taskName = "Task_TestTask_12345"; | ||
|
|
||
| PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr); | ||
| when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType)); | ||
| Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes(); | ||
|
|
||
| // Task with running tasks | ||
| PinotHelixTaskResourceManager.TaskCount taskCount = new PinotHelixTaskResourceManager.TaskCount(); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's have this method return a set ? this will avoid changing all the clients.