Skip to content

Commit 33aa6e5

Browse files
authored
fix(zerozone2): fix the upgrade from v1 to v2 (#2985)
Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
1 parent 152d74b commit 33aa6e5

File tree

4 files changed

+15
-14
lines changed

4 files changed

+15
-14
lines changed

core/src/main/java/kafka/automq/partition/snapshot/PartitionSnapshotsManager.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ public synchronized CompletableFuture<AutomqGetPartitionSnapshotResponse> snapsh
171171
long finalSessionEpoch = sessionEpoch;
172172
CompletableFuture<Void> collectPartitionSnapshotsCf;
173173
if (!requestCommit && inflightCommitCfSet.isEmpty()) {
174-
collectPartitionSnapshotsCf = collectPartitionSnapshots(resp);
174+
collectPartitionSnapshotsCf = collectPartitionSnapshots(requestVersion, resp);
175175
} else {
176176
collectPartitionSnapshotsCf = CompletableFuture.completedFuture(null);
177177
}
@@ -203,15 +203,15 @@ public synchronized boolean expired() {
203203
return time.milliseconds() - lastGetSnapshotsTimestamp > 60000;
204204
}
205205

206-
private CompletableFuture<Void> collectPartitionSnapshots(AutomqGetPartitionSnapshotResponseData resp) {
206+
private CompletableFuture<Void> collectPartitionSnapshots(short requestVersion, AutomqGetPartitionSnapshotResponseData resp) {
207207
Map<Uuid, List<PartitionSnapshot>> topic2partitions = new HashMap<>();
208208
List<CompletableFuture<Void>> completeCfList = COMPLETE_CF_LIST_LOCAL.get();
209209
completeCfList.clear();
210210
removed.forEach(partition -> {
211211
PartitionSnapshotVersion version = synced.remove(partition);
212212
if (version != null) {
213213
List<PartitionSnapshot> partitionSnapshots = topic2partitions.computeIfAbsent(partition.topicId().get(), topic -> new ArrayList<>());
214-
partitionSnapshots.add(snapshot(partition, version, null, completeCfList));
214+
partitionSnapshots.add(snapshot(requestVersion, partition, version, null, completeCfList));
215215
}
216216
});
217217
removed.clear();
@@ -221,7 +221,7 @@ private CompletableFuture<Void> collectPartitionSnapshots(AutomqGetPartitionSnap
221221
if (!Objects.equals(p.version, oldVersion)) {
222222
List<PartitionSnapshot> partitionSnapshots = topic2partitions.computeIfAbsent(p.partition.topicId().get(), topic -> new ArrayList<>());
223223
PartitionSnapshotVersion newVersion = p.version.copy();
224-
PartitionSnapshot partitionSnapshot = snapshot(p.partition, oldVersion, newVersion, completeCfList);
224+
PartitionSnapshot partitionSnapshot = snapshot(requestVersion, p.partition, oldVersion, newVersion, completeCfList);
225225
partitionSnapshots.add(partitionSnapshot);
226226
synced.put(p.partition, newVersion);
227227
}
@@ -239,7 +239,7 @@ private CompletableFuture<Void> collectPartitionSnapshots(AutomqGetPartitionSnap
239239
return retCf;
240240
}
241241

242-
private PartitionSnapshot snapshot(Partition partition, PartitionSnapshotVersion oldVersion,
242+
private PartitionSnapshot snapshot(short requestVersion, Partition partition, PartitionSnapshotVersion oldVersion,
243243
PartitionSnapshotVersion newVersion, List<CompletableFuture<Void>> completeCfList) {
244244
if (newVersion == null) {
245245
// partition is closed
@@ -268,7 +268,9 @@ private PartitionSnapshot snapshot(Partition partition, PartitionSnapshotVersion
268268
if (includeSegments) {
269269
snapshot.setLogMetadata(logMetadata(src.logMeta()));
270270
}
271-
snapshot.setLastTimestampOffset(timestampOffset(src.lastTimestampOffset()));
271+
if (requestVersion > ZERO_ZONE_V0_REQUEST_VERSION) {
272+
snapshot.setLastTimestampOffset(timestampOffset(src.lastTimestampOffset()));
273+
}
272274
return snapshot;
273275
});
274276
}

core/src/main/java/kafka/automq/zerozone/AsyncSender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public BrokersAsyncSender(
118118
brokerConfig.connectionSetupTimeoutMs(),
119119
brokerConfig.connectionSetupTimeoutMaxMs(),
120120
time,
121-
false,
121+
true,
122122
new ApiVersions(),
123123
logContext,
124124
MetadataRecoveryStrategy.REBOOTSTRAP

core/src/main/java/kafka/automq/zerozone/RouterIn.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -112,11 +112,7 @@ public CompletableFuture<AutomqZoneRouterResponse> handleZoneRouterRequest0(Rout
112112
.thenCompose(rst -> prevLastRouterCf.thenApply(nil -> rst))
113113
.thenComposeAsync(produces -> {
114114
List<CompletableFuture<AutomqZoneRouterResponseData.Response>> cfList = new ArrayList<>();
115-
produces.stream().map(request -> {
116-
try (request) {
117-
return append(request);
118-
}
119-
}).forEach(cfList::add);
115+
produces.stream().map(this::append).forEach(cfList::add);
120116
return CompletableFuture.allOf(cfList.toArray(new CompletableFuture[0])).thenApply(nil -> {
121117
AutomqZoneRouterResponseData response = new AutomqZoneRouterResponseData();
122118
cfList.forEach(cf -> response.responses().add(cf.join()));

core/src/main/java/kafka/automq/zerozone/SubscriberRequester.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ private void request0() {
115115

116116
tryReset0();
117117
lastRequestTime = time.milliseconds();
118-
AutomqGetPartitionSnapshotRequestData data = new AutomqGetPartitionSnapshotRequestData().setSessionId(sessionId).setSessionEpoch(sessionEpoch).setVersion((short) 1);
118+
AutomqGetPartitionSnapshotRequestData data = new AutomqGetPartitionSnapshotRequestData().setSessionId(sessionId).setSessionEpoch(sessionEpoch);
119119
if (version.isZeroZoneV2Supported()) {
120120
data.setVersion((short) 1);
121121
}
@@ -198,7 +198,10 @@ private void handleResponse(ClientResponse clientResponse, CompletableFuture<Voi
198198
int c2 = o2.operation.code() == SnapshotOperation.REMOVE.code() ? 0 : 1;
199199
return c1 - c2;
200200
});
201-
subscriber.onNewWalEndOffset(resp.confirmWalConfig(), DefaultRecordOffset.of(Unpooled.wrappedBuffer(resp.confirmWalEndOffset())));
201+
if (resp.confirmWalEndOffset() != null && resp.confirmWalEndOffset().length > 0) {
202+
// zerozone v2
203+
subscriber.onNewWalEndOffset(resp.confirmWalConfig(), DefaultRecordOffset.of(Unpooled.wrappedBuffer(resp.confirmWalEndOffset())));
204+
}
202205
batch.operations.add(SnapshotWithOperation.snapshotMark(snapshotCf));
203206
subscriber.onNewOperationBatch(batch);
204207
}

0 commit comments

Comments
 (0)