Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,10 @@
import org.apache.kafka.coordinator.group.streams.assignor.StickyTaskAssignor;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
import org.apache.kafka.coordinator.group.streams.topics.EndpointToPartitionsManager;
import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
import org.apache.kafka.coordinator.group.streams.topics.TopicConfigurationException;
import org.apache.kafka.coordinator.group.streams.topics.TopologyValidationResult;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
Expand All @@ -189,7 +188,6 @@
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -1732,21 +1730,20 @@ private static void throwIfStreamsGroupMemberEpochIsInvalid(
* Validates that the requested tasks exist in the configured topology and partitions are valid.
* If tasks is null, does nothing. If an invalid task is found, throws InvalidRequestException.
*
* @param subtopologySortedMap The configured topology.
* @param tasks The list of requested tasks.
* @param maxPartitionsPerSubtopology The max partitions per subtopology.
* @param tasks The list of requested tasks.
*/
private static void throwIfRequestContainsInvalidTasks(
SortedMap<String, ConfiguredSubtopology> subtopologySortedMap,
Map<String, Integer> maxPartitionsPerSubtopology,
List<StreamsGroupHeartbeatRequestData.TaskIds> tasks
) {
if (tasks == null || tasks.isEmpty()) return;
for (StreamsGroupHeartbeatRequestData.TaskIds task : tasks) {
String subtopologyId = task.subtopologyId();
ConfiguredSubtopology subtopology = subtopologySortedMap.get(subtopologyId);
if (subtopology == null) {
Integer numTasks = maxPartitionsPerSubtopology.get(subtopologyId);
if (numTasks == null) {
throw new InvalidRequestException("Subtopology " + subtopologyId + " does not exist in the topology.");
}
int numTasks = subtopology.numberOfTasks();
for (Integer partition : task.partitions()) {
if (partition < 0 || partition >= numTasks) {
throw new InvalidRequestException("Task " + partition + " for subtopology " + subtopologyId +
Expand Down Expand Up @@ -1979,10 +1976,10 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
maybeSetTopologyStaleStatus(group, updatedMember, returnedStatus);

// 3. Determine any internal topics if needed.
ConfiguredTopology updatedConfiguredTopology;
TopologyValidationResult updatedTopologyValidationResult;
boolean reconfigureTopology = group.topology().isEmpty();
long metadataHash = group.metadataHash();
if (reconfigureTopology || group.configuredTopology().isEmpty() || group.hasMetadataExpired(currentTimeMs)) {
if (reconfigureTopology || group.topologyValidationResult().isEmpty() || group.hasMetadataExpired(currentTimeMs)) {

metadataHash = group.computeMetadataHash(
metadataImage,
Expand All @@ -1997,26 +1994,26 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
reconfigureTopology = true;
}

if (reconfigureTopology || group.configuredTopology().isEmpty()) {
if (reconfigureTopology || group.topologyValidationResult().isEmpty()) {
log.info("[GroupId {}][MemberId {}] Configuring the topology {}", groupId, memberId, updatedTopology.topologyEpoch());
LogContext topicManagerLogContext = new LogContext(String.format("%s[GroupId %s][MemberId %s] ", logContext.logPrefix(), groupId, memberId));
updatedConfiguredTopology = InternalTopicManager.configureTopics(topicManagerLogContext, metadataHash, updatedTopology, metadataImage, time);
group.setConfiguredTopology(updatedConfiguredTopology);
updatedTopologyValidationResult = InternalTopicManager.configureTopics(topicManagerLogContext, updatedTopology, metadataImage, time);
group.setTopologyValidationResult(updatedTopologyValidationResult);
} else {
updatedConfiguredTopology = group.configuredTopology().get();
updatedTopologyValidationResult = group.topologyValidationResult().get();
}
} else {
updatedConfiguredTopology = group.configuredTopology().get();
updatedTopologyValidationResult = group.topologyValidationResult().get();
}

// 3b. If the topology is validated, persist the fact that it is validated.
int validatedTopologyEpoch = -1;
if (updatedConfiguredTopology.isReady()) {
if (updatedTopologyValidationResult.isReady()) {
validatedTopologyEpoch = updatedTopology.topologyEpoch();
SortedMap<String, ConfiguredSubtopology> subtopologySortedMap = updatedConfiguredTopology.subtopologies().get();
throwIfRequestContainsInvalidTasks(subtopologySortedMap, ownedActiveTasks);
throwIfRequestContainsInvalidTasks(subtopologySortedMap, ownedStandbyTasks);
throwIfRequestContainsInvalidTasks(subtopologySortedMap, ownedWarmupTasks);
Map<String, Integer> maxPartitionsPerSubtopology = updatedTopologyValidationResult.maxPartitionsPerSubtopology().get();
throwIfRequestContainsInvalidTasks(maxPartitionsPerSubtopology, ownedActiveTasks);
throwIfRequestContainsInvalidTasks(maxPartitionsPerSubtopology, ownedStandbyTasks);
throwIfRequestContainsInvalidTasks(maxPartitionsPerSubtopology, ownedWarmupTasks);
}
// We validated a topology that was not validated before, so bump the group epoch as we may have to reassign tasks.
if (validatedTopologyEpoch != group.validatedTopologyEpoch()) {
Expand Down Expand Up @@ -2082,8 +2079,8 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
group,
groupEpoch,
Optional.of(updatedMember),
updatedConfiguredTopology,
metadataImage,
updatedTopology,
updatedTopologyValidationResult,
records,
currentAssignmentConfigs
);
Expand Down Expand Up @@ -2135,7 +2132,7 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
}

if (group.endpointInformationEpoch() != memberEndpointEpoch) {
response.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group, updatedMember));
response.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group, updatedMember, updatedTopology));
}
if (groups.containsKey(group.groupId())) {
// If we just created the group, the endpoint information epoch will not be persisted, so return epoch 0.
Expand All @@ -2144,9 +2141,9 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
}

Map<String, CreatableTopic> internalTopicsToBeCreated = Collections.emptyMap();
if (updatedConfiguredTopology.topicConfigurationException().isPresent()) {
TopicConfigurationException exception = updatedConfiguredTopology.topicConfigurationException().get();
internalTopicsToBeCreated = updatedConfiguredTopology.internalTopicsToBeCreated();
if (updatedTopologyValidationResult.topicConfigurationException().isPresent()) {
TopicConfigurationException exception = updatedTopologyValidationResult.topicConfigurationException().get();
internalTopicsToBeCreated = updatedTopologyValidationResult.internalTopicsToBeCreated();
returnedStatus.add(
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(exception.status().code())
Expand Down Expand Up @@ -2252,21 +2249,24 @@ private static List<StreamsGroupHeartbeatResponseData.TaskIds> createStreamsGrou
.collect(Collectors.toList());
}

private List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> maybeBuildEndpointToPartitions(StreamsGroup group,
StreamsGroupMember updatedMember) {
private List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> maybeBuildEndpointToPartitions(
StreamsGroup group,
StreamsGroupMember updatedMember,
StreamsTopology topology
) {
List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> endpointToPartitionsList = new ArrayList<>();
final Map<String, StreamsGroupMember> members = group.members();
// Build endpoint information for all members except the updated member
for (Map.Entry<String, StreamsGroupMember> entry : members.entrySet()) {
if (updatedMember != null && entry.getKey().equals(updatedMember.memberId())) {
continue;
}
EndpointToPartitionsManager.maybeEndpointToPartitions(entry.getValue(), group, metadataImage)
EndpointToPartitionsManager.maybeEndpointToPartitions(entry.getValue(), topology, metadataImage)
.ifPresent(endpointToPartitionsList::add);
}
// Always build endpoint information for the updated member (whether new or existing)
if (updatedMember != null) {
EndpointToPartitionsManager.maybeEndpointToPartitions(updatedMember, group, metadataImage)
EndpointToPartitionsManager.maybeEndpointToPartitions(updatedMember, topology, metadataImage)
.ifPresent(endpointToPartitionsList::add);
}
return endpointToPartitionsList;
Expand Down Expand Up @@ -3945,19 +3945,21 @@ private Assignment updateTargetAssignment(
/**
* Updates the target assignment according to the updated member and metadata image.
*
* @param group The StreamsGroup.
* @param groupEpoch The group epoch.
* @param updatedMember The updated member (optional).
* @param metadataImage The metadata image.
* @param records The list to accumulate any new records.
* @param group The StreamsGroup.
* @param groupEpoch The group epoch.
* @param updatedMember The updated member (optional).
* @param topology The streams topology.
* @param topologyValidationResult The topology validation result.
* @param records The list to accumulate any new records.
* @param assignmentConfigs The assignment configurations.
* @return The new target assignment for the updated member, or EMPTY if no member specified.
*/
private TasksTuple updateStreamsTargetAssignment(
StreamsGroup group,
int groupEpoch,
Optional<StreamsGroupMember> updatedMember,
ConfiguredTopology configuredTopology,
CoordinatorMetadataImage metadataImage,
StreamsTopology topology,
TopologyValidationResult topologyValidationResult,
List<CoordinatorRecord> records,
Map<String, String> assignmentConfigs
) {
Expand All @@ -3971,9 +3973,9 @@ private TasksTuple updateStreamsTargetAssignment(
assignmentConfigs
)
.withMembers(group.members())
.withTopology(configuredTopology)
.withTopology(topology)
.withTopologyValidationResult(topologyValidationResult)
.withStaticMembers(group.staticMembers())
.withMetadataImage(metadataImage)
.withTargetAssignment(group.targetAssignment());

updatedMember.ifPresent(member ->
Expand Down Expand Up @@ -4027,8 +4029,13 @@ private CoordinatorResult<Void, CoordinatorRecord> computeDelayedTargetAssignmen
throw new IllegalStateException("Group epoch should be always larger to assignment epoch");
}

if (!group.configuredTopology().isPresent()) {
log.warn("[GroupId {}] Cannot compute delayed target assignment: configured topology is not present", groupId);
if (!group.topologyValidationResult().isPresent()) {
log.warn("[GroupId {}] Cannot compute delayed target assignment: topology validation result is not present", groupId);
return EMPTY_RESULT;
}

if (!group.topology().isPresent()) {
log.warn("[GroupId {}] Cannot compute delayed target assignment: topology is not present", groupId);
return EMPTY_RESULT;
}

Expand All @@ -4037,8 +4044,8 @@ private CoordinatorResult<Void, CoordinatorRecord> computeDelayedTargetAssignmen
group,
group.groupEpoch(),
Optional.empty(),
group.configuredTopology().get(),
metadataImage,
group.topology().get(),
group.topologyValidationResult().get(),
records,
group.lastAssignmentConfigs()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
import org.apache.kafka.coordinator.group.Utils;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
import org.apache.kafka.coordinator.group.streams.topics.TopologyValidationResult;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineInteger;
Expand Down Expand Up @@ -184,9 +184,9 @@ public static class DeadlineAndEpoch {
private final TimelineObject<Optional<StreamsTopology>> topology;

/**
* The configured topology including resolved regular expressions.
* The topology validation result including resolved regular expressions.
*/
private final TimelineObject<Optional<ConfiguredTopology>> configuredTopology;
private final TimelineObject<Optional<TopologyValidationResult>> topologyValidationResult;

/**
* The metadata refresh deadline. It consists of a timestamp in milliseconds together with the group epoch at the time of setting it.
Expand Down Expand Up @@ -237,7 +237,7 @@ public StreamsGroup(
this.currentStandbyTaskToProcessIds = new TimelineHashMap<>(snapshotRegistry, 0);
this.currentWarmupTaskToProcessIds = new TimelineHashMap<>(snapshotRegistry, 0);
this.topology = new TimelineObject<>(snapshotRegistry, Optional.empty());
this.configuredTopology = new TimelineObject<>(snapshotRegistry, Optional.empty());
this.topologyValidationResult = new TimelineObject<>(snapshotRegistry, Optional.empty());
this.lastAssignmentConfigs = new TimelineHashMap<>(snapshotRegistry, 0);
}

Expand Down Expand Up @@ -275,8 +275,8 @@ public ListGroupsResponseData.ListedGroup asListedGroup(long committedOffset) {
.setGroupType(type().toString());
}

public Optional<ConfiguredTopology> configuredTopology() {
return configuredTopology.get();
public Optional<TopologyValidationResult> topologyValidationResult() {
return topologyValidationResult.get();
}

public Optional<StreamsTopology> topology() {
Expand All @@ -288,8 +288,8 @@ public void setTopology(StreamsTopology topology) {
maybeUpdateGroupState();
}

public void setConfiguredTopology(ConfiguredTopology configuredTopology) {
this.configuredTopology.set(Optional.ofNullable(configuredTopology));
public void setTopologyValidationResult(TopologyValidationResult topologyValidationResult) {
this.topologyValidationResult.set(Optional.ofNullable(topologyValidationResult));
}

/**
Expand Down Expand Up @@ -1082,14 +1082,9 @@ public StreamsGroupDescribeResponseData.DescribedGroup asDescribedGroup(
.setGroupState(state.get(committedOffset).toString())
.setAssignmentEpoch(targetAssignmentEpoch.get(committedOffset))
.setTopology(
configuredTopology.get(committedOffset)
.filter(ConfiguredTopology::isReady)
.map(ConfiguredTopology::asStreamsGroupDescribeTopology)
.orElse(
topology.get(committedOffset)
.map(StreamsTopology::asStreamsGroupDescribeTopology)
.orElseThrow(() -> new IllegalStateException("There should always be a topology for a streams group."))
)
topology.get(committedOffset)
.map(StreamsTopology::asStreamsGroupDescribeTopology)
.orElseThrow(() -> new IllegalStateException("There should always be a topology for a streams group."))
);
members.entrySet(committedOffset).forEach(
entry -> describedGroup.members().add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
* Contains the topology sent by a Streams client in the Streams heartbeat during initialization.
* <p>
* This topology is used together with the partition metadata on the broker to create a
* {@link org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology configured topology}.
* {@link org.apache.kafka.coordinator.group.streams.topics.TopologyValidationResult topology validation result}.
* This class allows to look-up subtopologies by subtopology ID in constant time by getting the subtopologies map.
* The information in this class is fully backed by records stored in the __consumer_offsets topic.
*
Expand Down
Loading
Loading