Skip to content

Commit

Permalink
[hotfix] Fix FlinkKafkaConsumerBaseTest.testClosePartitionDiscovererW…
Browse files Browse the repository at this point in the history
…ithCancellation

Make a copy of AbstractPartitionDiscoverer#getAllTopics before modifying it.
  • Loading branch information
AHeise committed Sep 23, 2024
1 parent 2ee9b9a commit 9b97c51
Showing 1 changed file with 10 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import org.apache.flink.annotation.Internal;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

Expand Down Expand Up @@ -130,21 +130,18 @@ public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, Cl
// topics or a topic pattern
if (topicsDescriptor.isFixedTopics()) {
newDiscoveredPartitions =
getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());
new ArrayList<>(
getAllPartitionsForTopics(topicsDescriptor.getFixedTopics()));
} else {
List<String> matchedTopics = getAllTopics();
List<String> matchedTopics = new ArrayList<>(getAllTopics());

// retain topics that match the pattern
Iterator<String> iter = matchedTopics.iterator();
while (iter.hasNext()) {
if (!topicsDescriptor.isMatchingTopic(iter.next())) {
iter.remove();
}
}
matchedTopics.removeIf(s -> !topicsDescriptor.isMatchingTopic(s));

if (matchedTopics.size() != 0) {
if (!matchedTopics.isEmpty()) {
// get partitions only for matched topics
newDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics);
newDiscoveredPartitions =
new ArrayList<>(getAllPartitionsForTopics(matchedTopics));
} else {
newDiscoveredPartitions = null;
}
Expand All @@ -157,14 +154,8 @@ public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, Cl
"Unable to retrieve any partitions with KafkaTopicsDescriptor: "
+ topicsDescriptor);
} else {
Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
KafkaTopicPartition nextPartition;
while (iter.hasNext()) {
nextPartition = iter.next();
if (!setAndCheckDiscoveredPartition(nextPartition)) {
iter.remove();
}
}
newDiscoveredPartitions.removeIf(
nextPartition -> !setAndCheckDiscoveredPartition(nextPartition));
}

return newDiscoveredPartitions;
Expand Down

0 comments on commit 9b97c51

Please sign in to comment.