Skip to content

Commit 4749007

Browse files
committed
refactor(inkless): add artificial latency to metadata update
When refreshing metadata, new requests to next broker may be processed _before_ the last request on the previous broker, causing OutOfOrderSequence. These errors are retried and data finally lands in order. However to avoid this error, this proposal adds artificial latency to the metadata response when it includes inkless topics. This way it gives enough time for the previous broker to complete processing requests, before the new one takes on.
1 parent 3e2ff5f commit 4749007

File tree

3 files changed

+28
-12
lines changed

3 files changed

+28
-12
lines changed

core/src/main/scala/kafka/server/KafkaApis.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ class KafkaApis(val requestChannel: RequestChannel,
125125
val describeTopicPartitionsRequestHandler = new DescribeTopicPartitionsRequestHandler(
126126
metadataCache, authHelper, config)
127127

128-
val inklessTopicMetadataTransformer = inklessSharedState.map(s => new InklessTopicMetadataTransformer(s.metadata()))
128+
val inklessTopicMetadataTransformer = inklessSharedState.map(s => new InklessTopicMetadataTransformer(time, s.metadata()))
129129

130130
def close(): Unit = {
131131
aclApis.close()

storage/inkless/src/main/java/io/aiven/inkless/metadata/InklessTopicMetadataTransformer.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
2222
import org.apache.kafka.common.message.MetadataResponseData;
2323
import org.apache.kafka.common.network.ListenerName;
24+
import org.apache.kafka.common.utils.Time;
2425
import org.apache.kafka.metadata.LeaderAndIsr;
2526

2627
import java.util.Collections;
@@ -33,11 +34,13 @@
3334
import io.aiven.inkless.control_plane.MetadataView;
3435

3536
public class InklessTopicMetadataTransformer {
37+
private final Time time;
3638
private final MetadataView metadataView;
3739

3840
private final AtomicInteger roundRobinCounter = new AtomicInteger();
3941

40-
public InklessTopicMetadataTransformer(final MetadataView metadataView) {
42+
public InklessTopicMetadataTransformer(final Time time, final MetadataView metadataView) {
43+
this.time = Objects.requireNonNull(time, "time cannot be null");
4144
this.metadataView = Objects.requireNonNull(metadataView, "metadataView cannot be null");
4245
}
4346

@@ -52,10 +55,12 @@ public void transformClusterMetadata(
5255
Objects.requireNonNull(topicMetadata, "topicMetadata cannot be null");
5356

5457
final int leaderForInklessPartitions = selectLeaderForInklessPartitions(listenerName, clientId);
58+
boolean hasInklessTopics = false;
5559
for (final var topic : topicMetadata) {
5660
if (!metadataView.isInklessTopic(topic.name())) {
5761
continue;
5862
}
63+
hasInklessTopics = true;
5964
for (final var partition : topic.partitions()) {
6065
partition.setLeaderId(leaderForInklessPartitions);
6166
final List<Integer> list = List.of(leaderForInklessPartitions);
@@ -65,6 +70,11 @@ public void transformClusterMetadata(
6570
partition.setLeaderEpoch(LeaderAndIsr.INITIAL_LEADER_EPOCH);
6671
}
6772
}
73+
if (hasInklessTopics) {
74+
// Introduce artificial latency to avoid a race condition between the metadata update and the producer
75+
// causing OutOfOrderSequenceException.
76+
time.sleep(500);
77+
}
6878
}
6979

7080
/**

storage/inkless/src/test/java/io/aiven/inkless/metadata/InklessTopicMetadataTransformerTest.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import org.apache.kafka.common.network.ListenerName;
2929
import org.apache.kafka.common.security.auth.SecurityProtocol;
3030

31+
import org.apache.kafka.common.utils.MockTime;
32+
import org.apache.kafka.common.utils.Time;
3133
import org.junit.jupiter.api.BeforeEach;
3234
import org.junit.jupiter.api.Nested;
3335
import org.junit.jupiter.api.Test;
@@ -62,16 +64,20 @@ class InklessTopicMetadataTransformerTest {
6264
static final Uuid TOPIC_CLASSIC_ID = new Uuid(456, 456);
6365
static final ListenerName LISTENER_NAME = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT);
6466

67+
Time time = new MockTime();
6568
@Mock
6669
MetadataView metadataView;
6770

6871
@Test
6972
void nulls() {
70-
assertThatThrownBy(() -> new InklessTopicMetadataTransformer(null))
73+
assertThatThrownBy(() -> new InklessTopicMetadataTransformer(null, metadataView))
74+
.isInstanceOf(NullPointerException.class)
75+
.hasMessage("time cannot be null");
76+
assertThatThrownBy(() -> new InklessTopicMetadataTransformer(time, null))
7177
.isInstanceOf(NullPointerException.class)
7278
.hasMessage("metadataView cannot be null");
7379

74-
final var transformer = new InklessTopicMetadataTransformer(metadataView);
80+
final var transformer = new InklessTopicMetadataTransformer(time, metadataView);
7581
assertThatThrownBy(() -> transformer.transformClusterMetadata(LISTENER_NAME, "x", null))
7682
.isInstanceOf(NullPointerException.class)
7783
.hasMessage("topicMetadata cannot be null");
@@ -94,7 +100,7 @@ void setup() {
94100
@NullSource
95101
@ValueSource(strings = {"inkless_az=az1", "x=y", ""})
96102
void clusterMetadata(final String clientId) {
97-
final var transformer = new InklessTopicMetadataTransformer(metadataView);
103+
final var transformer = new InklessTopicMetadataTransformer(time, metadataView);
98104

99105
final List<MetadataResponseTopic> topicMetadata = List.of();
100106
transformer.transformClusterMetadata(LISTENER_NAME, clientId, topicMetadata);
@@ -105,7 +111,7 @@ void clusterMetadata(final String clientId) {
105111
@NullSource
106112
@ValueSource(strings = {"inkless_az=az1", "x=y", ""})
107113
void describeTopicResponse(final String clientId) {
108-
final var transformer = new InklessTopicMetadataTransformer(metadataView);
114+
final var transformer = new InklessTopicMetadataTransformer(time, metadataView);
109115

110116
final DescribeTopicPartitionsResponseData describeResponse = new DescribeTopicPartitionsResponseData();
111117
transformer.transformDescribeTopicResponse(LISTENER_NAME, clientId, describeResponse);
@@ -186,7 +192,7 @@ void clusterMetadata(final String clientAZ, final int expectedLeaderId1, final i
186192
inklessTopicMetadata.get(),
187193
classicTopicMetadata.get()
188194
);
189-
final var transformer = new InklessTopicMetadataTransformer(metadataView);
195+
final var transformer = new InklessTopicMetadataTransformer(time, metadataView);
190196

191197
transformer.transformClusterMetadata(LISTENER_NAME, "inkless_az=" + clientAZ, topicMetadata);
192198

@@ -279,7 +285,7 @@ void describeTopicResponse(final String clientAZ, final int expectedLeaderId1, f
279285
inklessTopicMetadata.get(),
280286
classicTopicMetadata.get()
281287
).iterator()));
282-
final var transformer = new InklessTopicMetadataTransformer(metadataView);
288+
final var transformer = new InklessTopicMetadataTransformer(time, metadataView);
283289

284290
transformer.transformDescribeTopicResponse(LISTENER_NAME, "inkless_az=" + clientAZ, describeResponse);
285291

@@ -333,7 +339,7 @@ void clusterMetadata() {
333339
));
334340

335341
final List<MetadataResponseTopic> topicMetadata = List.of(inklessTopicMetadata.get());
336-
final var transformer = new InklessTopicMetadataTransformer(metadataView);
342+
final var transformer = new InklessTopicMetadataTransformer(time, metadataView);
337343

338344
transformer.transformClusterMetadata(LISTENER_NAME, "inkless_az=az0", topicMetadata);
339345
final var expectedInklessTopicMetadata = inklessTopicMetadata.get();
@@ -368,7 +374,7 @@ void describeTopicResponse() {
368374
))
369375
).iterator()));
370376

371-
final var transformer = new InklessTopicMetadataTransformer(metadataView);
377+
final var transformer = new InklessTopicMetadataTransformer(time, metadataView);
372378

373379
final DescribeTopicPartitionsResponseData describeResponse = describeResponseSupplier.get();
374380
transformer.transformDescribeTopicResponse(LISTENER_NAME, "inkless_az=az0", describeResponse);
@@ -413,7 +419,7 @@ void clusterMetadata() {
413419
));
414420

415421
final List<MetadataResponseTopic> topicMetadata = List.of(inklessTopicMetadata.get());
416-
final var transformer = new InklessTopicMetadataTransformer(metadataView);
422+
final var transformer = new InklessTopicMetadataTransformer(time, metadataView);
417423

418424
transformer.transformClusterMetadata(LISTENER_NAME, null, topicMetadata);
419425
final var expectedInklessTopicMetadata = inklessTopicMetadata.get();
@@ -448,7 +454,7 @@ void describeTopicResponse() {
448454
))
449455
).iterator()));
450456

451-
final var transformer = new InklessTopicMetadataTransformer(metadataView);
457+
final var transformer = new InklessTopicMetadataTransformer(time, metadataView);
452458
final DescribeTopicPartitionsResponseData describeResponse = describeResponseSupplier.get();
453459

454460
transformer.transformDescribeTopicResponse(LISTENER_NAME, null, describeResponse);

0 commit comments

Comments
 (0)