Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.task.JobConfig;
Expand Down Expand Up @@ -106,12 +107,10 @@ public PinotHelixTaskResourceManager(PinotHelixResourceManager helixResourceMana

/**
* Get all task types.
* @note: It reads all resource config back and check which are workflows and which are jobs, so it can take some time
* if there are a lot of tasks.
* @return Set of all task types
*/
public synchronized Set<String> getTaskTypes() {
Set<String> helixJobQueues = _taskDriver.getWorkflows().keySet();
List<String> helixJobQueues = getTaskQueueNames();
Set<String> taskTypes = new HashSet<>(helixJobQueues.size());
for (String helixJobQueue : helixJobQueues) {
taskTypes.add(getTaskType(helixJobQueue));
Expand Down Expand Up @@ -217,7 +216,7 @@ public synchronized void deleteTask(String taskName, boolean forceDelete) {
* @return Set of task queue names
*/
public synchronized Set<String> getTaskQueues() {
return _taskDriver.getWorkflows().keySet();
return new HashSet<>(getTaskQueueNames());
}

/**
Expand Down Expand Up @@ -1263,6 +1262,20 @@ private synchronized TaskDebugInfo getTaskDebugInfo(WorkflowContext workflowCont
return taskDebugInfo;
}

/**
* Returns the names of all task queues by fetching only the resource config child names from ZooKeeper,
* filtered to those with the {@link #TASK_QUEUE_PREFIX} but excluding individual job entries (which
* contain {@link #TASK_PREFIX} e.g. {@code TaskQueue_Type_Task_Type_12345}). This avoids the expensive
* {@link TaskDriver#getWorkflows()} call which reads all resource config values.
*/
private List<String> getTaskQueueNames() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's have this method return a set ? this will avoid changing all the clients.

HelixDataAccessor accessor = _helixResourceManager.getHelixZkManager().getHelixDataAccessor();
List<String> resourceConfigNames = accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
return resourceConfigNames.stream()
.filter(name -> name.startsWith(TASK_QUEUE_PREFIX) && !name.contains(TASK_NAME_SEPARATOR + TASK_PREFIX))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a hack. I am not sure if this is going to be easy to maintain & verify.

.collect(Collectors.toList());
}

/**
* Helper method to convert task type to Helix JobQueue name.
* <p>E.g. DummyTask -> TaskQueue_DummyTask
Expand Down Expand Up @@ -1431,7 +1444,7 @@ public synchronized Map<String, Integer> getRunningTaskCountsPerMinion() {
Map<String, Integer> runningTaskCounts = new HashMap<>();

// Get all workflows (task queues)
Set<String> workflows = _taskDriver.getWorkflows().keySet();
List<String> workflows = getTaskQueueNames();

for (String workflow : workflows) {
WorkflowContext workflowContext = _taskDriver.getWorkflowContext(workflow);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskPartitionState;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
import org.apache.pinot.controller.api.resources.MinionStatusResponse;
import org.apache.pinot.controller.helix.ControllerTest;
Expand All @@ -54,6 +55,7 @@
*/
public class PinotHelixResourceManagerMinionStatusTest extends ControllerTest {

private static final String TEST_TASK_QUEUE = "TaskQueue_TestType";
private PinotHelixTaskResourceManager _pinotHelixTaskResourceManager;

@BeforeClass
Expand All @@ -80,24 +82,44 @@ public void cleanupAfterTest() {
} catch (Exception e) {
// Ignore errors getting all instances
}
// Clean up the test task queue resource config if it was created
try {
removeTaskQueueResourceConfig();
} catch (Exception e) {
// Ignore if not present
}
}

/**
* Creates a resource config ZNode in ZK so that getChildNames(resourceConfigs()) returns the task queue name.
*/
private void createTaskQueueResourceConfig() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not fully clear why this test itself needed to be modified. The change in the main class is an internal change. Why should it have the tests affected?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aah ok got it. its because TaskDriver is mocked earlier. but now we need Znodes

HelixDataAccessor accessor = _helixResourceManager.getHelixZkManager().getHelixDataAccessor();
accessor.setProperty(accessor.keyBuilder().resourceConfig(TEST_TASK_QUEUE),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should set more properties here, similar to how the actual Zookeeper data will be. That will just help in increasing the comprehensiveness of the test.

new ResourceConfig(TEST_TASK_QUEUE));
}

/**
* Removes the resource config ZNode created for the test task queue.
*/
private void removeTaskQueueResourceConfig() {
HelixDataAccessor accessor = _helixResourceManager.getHelixZkManager().getHelixDataAccessor();
accessor.removeProperty(accessor.keyBuilder().resourceConfig(TEST_TASK_QUEUE));
}

/**
* Helper method to create a mock TaskDriver with workflows and jobs containing tasks assigned to minions
* Helper method to create a mock TaskDriver with workflows and jobs containing tasks assigned to minions.
* Also creates the task queue resource config in ZK so that it is discoverable via getChildNames.
* @param taskAssignments Map of minion instance ID to number of RUNNING tasks
* @return Mocked TaskDriver
*/
private TaskDriver createMockTaskDriverWithRunningTasks(Map<String, Integer> taskAssignments) {
createTaskQueueResourceConfig();
TaskDriver mockTaskDriver = mock(TaskDriver.class);

// Create a mock workflows - getWorkflows() returns a Map<String, WorkflowConfig>
Map<String, WorkflowConfig> workflows = new HashMap<>();
workflows.put("TestWorkflow1", mock(WorkflowConfig.class));
when(mockTaskDriver.getWorkflows()).thenReturn(workflows);

// Create workflow context with jobs
// Create workflow context with jobs for the task queue
WorkflowContext workflowContext = mock(WorkflowContext.class);
when(mockTaskDriver.getWorkflowContext("TestWorkflow1")).thenReturn(workflowContext);
when(mockTaskDriver.getWorkflowContext(TEST_TASK_QUEUE)).thenReturn(workflowContext);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the name getting changed from TestWorkflow1 to TaskQueue_TestType ?


Map<String, TaskState> jobStates = new HashMap<>();
jobStates.put("TestJob1", TaskState.IN_PROGRESS);
Expand Down Expand Up @@ -181,12 +203,6 @@ public void testGetMinionStatusWithOnlineMinions() {
assertEquals(status.getRunningTaskCount(), 0);
}
}

// Cleanup
for (int i = 0; i < 3; i++) {
String minionInstanceId = "Minion_minion-test-" + i + ".example.com_" + (9514);
_helixResourceManager.dropInstance(minionInstanceId);
}
}

@Test
Expand Down Expand Up @@ -614,10 +630,11 @@ public void testGetMinionStatusRunningTaskCountException() {
Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0, 0, 0, false);
_helixResourceManager.addInstance(minion, false);

// Create a mock TaskDriver that throws an exception when accessing workflows
// Create a task queue resource config and a mock TaskDriver that throws when getting workflow context
createTaskQueueResourceConfig();
TaskDriver mockTaskDriver = mock(TaskDriver.class);
Mockito.doThrow(new RuntimeException("Simulated task driver failure"))
.when(mockTaskDriver).getWorkflows();
.when(mockTaskDriver).getWorkflowContext(TEST_TASK_QUEUE);

PinotHelixTaskResourceManager taskResourceManager = new PinotHelixTaskResourceManager(
_helixResourceManager, mockTaskDriver);
Expand All @@ -628,9 +645,6 @@ public void testGetMinionStatusRunningTaskCountException() {
assertNotNull(response);
assertEquals(response.getCurrentMinionCount(), 1);
assertEquals(response.getMinionStatus().get(0).getRunningTaskCount(), 0);

// Cleanup
_helixResourceManager.dropInstance(minionId);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1285,7 +1285,7 @@ public void testGetTasksSummaryWithEmptyTaskTypes() {
PinotHelixTaskResourceManager mgr = new PinotHelixTaskResourceManager(helixResourceManager, taskDriver);

PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
when(spyMgr.getTaskTypes()).thenReturn(Collections.emptySet());
Mockito.doReturn(Collections.emptySet()).when(spyMgr).getTaskTypes();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we have to change this? Is it just a syntactical thing, or is there something which was wrong with the older code?


PinotHelixTaskResourceManager.TaskSummaryResponse response = spyMgr.getTasksSummary(null);

Expand All @@ -1302,7 +1302,7 @@ public void testGetTasksSummaryWithNullTaskTypes() {
PinotHelixTaskResourceManager mgr = new PinotHelixTaskResourceManager(helixResourceManager, taskDriver);

PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
when(spyMgr.getTaskTypes()).thenReturn(null);
Mockito.doReturn(null).when(spyMgr).getTaskTypes();

PinotHelixTaskResourceManager.TaskSummaryResponse response = spyMgr.getTasksSummary(null);

Expand All @@ -1322,7 +1322,7 @@ public void testGetTasksSummaryWithNoActiveTasks() {
String taskName = "Task_TestTask_12345";

PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType));
Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes();

// Task with no running/waiting tasks (all completed)
PinotHelixTaskResourceManager.TaskCount taskCount = new PinotHelixTaskResourceManager.TaskCount();
Expand Down Expand Up @@ -1352,7 +1352,7 @@ public void testGetTasksSummaryWithSingleTenant() {
String tenant = "defaultTenant";

PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType));
Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes();

// Task with running and waiting tasks
PinotHelixTaskResourceManager.TaskCount taskCount = new PinotHelixTaskResourceManager.TaskCount();
Expand Down Expand Up @@ -1410,7 +1410,7 @@ public void testGetTasksSummaryWithMultipleTenants() {
String tenant2 = "tenant2";

PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType));
Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes();

// Task 1: tenant1 with running tasks
PinotHelixTaskResourceManager.TaskCount taskCount1 = new PinotHelixTaskResourceManager.TaskCount();
Expand Down Expand Up @@ -1486,7 +1486,7 @@ public void testGetTasksSummaryWithTenantFilter() {
String tenant2 = "tenant2";

PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType));
Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes();

// Both tasks have running tasks
PinotHelixTaskResourceManager.TaskCount taskCount1 = new PinotHelixTaskResourceManager.TaskCount();
Expand Down Expand Up @@ -1546,7 +1546,7 @@ public void testGetTasksSummaryWithUnknownTableName() {
String taskName = "Task_TestTask_12345";

PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType));
Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes();

// Task with running tasks
PinotHelixTaskResourceManager.TaskCount taskCount = new PinotHelixTaskResourceManager.TaskCount();
Expand Down Expand Up @@ -1579,7 +1579,7 @@ public void testGetTasksSummaryWithEmptySubtaskConfigs() {
String taskName = "Task_TestTask_12345";

PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType));
Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes();

// Task with running tasks
PinotHelixTaskResourceManager.TaskCount taskCount = new PinotHelixTaskResourceManager.TaskCount();
Expand Down Expand Up @@ -1616,7 +1616,7 @@ public void testGetTasksSummaryWithMultipleTaskTypes() {
Set<String> taskTypes = new HashSet<>();
taskTypes.add(taskType1);
taskTypes.add(taskType2);
when(spyMgr.getTaskTypes()).thenReturn(taskTypes);
Mockito.doReturn(taskTypes).when(spyMgr).getTaskTypes();

// Task 1: TaskType1 with running tasks
PinotHelixTaskResourceManager.TaskCount taskCount1 = new PinotHelixTaskResourceManager.TaskCount();
Expand Down Expand Up @@ -1682,7 +1682,7 @@ public void testGetTasksSummaryWithNullTableName() {
String taskName = "Task_TestTask_12345";

PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType));
Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes();

// Task with running tasks
PinotHelixTaskResourceManager.TaskCount taskCount = new PinotHelixTaskResourceManager.TaskCount();
Expand Down Expand Up @@ -1716,7 +1716,7 @@ public void testGetTasksSummaryWithTenantException() {
String tableName = "testTable_OFFLINE";

PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType));
Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes();

// Task with running tasks
PinotHelixTaskResourceManager.TaskCount taskCount = new PinotHelixTaskResourceManager.TaskCount();
Expand Down Expand Up @@ -1759,7 +1759,7 @@ public void testGetTasksSummaryWithNullTableConfig() {
String tableName = "testTable_OFFLINE";

PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType));
Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes();

// Task with running tasks
PinotHelixTaskResourceManager.TaskCount taskCount = new PinotHelixTaskResourceManager.TaskCount();
Expand Down Expand Up @@ -1802,7 +1802,7 @@ public void testGetTasksSummaryWithNullTenantConfig() {
String tableName = "testTable_OFFLINE";

PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType));
Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes();

// Task with running tasks
PinotHelixTaskResourceManager.TaskCount taskCount = new PinotHelixTaskResourceManager.TaskCount();
Expand Down Expand Up @@ -1847,7 +1847,7 @@ public void testGetTasksSummaryWithNullServerTenant() {
String tableName = "testTable_OFFLINE";

PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType));
Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes();

// Task with running tasks
PinotHelixTaskResourceManager.TaskCount taskCount = new PinotHelixTaskResourceManager.TaskCount();
Expand Down Expand Up @@ -1892,7 +1892,7 @@ public void testGetTasksSummaryWithEmptyTaskCounts() {
String taskType = "TestTask";

PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType));
Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes();

// Empty task counts
when(spyMgr.getTaskCounts(taskType)).thenReturn(Collections.emptyMap());
Expand All @@ -1914,7 +1914,7 @@ public void testGetTasksSummaryWithNullTaskCounts() {
String taskType = "TestTask";

PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType));
Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes();

// Null task counts
when(spyMgr.getTaskCounts(taskType)).thenReturn(null);
Expand All @@ -1937,7 +1937,7 @@ public void testGetTasksSummaryWithNullConfigs() {
String taskName = "Task_TestTask_12345";

PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType));
Mockito.doReturn(Collections.singleton(taskType)).when(spyMgr).getTaskTypes();

// Task with running tasks
PinotHelixTaskResourceManager.TaskCount taskCount = new PinotHelixTaskResourceManager.TaskCount();
Expand Down
Loading