Skip to content

Commit

Permalink
[feat] [broker] PIP-188 support blue-green cluster migration [part-2] (
Browse files Browse the repository at this point in the history
…#19605)

Co-authored-by: Vineeth <vineeth.polamreddy@verizonmedia.com>
  • Loading branch information
vineeth1995 and Vineeth authored Apr 10, 2023
1 parent c3a92b2 commit 34b6e89
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,6 @@ default Optional<DispatchRateLimiter> getRateLimiter() {
}

boolean isConnected();

long getNumberOfEntriesInBacklog();
}
Original file line number Diff line number Diff line change
Expand Up @@ -1579,13 +1579,23 @@ private void buildProducerAndAddTopic(Topic topic, long producerId, String produ
if (ex.getCause() instanceof BrokerServiceException.TopicMigratedException) {
Optional<ClusterUrl> clusterURL = getMigratedClusterUrl(service.getPulsar());
if (clusterURL.isPresent()) {
log.info("[{}] redirect migrated producer to topic {}: producerId={}, {}", remoteAddress, topicName,
producerId, ex.getCause().getMessage());
commandSender.sendTopicMigrated(ResourceType.Producer, producerId,
clusterURL.get().getBrokerServiceUrl(), clusterURL.get().getBrokerServiceUrlTls());
closeProducer(producer);
return null;

if (topic.isReplicationBacklogExist()) {
log.info("Topic {} is migrated but replication backlog exist: "
+ "producerId = {}, producerName = {}, {}", topicName,
producerId, producerName, ex.getCause().getMessage());
} else {
log.info("[{}] redirect migrated producer to topic {}: "
+ "producerId={}, producerName = {}, {}", remoteAddress,
topicName, producerId, producerName, ex.getCause().getMessage());
boolean msgSent = commandSender.sendTopicMigrated(ResourceType.Producer, producerId,
clusterURL.get().getBrokerServiceUrl(), clusterURL.get().getBrokerServiceUrlTls());
if (!msgSent) {
log.info("client doesn't support topic migration handling {}-{}-{}", topic,
remoteAddress, producerId);
}
closeProducer(producer);
return null;
}
} else {
log.warn("[{}] failed producer because migration url not configured topic {}: producerId={}, {}",
remoteAddress, topicName, producerId, ex.getCause().getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ CompletableFuture<Subscription> createSubscription(String subscriptionName, Init

boolean isBrokerPublishRateExceeded();

boolean isReplicationBacklogExist();

void disableCnxAutoRead();

void enableCnxAutoRead();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ protected Position getReplicatorReadPosition() {
}

@Override
protected long getNumberOfEntriesInBacklog() {
public long getNumberOfEntriesInBacklog() {
// No-op
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ public void checkMessageDeduplicationInfo() {
// No-op
}

@Override
public boolean isReplicationBacklogExist() {
return false;
}

@Override
public void removeProducer(Producer producer) {
checkArgument(producer.getTopic() == this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ protected Position getReplicatorReadPosition() {
}

@Override
protected long getNumberOfEntriesInBacklog() {
return cursor.getNumberOfEntriesInBacklog(false);
public long getNumberOfEntriesInBacklog() {
return cursor.getNumberOfEntriesInBacklog(true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,7 @@ public void addComplete(Position pos, ByteBuf entryData, Object ctx) {

@Override
public synchronized void addFailed(ManagedLedgerException exception, Object ctx) {
PublishContext callback = (PublishContext) ctx;
if (exception instanceof ManagedLedgerFencedException) {
// If the managed ledger has been fenced, we cannot continue using it. We need to close and reopen
close();
Expand All @@ -587,7 +588,11 @@ public synchronized void addFailed(ManagedLedgerException exception, Object ctx)
List<CompletableFuture<Void>> futures = new ArrayList<>();
// send migration url metadata to producers before disconnecting them
if (isMigrated()) {
producers.forEach((__, producer) -> producer.topicMigrated(getMigratedClusterUrl()));
if (isReplicationBacklogExist()) {
log.info("Topic {} is migrated but replication backlog exists. Closing producers.", topic);
} else {
producers.forEach((__, producer) -> producer.topicMigrated(getMigratedClusterUrl()));
}
}
producers.forEach((__, producer) -> futures.add(producer.disconnect()));
disconnectProducersFuture = FutureUtil.waitForAll(futures);
Expand All @@ -599,8 +604,6 @@ public synchronized void addFailed(ManagedLedgerException exception, Object ctx)
return null;
});

PublishContext callback = (PublishContext) ctx;

if (exception instanceof ManagedLedgerAlreadyClosedException) {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed to persist msg in store: {}", topic, exception.getMessage());
Expand Down Expand Up @@ -2510,6 +2513,18 @@ public CompletableFuture<Void> checkClusterMigration() {
}
}

public boolean isReplicationBacklogExist() {
ConcurrentOpenHashMap<String, Replicator> replicators = getReplicators();
if (replicators != null) {
for (Replicator replicator : replicators.values()) {
if (replicator.getNumberOfEntriesInBacklog() != 0) {
return true;
}
}
}
return false;
}

@Override
public void checkGC() {
if (!isDeleteWhileInactive()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,20 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
protected URI lookupUrl;

protected boolean isTcpLookup = false;
protected static final String configClusterName = "test";
protected String configClusterName = "test";

protected boolean enableBrokerInterceptor = false;

public MockedPulsarServiceBaseTest() {
resetConfig();
}

protected void setupWithClusterName(String clusterName) throws Exception {
this.conf.setClusterName(clusterName);
this.configClusterName = clusterName;
this.internalSetup();
}

protected PulsarService getPulsar() {
return pulsar;
}
Expand Down
Loading

0 comments on commit 34b6e89

Please sign in to comment.