Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix broker immediately redirect migration eligible consumers #20351

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
[fix][broker] Fix broker immediately redirect migration eligible cons…
…umers
  • Loading branch information
rdhabalia committed May 19, 2023
commit df11296cfc85ff52d1030d8472305624fbf2b606
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,21 @@ public void topicMigrated(Optional<ClusterUrl> clusterUrl) {
}
}

public boolean checkAndApplyTopicMigration() {
if (subscription.isSubsciptionMigrated()) {
Optional<ClusterUrl> clusterUrl = AbstractTopic.getMigratedClusterUrl(cnx.getBrokerService().getPulsar());
if (clusterUrl.isPresent()) {
ClusterUrl url = clusterUrl.get();
cnx.getCommandSender().sendTopicMigrated(ResourceType.Consumer, consumerId, url.getBrokerServiceUrl(),
url.getBrokerServiceUrlTls());
// disconnect consumer after sending migrated cluster url
disconnect();
return true;
}
}
return false;
}

/**
* Checks if consumer-blocking on unAckedMessages is allowed for below conditions:<br/>
* a. consumer must have Shared-subscription<br/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1242,6 +1242,12 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
}
})
.thenAccept(consumer -> {
if (consumer.checkAndApplyTopicMigration()) {
log.info("[{}] Disconnecting consumer {} on migrated subscription on topic {} / {}",
remoteAddress, consumerId, subscriptionName, topicName);
consumers.remove(consumerId, consumerFuture);
return;
}
if (consumerFuture.complete(consumer)) {
log.info("[{}] Created subscription on topic {} / {}",
remoteAddress, topicName, subscriptionName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ default long getNumberOfEntriesDelayed() {

CompletableFuture<Void> updateSubscriptionProperties(Map<String, String> subscriptionProperties);

boolean isSubsciptionMigrated();

default void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
// Default is no-op
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,11 @@ public NonPersistentDispatcher getDispatcher() {
return this.dispatcher;
}

@Override
public boolean isSubsciptionMigrated() {
return topic.isMigrated();
}

@Override
public CompletableFuture<Void> close() {
IS_FENCED_UPDATER.set(this, TRUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1239,6 +1239,12 @@ void topicTerminated() {
}
}

@Override
public boolean isSubsciptionMigrated() {
log.info("backlog for {} - {}", topicName, cursor.getNumberOfEntriesInBacklog(true));
return topic.isMigrated() && cursor.getNumberOfEntriesInBacklog(true) <= 0;
}

@Override
public Map<String, String> getSubscriptionProperties() {
return subscriptionProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2556,12 +2556,20 @@ private boolean hasBacklogs() {
@Override
public CompletableFuture<Void> checkClusterMigration() {
Optional<ClusterUrl> clusterUrl = getMigratedClusterUrl();
if (!isMigrated() && clusterUrl.isPresent()) {
log.info("{} triggering topic migration", topic);
return ledger.asyncMigrate().thenCompose(r -> null);
} else {
if (!clusterUrl.isPresent()) {
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Position> migrationFuture = !isMigrated() ? ledger.asyncMigrate()
: CompletableFuture.completedFuture(null);
log.info("{} triggering topic migration", topic);
return migrationFuture.thenApply(__ -> {
subscriptions.forEach((name, sub) -> {
if (sub.isSubsciptionMigrated()) {
sub.getConsumers().forEach(Consumer::checkAndApplyTopicMigration);
}
});
return null;
});
}

public boolean isReplicationBacklogExist() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,27 @@ public void testClusterMigration(boolean persistent, SubscriptionType subType) t
retryStrategically((test) -> topic2.getSubscription("s2") != null, 10, 500);
assertFalse(topic2.getSubscription("s2").getConsumers().isEmpty());

// new sub on migration topic must be redirected immediately
Consumer<byte[]> consumerM = client1.newConsumer().topic(topicName).subscriptionType(subType)
.subscriptionName("sM").subscribe();
assertFalse(pulsar2.getBrokerService().getTopicReference(topicName).get().getSubscription("sM").getConsumers()
.isEmpty());
consumerM.close();

// migrate topic after creating subscription
String newTopicName = topicName + "-new";
consumerM = client1.newConsumer().topic(newTopicName).subscriptionType(subType)
.subscriptionName("sM").subscribe();
retryStrategically((t) -> pulsar1.getBrokerService().getTopicReference(newTopicName).isPresent(), 5, 100);
pulsar1.getBrokerService().getTopicReference(newTopicName).get().checkClusterMigration().get();
retryStrategically((t) ->
pulsar2.getBrokerService().getTopicReference(newTopicName).isPresent() &&
pulsar2.getBrokerService().getTopicReference(newTopicName).get().getSubscription("sM")
.getConsumers().isEmpty(), 5, 100);
assertFalse(pulsar2.getBrokerService().getTopicReference(newTopicName).get().getSubscription("sM").getConsumers()
.isEmpty());
consumerM.close();

// publish messages to cluster-2 and consume them
for (int i = 0; i < n; i++) {
producer1.send("test2".getBytes());
Expand Down
Loading