@@ -806,7 +806,8 @@ private void makeFollowers(

// Truncate the follower replicas' LEO to highWatermark.
// TODO: this logic needs to be removed after we introduce the leader epoch cache and the fetcher
// manager support truncating while fetching. See FLUSS-56112423
// manager supports truncating while fetching. Tracked by
// https://github.com/alibaba/fluss/issues/673
truncateToHighWatermark(replicasBecomeFollower);

// add fetcher for those follower replicas.
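
For context on the truncation step in this hunk, here is a minimal sketch of what truncating new followers to the high watermark looks like; `ReplicaStub` and its methods are hypothetical stand-ins, not Fluss's actual replica classes. Records above the high watermark are dropped because they may not exist on the new leader; the fetcher then re-replicates from that point.

```java
import java.util.List;

// A minimal sketch, not Fluss's implementation: ReplicaStub and its methods are hypothetical.
final class TruncateToHighWatermarkSketch {

    static void truncateToHighWatermark(List<ReplicaStub> replicasBecomeFollower) {
        for (ReplicaStub replica : replicasBecomeFollower) {
            long highWatermark = replica.highWatermark();
            if (replica.logEndOffset() > highWatermark) {
                // Records above the high watermark are not guaranteed to exist on the new
                // leader, so the follower drops them and re-fetches from the leader.
                replica.truncateTo(highWatermark);
            }
        }
    }

    interface ReplicaStub {
        long highWatermark();

        long logEndOffset();

        void truncateTo(long offset);
    }
}
```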
@@ -482,7 +482,13 @@ private LogAppendInfo processFetchResultFromLocalStorage(
// For the follower replica, we do not need to keep its segment base offset and physical
// position. These values will be computed upon becoming leader or handling a preferred read
// replica fetch.
logTablet.updateHighWatermark(replicaData.getHighWatermark());
// TODO: to avoid losing data on leader change, we now update the highWatermark on the
// follower first instead of on the leader first. See
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation
// for the details. However, this is only a temporary solution; for a strong consistency
// guarantee we should do what KIP-101 does. Tracked by
// https://github.com/alibaba/fluss/issues/673
logTablet.updateHighWatermark(logTablet.localLogEndOffset());
LOG.trace(
"Follower received high watermark {} from the leader for replica {}",
replicaData.getHighWatermark(),
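
The sketch below condenses the change in this hunk, using a hypothetical `LogStub` and `onFetchedFromLeader` method rather than Fluss's `LogTablet` API: after appending the fetched records, the follower moves its high watermark to its own local log end offset instead of copying the (possibly smaller) high watermark reported by the leader, so the follower's high watermark should not lag the leader's.

```java
// A minimal sketch, assuming a hypothetical LogStub; not the actual Fluss API.
final class FollowerHighWatermarkSketch {

    static void onFetchedFromLeader(LogStub log, byte[] records, long leaderHighWatermark) {
        log.append(records);
        // New (temporary) policy: follower high watermark == local log end offset.
        log.updateHighWatermark(log.localLogEndOffset());
        // Old policy, prone to losing data on leader change:
        // log.updateHighWatermark(leaderHighWatermark);
    }

    interface LogStub {
        void append(byte[] records);

        long localLogEndOffset();

        void updateHighWatermark(long offset);
    }
}
```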
@@ -502,7 +508,7 @@ private long processFetchResultFromRemoteStorage(
RemoteLogManager rlm = replicaManager.getRemoteLogManager();

// TODO: after introducing the leader epoch cache, we need to rebuild the local leader epoch
// cache.
// cache. Tracked by https://github.com/alibaba/fluss/issues/673

// Update the next fetch offset and writer id snapshot locally.
for (RemoteLogSegment remoteLogSegment : rlFetchInfo.remoteLogSegmentList()) {
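
Since the TODOs above point at the missing leader epoch cache twice, here is a minimal sketch of the KIP-101 idea they reference; the names are invented, not Fluss's. The cache maps each leader epoch to its first offset, which lets a follower compute an exact truncation point instead of falling back to the high watermark.

```java
import java.util.Map;
import java.util.TreeMap;

// A minimal sketch of a KIP-101-style leader epoch cache; names are invented, not Fluss's.
final class LeaderEpochCacheSketch {

    // leader epoch -> first offset written in that epoch
    private final TreeMap<Integer, Long> epochStartOffsets = new TreeMap<>();

    void assign(int epoch, long startOffset) {
        epochStartOffsets.putIfAbsent(epoch, startOffset);
    }

    // End offset of the given epoch: the start offset of the next epoch, or the log end
    // offset if the requested epoch is still the latest. A follower truncates to
    // min(localLogEndOffset, endOffsetFor(epoch, leaderLogEndOffset)).
    long endOffsetFor(int epoch, long logEndOffset) {
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(epoch);
        return next != null ? next.getValue() : logEndOffset;
    }
}
```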
@@ -163,6 +163,42 @@ void testSimpleFetch() throws Exception {
.isEqualTo(20L));
}

@Test
void testFollowerHighWatermarkHigherThanOrEqualToLeader() throws Exception {
Replica leaderReplica = leaderRM.getReplicaOrException(tb);
Replica followerReplica = followerRM.getReplicaOrException(tb);

followerFetcher.addBuckets(
Collections.singletonMap(
tb, new InitialFetchStatus(DATA1_TABLE_ID, leader.id(), 0L)));
assertThat(leaderReplica.getLocalLogEndOffset()).isEqualTo(0L);
assertThat(leaderReplica.getLogHighWatermark()).isEqualTo(0L);
assertThat(followerReplica.getLocalLogEndOffset()).isEqualTo(0L);
assertThat(followerReplica.getLogHighWatermark()).isEqualTo(0L);
// begin fetcher thread.
followerFetcher.start();

CompletableFuture<List<ProduceLogResultForBucket>> future;
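// Append 1000 batches of 10 records each; after every append, wait for the follower to
// catch up and check that its high watermark is not behind the leader's.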
for (int i = 0; i < 1000; i++) {
long baseOffset = i * 10L;
future = new CompletableFuture<>();
leaderRM.appendRecordsToLog(
1000,
1, // don't wait for acks
Collections.singletonMap(tb, genMemoryLogRecordsByObject(DATA1)),
future::complete);
assertThat(future.get())
.containsOnly(new ProduceLogResultForBucket(tb, baseOffset, baseOffset + 10L));
retry(
Duration.ofSeconds(20),
() ->
assertThat(followerReplica.getLocalLogEndOffset())
.isEqualTo(baseOffset + 10L));
assertThat(followerReplica.getLogHighWatermark())
.isGreaterThanOrEqualTo(leaderReplica.getLogHighWatermark());
}
}

private void registerTableInZkClient() throws Exception {
ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().cleanupRoot();
zkClient.registerTable(