Skip to content

Commit

Permalink
[fix][broker] Fix consumers are not redirected to migrated cluster (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
vineeth1995 authored Aug 18, 2023
1 parent 0cb1c78 commit 4a9fec6
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,20 @@ public void topicMigrated(Optional<ClusterUrl> clusterUrl) {
}
}

public boolean checkAndApplyTopicMigration() {
if (subscription.isSubsciptionMigrated()) {
Optional<ClusterUrl> clusterUrl = AbstractTopic.getMigratedClusterUrl(cnx.getBrokerService().getPulsar());
if (clusterUrl.isPresent()) {
ClusterUrl url = clusterUrl.get();
cnx.getCommandSender().sendTopicMigrated(ResourceType.Consumer, consumerId, url.getBrokerServiceUrl(),
url.getBrokerServiceUrlTls());
// disconnect consumer after sending migrated cluster url
disconnect();
return true;
}
}
return false;
}
/**
* Checks if consumer-blocking on unAckedMessages is allowed for below conditions:<br/>
* a. consumer must have Shared-subscription<br/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1250,6 +1250,13 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
});
})
.thenAccept(consumer -> {
if (consumer.checkAndApplyTopicMigration()) {
log.info("[{}] Disconnecting consumer {} on migrated subscription on topic {} / {}",
remoteAddress, consumerId, subscriptionName, topicName);
consumers.remove(consumerId, consumerFuture);
return;
}

if (consumerFuture.complete(consumer)) {
log.info("[{}] Created subscription on topic {} / {}",
remoteAddress, topicName, subscriptionName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ default long getNumberOfEntriesDelayed() {

CompletableFuture<Void> updateSubscriptionProperties(Map<String, String> subscriptionProperties);

boolean isSubsciptionMigrated();

default void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
// Default is no-op
}
Expand Down Expand Up @@ -130,4 +132,5 @@ static boolean isCumulativeAckMode(SubType subType) {
static boolean isIndividualAckMode(SubType subType) {
return SubType.Shared.equals(subType) || SubType.Key_Shared.equals(subType);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,11 @@ public NonPersistentDispatcher getDispatcher() {
return this.dispatcher;
}

@Override
public boolean isSubsciptionMigrated() {
return topic.isMigrated();
}

@Override
public CompletableFuture<Void> close() {
IS_FENCED_UPDATER.set(this, TRUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1248,6 +1248,12 @@ void topicTerminated() {
}
}

@Override
public boolean isSubsciptionMigrated() {
log.info("backlog for {} - {}", topicName, cursor.getNumberOfEntriesInBacklog(true));
return topic.isMigrated() && cursor.getNumberOfEntriesInBacklog(true) <= 0;
}

@Override
public Map<String, String> getSubscriptionProperties() {
return subscriptionProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2576,8 +2576,14 @@ private boolean hasBacklogs() {
public CompletableFuture<Void> checkClusterMigration() {
Optional<ClusterUrl> clusterUrl = getMigratedClusterUrl();
if (!isMigrated() && clusterUrl.isPresent()) {
log.info("{} triggering topic migration", topic);
return ledger.asyncMigrate().thenCompose(r -> null);
return ledger.asyncMigrate().thenApply(__ -> {
subscriptions.forEach((name, sub) -> {
if (sub.isSubsciptionMigrated()) {
sub.getConsumers().forEach(Consumer::checkAndApplyTopicMigration);
}
});
return null;
});
} else {
return CompletableFuture.completedFuture(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,27 @@ public void testClusterMigration(boolean persistent, SubscriptionType subType) t
retryStrategically((test) -> topic2.getSubscription("s2") != null, 10, 500);
assertFalse(topic2.getSubscription("s2").getConsumers().isEmpty());

// new sub on migration topic must be redirected immediately
Consumer<byte[]> consumerM = client1.newConsumer().topic(topicName).subscriptionType(subType)
.subscriptionName("sM").subscribe();
assertFalse(pulsar2.getBrokerService().getTopicReference(topicName).get().getSubscription("sM").getConsumers()
.isEmpty());
consumerM.close();

// migrate topic after creating subscription
String newTopicName = topicName + "-new";
consumerM = client1.newConsumer().topic(newTopicName).subscriptionType(subType)
.subscriptionName("sM").subscribe();
retryStrategically((t) -> pulsar1.getBrokerService().getTopicReference(newTopicName).isPresent(), 5, 100);
pulsar1.getBrokerService().getTopicReference(newTopicName).get().checkClusterMigration().get();
retryStrategically((t) ->
pulsar2.getBrokerService().getTopicReference(newTopicName).isPresent() &&
pulsar2.getBrokerService().getTopicReference(newTopicName).get().getSubscription("sM")
.getConsumers().isEmpty(), 5, 100);
assertFalse(pulsar2.getBrokerService().getTopicReference(newTopicName).get().getSubscription("sM").getConsumers()
.isEmpty());
consumerM.close();

// publish messages to cluster-2 and consume them
for (int i = 0; i < n; i++) {
producer1.send("test2".getBytes());
Expand Down

0 comments on commit 4a9fec6

Please sign in to comment.