Skip to content

Commit

Permalink
Added log messages to debug assignPartitions (#951)
Browse files Browse the repository at this point in the history
* Added log messages to debug assignPartitions

* Minor improvements
  • Loading branch information
jzakaryan authored Aug 7, 2023
1 parent da925e6 commit 886c4c6
Showing 1 changed file with 8 additions and 0 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -81,6 +81,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
ClusterThroughputInfo throughputInfo, Map<String, Set<DatastreamTask>> currentAssignment,
List<String> unassignedPartitions, DatastreamGroupPartitionsMetadata partitionMetadata, int maxPartitionsPerTask) {
String datastreamGroupName = partitionMetadata.getDatastreamGroup().getName();
LOG.info("START: assignPartitions for datastream={}", datastreamGroupName);
Map<String, PartitionThroughputInfo> partitionInfoMap = new HashMap<>(throughputInfo.getPartitionInfoMap());
Set<String> tasksWithChangedPartition = new HashSet<>();

Expand All @@ -103,6 +104,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
validatePartitionCountAndThrow(datastreamGroupName, numTasks, numPartitions, maxPartitionsPerTask);

// sort the current assignment's tasks on total throughput
LOG.info("Extracting throughput info for partitions which have the throughput data (recognized partitions)");
Map<String, Integer> taskThroughputMap = new HashMap<>();
PartitionThroughputInfo defaultPartitionInfo = new PartitionThroughputInfo(_defaultPartitionBytesInKBRate,
_defaultPartitionMsgsInRate, "");
Expand Down Expand Up @@ -137,13 +139,15 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
}
}

LOG.info("Sorting recognized partitions on byte rate");
// sort unassigned partitions with throughput info on throughput
recognizedPartitions.sort((p1, p2) -> {
Integer p1KBRate = partitionInfoMap.get(p1).getBytesInKBRate();
Integer p2KBRate = partitionInfoMap.get(p2).getBytesInKBRate();
return p1KBRate.compareTo(p2KBRate);
});

LOG.info("Building a priority min queue with tasks based on throughput");
// build a priority queue of tasks based on throughput
// only add tasks that can accommodate more partitions in the queue
List<String> tasks = newPartitionAssignmentMap.keySet().stream()
Expand All @@ -152,6 +156,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
PriorityQueue<String> taskQueue = new PriorityQueue<>(Comparator.comparing(taskThroughputMap::get));
taskQueue.addAll(tasks);

LOG.info("Assigning partitions to the tasks from the priority queue");
// assign partitions with throughput info one by one, by putting the heaviest partition in the lightest task
while (recognizedPartitions.size() > 0 && taskQueue.size() > 0) {
String heaviestPartition = recognizedPartitions.remove(recognizedPartitions.size() - 1);
Expand All @@ -168,6 +173,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
}

// assign unrecognized partitions with round-robin
LOG.info("Assigning unrecognized partitions with round-robin");
Map<String, Integer> unrecognizedPartitionCountPerTask = new HashMap<>();
Collections.shuffle(unrecognizedPartitions);
int index = 0;
Expand All @@ -181,6 +187,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
}

// build the new assignment using the new partitions for the affected datastream's tasks
LOG.info("Finishing building new assignment");
Map<String, Set<DatastreamTask>> newAssignments = currentAssignment.entrySet().stream()
.collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue().stream()
.map(task -> {
Expand Down Expand Up @@ -208,6 +215,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
LOG.info("Assignment stats for {}. Min partitions across tasks: {}, max partitions across tasks: {}", taskPrefix,
stats.getMin(), stats.getMax());

LOG.info("END: assignPartitions for datastream={}", datastreamGroupName);
return newAssignments;
}

Expand Down

0 comments on commit 886c4c6

Please sign in to comment.