Skip to content

Commit

Permalink
Upgrade Pre Assignment Clean Up To Remove Dependency Tasks Across Hos…
Browse files Browse the repository at this point in the history
…ts (#952)

* Upgrade Pre Assignment Clean Up To Remove Dependency Tasks Across Hosts

* Nit Updates to Comments

---------

Co-authored-by: Shrinand Thakkar <sthakkar@sthakkar-mn2.linkedin.biz>
  • Loading branch information
shrinandthakkar and Shrinand Thakkar authored Aug 29, 2023
1 parent a3d26a8 commit 06fed5d
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -490,15 +490,21 @@ public Map<String, List<DatastreamTask>> getTasksToCleanUp(List<DatastreamGroup>
.collect(Collectors.toMap(DatastreamTask::getDatastreamTaskName, Function.identity(),
(existingTask, duplicateTask) -> existingTask));

Set<DatastreamTask> allDependencyTasksInCurrentAssignment = currentAssignment.values()
.stream()
.flatMap(Collection::stream)
.map(task -> ((DatastreamTaskImpl) task).getDependencies())
.flatMap(Collection::stream)
.map(assignmentsMap::get)
.filter(Objects::nonNull)
.collect(Collectors.toSet());

for (String instance : currentAssignment.keySet()) {
// find the dependency tasks which also exist in the assignmentsMap.
List<DatastreamTask> dependencyTasksPerInstance = currentAssignment.get(instance)
.stream()
.filter(t -> datastreamGroupsSet.contains(t.getTaskPrefix()))
.map(task -> ((DatastreamTaskImpl) task).getDependencies())
.flatMap(Collection::stream)
.map(assignmentsMap::get)
.filter(Objects::nonNull)
.filter(allDependencyTasksInCurrentAssignment::contains)
.collect(Collectors.toList());

if (!dependencyTasksPerInstance.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import org.jetbrains.annotations.NotNull;
import org.testng.Assert;
Expand Down Expand Up @@ -186,6 +188,42 @@ public void testAddPartitions() {
});
}

/**
 * Checks that {@code getTasksToCleanUp} reports the leftover tasks of a previous assignment on every
 * instance when those tasks are present on more than one instance.
 */
@Test
public void testTaskCleanUpAcrossMultipleInstances() {
  StickyPartitionAssignmentStrategy strategy =
      createStickyPartitionAssignmentStrategy(3, 90, true, getZkClient(true), _clusterName);
  List<DatastreamGroup> datastreams = generateDatastreams("testTaskCleanUpAcrossMultipleInstances", 1, 3);

  // Start from an empty assignment and make sure "instance1" begins with no tasks at all.
  Map<String, Set<DatastreamTask>> initialAssignment = generateEmptyAssignment(datastreams, 2, 3, true);
  initialAssignment.put("instance1", new HashSet<>());

  // First round: assign three partitions.
  Map<String, Set<DatastreamTask>> previousAssignment = strategy.assignPartitions(initialAssignment,
      new DatastreamGroupPartitionsMetadata(datastreams.get(0), ImmutableList.of("t-0", "t-1", "t1-0")));

  // Second round: the partition list grows, producing a new assignment derived from the previous one.
  Map<String, Set<DatastreamTask>> currentAssignment = strategy.assignPartitions(previousAssignment,
      new DatastreamGroupPartitionsMetadata(datastreams.get(0),
          ImmutableList.of("t-0", "t-1", "t1-0", "t2-0", "t2-1", "t2-2")));

  // Simulate a bad assignment state (e.g. the leader was interrupted, hit an OOM or a session expiry
  // before the dependency task nodes were deleted): every instance, not only the original owner, still
  // carries all tasks of the previous assignment. The next leader is expected to identify and clean
  // these up. The list is rebuilt on each iteration, exactly as a per-entry lambda would do.
  for (Set<DatastreamTask> tasksOnInstance : currentAssignment.values()) {
    List<DatastreamTask> previousTasks =
        previousAssignment.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
    tasksOnInstance.addAll(previousTasks);
  }

  Map<String, List<DatastreamTask>> cleanUpByInstance = strategy.getTasksToCleanUp(datastreams, currentAssignment);

  // Both instances must be flagged, each with the three leftover tasks.
  Assert.assertEquals(cleanUpByInstance.size(), 2);
  for (List<DatastreamTask> tasksToRemove : cleanUpByInstance.values()) {
    Assert.assertEquals(tasksToRemove.size(), 3);
  }
  Assert.assertEquals(new HashSet<>(cleanUpByInstance.get("instance0")),
      new HashSet<>(previousAssignment.get("instance0")));
}

@Test
public void testCreateAssignmentFailureDueToUnlockedTask() {
Set<Boolean> elasticTaskAssignmentEnabledSet = new HashSet<>(Arrays.asList(true, false));
Expand Down

0 comments on commit 06fed5d

Please sign in to comment.