Skip to content

Commit

Permalink
Quick Optimization For Partition Assignors (#956)
Browse files Browse the repository at this point in the history
Co-authored-by: Shrinand Thakkar <sthakkar@sthakkar-mn2.linkedin.biz>
  • Loading branch information
shrinandthakkar and Shrinand Thakkar authored Aug 23, 2023
1 parent c2e117f commit a3d26a8
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
tasks.forEach(task -> {
if (task.getTaskPrefix().equals(datastreamGroupName)) {
Set<String> retainedPartitions = new HashSet<>(task.getPartitionsV2());
retainedPartitions.retainAll(partitionMetadata.getPartitions());
retainedPartitions.retainAll(new HashSet<>(partitionMetadata.getPartitions()));
newPartitionAssignmentMap.put(task.getId(), retainedPartitions);
if (retainedPartitions.size() != task.getPartitionsV2().size()) {
tasksWithChangedPartition.add(task.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(Map<String,
return task;
} else {
Set<String> newPartitions = new HashSet<>(task.getPartitionsV2());
newPartitions.retainAll(datastreamPartitions.getPartitions());
newPartitions.retainAll(new HashSet<>(datastreamPartitions.getPartitions()));

//We need to create new task if the partition is changed
boolean partitionChanged = newPartitions.size() != task.getPartitionsV2().size();
Expand Down Expand Up @@ -332,7 +332,7 @@ public Map<String, Set<DatastreamTask>> movePartitions(Map<String, Set<Datastrea

Set<String> allToReassignPartitions = new HashSet<>();
targetAssignment.values().forEach(allToReassignPartitions::addAll);
allToReassignPartitions.retainAll(partitionsMetadata.getPartitions());
allToReassignPartitions.retainAll(new HashSet<>(partitionsMetadata.getPartitions()));

// construct a map to store the tasks and if it contain the partitions that can be released
// map: <source taskName, partitions that need to be released>
Expand Down

0 comments on commit a3d26a8

Please sign in to comment.