Skip to content

Commit 678d456

Browse files
authored
KAFKA-19044: Handle tasks that are not present in the current topology (apache#19722)
A heartbeat might be sent to the group coordinator, claiming to own tasks that we do not know about. We need some logic to handle those requests. In KIP-1071, we propose to return `INVALID_REQUEST` error whenever this happens, effectively letting the clients crash. This behavior will, however, make topology updates impossible. Bruno Cadonna proposed to only check that owned tasks match our set of expected tasks if the topology epochs between the group and the client match. The aim of this change is to implement a check and a behavior for the first version of the protocol, which is to always return `INVALID_REQUEST` if an unknown task is sent to the group coordinator. We can relax this constraint once we allow topology updating with topology epochs. To efficiently check this whenever we receive a heartbeat containing tasks, we precompute the number of tasks for each subtopology. This also benefits the performance of the assignor. Reviewers: Bill Bejeck <bbejeck@apache.org>
1 parent 949617b commit 678d456

File tree

11 files changed

+160
-47
lines changed

11 files changed

+160
-47
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@
156156
import org.apache.kafka.coordinator.group.streams.assignor.StickyTaskAssignor;
157157
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
158158
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
159+
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
159160
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
160161
import org.apache.kafka.coordinator.group.streams.topics.EndpointToPartitionsManager;
161162
import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
@@ -195,6 +196,7 @@
195196
import java.util.Optional;
196197
import java.util.OptionalInt;
197198
import java.util.Set;
199+
import java.util.SortedMap;
198200
import java.util.concurrent.CompletableFuture;
199201
import java.util.concurrent.TimeUnit;
200202
import java.util.function.BiFunction;
@@ -1665,6 +1667,34 @@ private static void throwIfStreamsGroupMemberEpochIsInvalid(
16651667
}
16661668
}
16671669

1670+
/**
1671+
* Validates that the requested tasks exist in the configured topology and partitions are valid.
1672+
* If tasks is null or empty, does nothing. If an invalid task is found, throws InvalidRequestException.
1673+
*
1674+
* @param subtopologySortedMap The configured subtopologies, keyed by subtopology ID.
1675+
* @param tasks The list of requested tasks.
1676+
*/
1677+
private static void throwIfRequestContainsInvalidTasks(
1678+
SortedMap<String, ConfiguredSubtopology> subtopologySortedMap,
1679+
List<StreamsGroupHeartbeatRequestData.TaskIds> tasks
1680+
) {
1681+
if (tasks == null || tasks.isEmpty()) return;
1682+
for (StreamsGroupHeartbeatRequestData.TaskIds task : tasks) {
1683+
String subtopologyId = task.subtopologyId();
1684+
ConfiguredSubtopology subtopology = subtopologySortedMap.get(subtopologyId);
1685+
if (subtopology == null) {
1686+
throw new InvalidRequestException("Subtopology " + subtopologyId + " does not exist in the topology.");
1687+
}
1688+
int numTasks = subtopology.numberOfTasks();
1689+
for (Integer partition : task.partitions()) {
1690+
if (partition < 0 || partition >= numTasks) {
1691+
throw new InvalidRequestException("Task " + partition + " for subtopology " + subtopologyId +
1692+
" is invalid. Number of tasks for this subtopology: " + numTasks);
1693+
}
1694+
}
1695+
}
1696+
}
1697+
16681698
/**
16691699
* Validates if the received classic member protocols are supported by the group.
16701700
*
@@ -1917,6 +1947,13 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
19171947
updatedConfiguredTopology = group.configuredTopology().get();
19181948
}
19191949

1950+
if (updatedConfiguredTopology.isReady()) {
1951+
SortedMap<String, ConfiguredSubtopology> subtopologySortedMap = updatedConfiguredTopology.subtopologies().get();
1952+
throwIfRequestContainsInvalidTasks(subtopologySortedMap, ownedActiveTasks);
1953+
throwIfRequestContainsInvalidTasks(subtopologySortedMap, ownedStandbyTasks);
1954+
throwIfRequestContainsInvalidTasks(subtopologySortedMap, ownedWarmupTasks);
1955+
}
1956+
19201957
// Actually bump the group epoch
19211958
int groupEpoch = group.groupEpoch();
19221959
if (bumpGroupEpoch) {

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,12 @@
1919
import org.apache.kafka.coordinator.group.streams.assignor.TopologyDescriber;
2020
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
2121
import org.apache.kafka.image.MetadataImage;
22-
import org.apache.kafka.image.TopicImage;
2322

2423
import java.util.Collections;
2524
import java.util.List;
2625
import java.util.NoSuchElementException;
2726
import java.util.Objects;
2827
import java.util.SortedMap;
29-
import java.util.stream.Stream;
3028

3129
/**
3230
* The topology metadata class is used by the {@link org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor} to get topic and
@@ -42,14 +40,6 @@ public record TopologyMetadata(MetadataImage metadataImage, SortedMap<String, Co
4240
subtopologyMap = Objects.requireNonNull(Collections.unmodifiableSortedMap(subtopologyMap));
4341
}
4442

45-
/**
46-
* @return The metadata image in topology metadata.
47-
*/
48-
@Override
49-
public MetadataImage metadataImage() {
50-
return this.metadataImage;
51-
}
52-
5343
/**
5444
* Checks whether the given subtopology is associated with a changelog topic.
5545
*
@@ -85,18 +75,7 @@ public List<String> subtopologies() {
8575
@Override
8676
public int maxNumInputPartitions(String subtopologyId) {
8777
final ConfiguredSubtopology subtopology = getSubtopologyOrFail(subtopologyId);
88-
return Stream.concat(
89-
subtopology.sourceTopics().stream(),
90-
subtopology.repartitionSourceTopics().keySet().stream()
91-
).map(topic -> {
92-
TopicImage topicImage = metadataImage.topics().getTopic(topic);
93-
if (topicImage == null) {
94-
throw new IllegalStateException("Topic " + topic + " not found in metadata image");
95-
}
96-
return topicImage.partitions().size();
97-
}).max(Integer::compareTo).orElseThrow(
98-
() -> new IllegalStateException("Subtopology does not contain any source topics")
99-
);
78+
return subtopology.numberOfTasks();
10079
}
10180

10281
private ConfiguredSubtopology getSubtopologyOrFail(String subtopologyId) {

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,10 @@ public Map<String, Integer> setup() {
8585
}
8686
}
8787

88-
log.debug("Expecting state changelog topic partitions {} for the requested topology.",
89-
changelogTopicPartitions.entrySet().stream().map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")));
88+
if (!changelogTopicPartitions.isEmpty()) {
89+
log.debug("Expecting state changelog topic partitions {} for the requested topology.",
90+
changelogTopicPartitions.entrySet().stream().map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")));
91+
}
9092

9193
return changelogTopicPartitions;
9294
}

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,25 @@
3131
* <p>
3232
* Configured subtopologies may be recreated every time the input topics used by the subtopology are modified.
3333
*
34+
* @param numberOfTasks Precomputed number of tasks. Note that not every source topic may have a partition for
35+
* every task, in cases where there are multiple source topics with an unequal number of
36+
* partitions (e.g., one topic has 3 partitions and another has 5 and both are used in a
37+
* merge).
3438
* @param sourceTopics The source topics of the subtopology.
3539
* @param repartitionSourceTopics The repartition source topics of the subtopology.
3640
* @param repartitionSinkTopics The repartition sink topics of the subtopology.
3741
* @param stateChangelogTopics The state changelog topics of the subtopology.
3842
*/
39-
public record ConfiguredSubtopology(Set<String> sourceTopics,
43+
public record ConfiguredSubtopology(int numberOfTasks,
44+
Set<String> sourceTopics,
4045
Map<String, ConfiguredInternalTopic> repartitionSourceTopics,
4146
Set<String> repartitionSinkTopics,
4247
Map<String, ConfiguredInternalTopic> stateChangelogTopics) {
4348

4449
public ConfiguredSubtopology {
50+
if (numberOfTasks <= 0) {
51+
throw new IllegalArgumentException("Number of tasks must be positive");
52+
}
4553
Objects.requireNonNull(sourceTopics, "sourceTopics can't be null");
4654
Objects.requireNonNull(repartitionSourceTopics, "repartitionSourceTopics can't be null");
4755
Objects.requireNonNull(repartitionSinkTopics, "repartitionSinkTopics can't be null");
@@ -61,4 +69,6 @@ public StreamsGroupDescribeResponseData.Subtopology asStreamsGroupDescribeSubtop
6169
.sorted(Comparator.comparing(StreamsGroupDescribeResponseData.TopicInfo::name)).toList());
6270
}
6371

72+
73+
6474
}

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public static ConfiguredTopology configureTopics(LogContext logContext,
8282
subtopologies.stream()
8383
.collect(Collectors.toMap(
8484
StreamsGroupTopologyValue.Subtopology::subtopologyId,
85-
x -> fromPersistedSubtopology(x, decidedPartitionCountsForInternalTopics),
85+
x -> fromPersistedSubtopology(x, topicsImage, decidedPartitionCountsForInternalTopics),
8686
(v1, v2) -> {
8787
throw new RuntimeException(String.format("Duplicate key for values %s and %s", v1, v2));
8888
},
@@ -264,9 +264,11 @@ private static CreatableTopic toCreatableTopic(final ConfiguredInternalTopic con
264264
}
265265

266266
private static ConfiguredSubtopology fromPersistedSubtopology(final StreamsGroupTopologyValue.Subtopology subtopology,
267+
final TopicsImage topicsImage,
267268
final Map<String, Integer> decidedPartitionCountsForInternalTopics
268269
) {
269270
return new ConfiguredSubtopology(
271+
computeNumberOfTasks(subtopology, topicsImage, decidedPartitionCountsForInternalTopics),
270272
new HashSet<>(subtopology.sourceTopics()),
271273
subtopology.repartitionSourceTopics().stream()
272274
.map(x -> fromPersistedTopicInfo(x, decidedPartitionCountsForInternalTopics))
@@ -278,6 +280,21 @@ private static ConfiguredSubtopology fromPersistedSubtopology(final StreamsGroup
278280
);
279281
}
280282

283+
private static int computeNumberOfTasks(final StreamsGroupTopologyValue.Subtopology subtopology,
284+
final TopicsImage topicsImage,
285+
final Map<String, Integer> decidedPartitionCountsForInternalTopics) {
286+
return Stream.concat(
287+
subtopology.sourceTopics().stream(),
288+
subtopology.repartitionSourceTopics().stream().map(StreamsGroupTopologyValue.TopicInfo::name)
289+
).map(
290+
topic -> getPartitionCount(topicsImage, topic, decidedPartitionCountsForInternalTopics).orElseThrow(
291+
() -> new IllegalStateException("Number of partitions must be set for topic " + topic)
292+
)
293+
).max(Integer::compareTo).orElseThrow(
294+
() -> new IllegalStateException("Subtopology does not contain any source topics")
295+
);
296+
}
297+
281298
private static ConfiguredInternalTopic fromPersistedTopicInfo(final StreamsGroupTopologyValue.TopicInfo topicInfo,
282299
final Map<String, Integer> decidedPartitionCountsForInternalTopics) {
283300
if (topicInfo.partitions() == 0 && !decidedPartitionCountsForInternalTopics.containsKey(topicInfo.name())) {

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.kafka.common.errors.IllegalGenerationException;
2929
import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
3030
import org.apache.kafka.common.errors.InvalidRegularExpression;
31+
import org.apache.kafka.common.errors.InvalidRequestException;
3132
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
3233
import org.apache.kafka.common.errors.RebalanceInProgressException;
3334
import org.apache.kafka.common.errors.UnknownMemberIdException;
@@ -16014,6 +16015,67 @@ public void testStreamsGroupMemberEpochValidation() {
1601416015
assertEquals(100, result.response().data().memberEpoch());
1601516016
}
1601616017

16018+
@Test
16019+
public void testStreamsOwnedTasksValidation() {
16020+
String groupId = "fooup";
16021+
String memberId = Uuid.randomUuid().toString();
16022+
String subtopology1 = "subtopology1";
16023+
String subtopologyMissing = "subtopologyMissing";
16024+
String fooTopicName = "foo";
16025+
Uuid fooTopicId = Uuid.randomUuid();
16026+
Topology topology = new Topology().setSubtopologies(List.of(
16027+
new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
16028+
));
16029+
16030+
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
16031+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
16032+
.withStreamsGroupTaskAssignors(List.of(assignor))
16033+
.withMetadataImage(new MetadataImageBuilder()
16034+
.addTopic(fooTopicId, fooTopicName, 3)
16035+
.build())
16036+
.withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
16037+
.withMember(streamsGroupMemberBuilderWithDefaults(memberId)
16038+
.setMemberEpoch(10)
16039+
.setPreviousMemberEpoch(10)
16040+
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
16041+
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
16042+
.build())
16043+
.withTopology(StreamsTopology.fromHeartbeatRequest(topology))
16044+
.withTargetAssignment(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
16045+
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
16046+
.withTargetAssignmentEpoch(10)
16047+
)
16048+
.build();
16049+
16050+
InvalidRequestException e1 = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
16051+
new StreamsGroupHeartbeatRequestData()
16052+
.setGroupId(groupId)
16053+
.setMemberId(memberId)
16054+
.setMemberEpoch(10)
16055+
.setActiveTasks(List.of(
16056+
new StreamsGroupHeartbeatRequestData.TaskIds()
16057+
.setSubtopologyId(subtopologyMissing)
16058+
.setPartitions(List.of(0))
16059+
))
16060+
.setStandbyTasks(List.of())
16061+
.setWarmupTasks(List.of())));
16062+
assertEquals(e1.getMessage(), "Subtopology subtopologyMissing does not exist in the topology.");
16063+
16064+
InvalidRequestException e2 = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
16065+
new StreamsGroupHeartbeatRequestData()
16066+
.setGroupId(groupId)
16067+
.setMemberId(memberId)
16068+
.setMemberEpoch(10)
16069+
.setActiveTasks(List.of(
16070+
new StreamsGroupHeartbeatRequestData.TaskIds()
16071+
.setSubtopologyId(subtopology1)
16072+
.setPartitions(List.of(3))
16073+
))
16074+
.setStandbyTasks(List.of())
16075+
.setWarmupTasks(List.of())));
16076+
assertEquals(e2.getMessage(), "Task 3 for subtopology subtopology1 is invalid. Number of tasks for this subtopology: 3");
16077+
}
16078+
1601716079
@Test
1601816080
public void testStreamsNewMemberIsRejectedWithMaximumMembersIsReached() {
1601916081
String groupId = "fooup";

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -713,7 +713,7 @@ public String addSubtopologyWithSingleSourceTopic(
713713
String subtopologyId = Uuid.randomUuid().toString();
714714
Uuid topicId = Uuid.randomUuid();
715715
topicsImageBuilder = topicsImageBuilder.addTopic(topicId, topicName, numTasks);
716-
subtopologies.put(subtopologyId, new ConfiguredSubtopology(Set.of(topicId.toString()), Map.of(), Set.of(), Map.of()));
716+
subtopologies.put(subtopologyId, new ConfiguredSubtopology(numTasks, Set.of(topicId.toString()), Map.of(), Set.of(), Map.of()));
717717

718718
return subtopologyId;
719719
}

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.util.List;
2929
import java.util.Map;
3030
import java.util.NoSuchElementException;
31-
import java.util.Set;
3231
import java.util.SortedMap;
3332
import java.util.TreeMap;
3433

@@ -82,11 +81,9 @@ void testIsStateful() {
8281

8382
@Test
8483
void testMaxNumInputPartitions() {
85-
ConfiguredInternalTopic internalTopic = mock(ConfiguredInternalTopic.class);
8684
ConfiguredSubtopology subtopology = mock(ConfiguredSubtopology.class);
8785
subtopologyMap.put("subtopology1", subtopology);
88-
when(subtopology.sourceTopics()).thenReturn(Set.of("source_topic"));
89-
when(subtopology.repartitionSourceTopics()).thenReturn(Map.of("repartition_source_topic", internalTopic));
86+
when(subtopology.numberOfTasks()).thenReturn(4);
9087

9188
assertEquals(4, topologyMetadata.maxNumInputPartitions("subtopology1"));
9289
}
@@ -111,14 +108,4 @@ void testIsStatefulThrowsExceptionWhenSubtopologyIdDoesNotExist() {
111108
void testMaxNumInputPartitionsThrowsExceptionWhenSubtopologyIdDoesNotExist() {
112109
assertThrows(NoSuchElementException.class, () -> topologyMetadata.maxNumInputPartitions("non_existent_subtopology"));
113110
}
114-
115-
@Test
116-
void testMaxNumInputPartitionsThrowsExceptionWhenSubtopologyContainsNoSourceTopics() {
117-
ConfiguredSubtopology subtopology = mock(ConfiguredSubtopology.class);
118-
when(subtopology.sourceTopics()).thenReturn(Set.of());
119-
when(subtopology.repartitionSourceTopics()).thenReturn(Map.of());
120-
subtopologyMap.put("subtopology1", subtopology);
121-
122-
assertThrows(IllegalStateException.class, () -> topologyMetadata.maxNumInputPartitions("subtopology1"));
123-
}
124-
}
111+
}

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class ConfiguredSubtopologyTest {
3636
public void testConstructorWithNullSourceTopics() {
3737
assertThrows(NullPointerException.class,
3838
() -> new ConfiguredSubtopology(
39+
2,
3940
null,
4041
Map.of(),
4142
Set.of(),
@@ -48,6 +49,7 @@ public void testConstructorWithNullSourceTopics() {
4849
public void testConstructorWithNullRepartitionSourceTopics() {
4950
assertThrows(NullPointerException.class,
5051
() -> new ConfiguredSubtopology(
52+
2,
5153
Set.of(),
5254
null,
5355
Set.of(),
@@ -60,6 +62,7 @@ public void testConstructorWithNullRepartitionSourceTopics() {
6062
public void testConstructorWithNullRepartitionSinkTopics() {
6163
assertThrows(NullPointerException.class,
6264
() -> new ConfiguredSubtopology(
65+
2,
6366
Set.of(),
6467
Map.of(),
6568
null,
@@ -72,6 +75,7 @@ public void testConstructorWithNullRepartitionSinkTopics() {
7275
public void testConstructorWithNullStateChangelogTopics() {
7376
assertThrows(NullPointerException.class,
7477
() -> new ConfiguredSubtopology(
78+
2,
7579
Set.of(),
7680
Map.of(),
7781
Set.of(),
@@ -80,6 +84,19 @@ public void testConstructorWithNullStateChangelogTopics() {
8084
);
8185
}
8286

87+
@Test
88+
public void testConstructorWithNegativeTaskCount() {
89+
assertThrows(IllegalArgumentException.class,
90+
() -> new ConfiguredSubtopology(
91+
-1,
92+
Set.of(),
93+
Map.of(),
94+
Set.of(),
95+
Map.of()
96+
)
97+
);
98+
}
99+
83100
@Test
84101
public void testAsStreamsGroupDescribeSubtopology() {
85102
String subtopologyId = "subtopology1";
@@ -91,7 +108,7 @@ public void testAsStreamsGroupDescribeSubtopology() {
91108
Map<String, ConfiguredInternalTopic> repartitionSourceTopics = Map.of("repartitionSourceTopic1", internalTopicMock);
92109
Map<String, ConfiguredInternalTopic> stateChangelogTopics = Map.of("stateChangelogTopic1", internalTopicMock);
93110
ConfiguredSubtopology configuredSubtopology = new ConfiguredSubtopology(
94-
sourceTopics, repartitionSourceTopics, repartitionSinkTopics, stateChangelogTopics);
111+
1, sourceTopics, repartitionSourceTopics, repartitionSinkTopics, stateChangelogTopics);
95112

96113
StreamsGroupDescribeResponseData.Subtopology subtopology = configuredSubtopology.asStreamsGroupDescribeSubtopology(subtopologyId);
97114

0 commit comments

Comments
 (0)