Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] response not-found error if topic does not exist when calling getPartitionedTopicMetadata #22838

Merged
merged 20 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
remove the param assumedNonPartitionedNonPersistentTopicAlwaysExists
  • Loading branch information
poorbarcode committed Jun 11, 2024
commit 1e7319ca1324c3398b73b012184529cbc2c09591
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ protected CompletableFuture<Void> internalCheckTopicExists(TopicName topicName)
}

protected CompletableFuture<Void> internalCheckNonPartitionedTopicExists(TopicName topicName) {
return pulsar().getNamespaceService().checkNonPartitionedTopicExists(topicName, false)
return pulsar().getNamespaceService().checkNonPartitionedTopicExists(topicName)
.thenAccept(exist -> {
if (!exist) {
throw new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,15 @@ protected CompletableFuture<LookupData> internalLookupTopicAsync(final TopicName
.thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(topicName.getNamespaceObject()))
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.LOOKUP, null))
.thenCompose(__ -> {
// Case-1: Non-persistent topic.
// Currently, it's hard to check the non-persistent-non-partitioned topic, because it only exists
// in the broker, it doesn't have metadata. If the topic is non-persistent and non-partitioned,
// we'll return the true flag.
return pulsar().getNamespaceService().checkTopicExists(topicName, true).thenCompose(info -> {
// we'll return the true flag. So either it is a partitioned topic or not, the result will be true.
if (!topicName.isPersistent()) {
return CompletableFuture.completedFuture(true);
}
// Case-2: Persistent topic.
return pulsar().getNamespaceService().checkTopicExists(topicName).thenCompose(info -> {
boolean exists = info.isExists();
info.recycle();
if (exists) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1403,47 +1403,31 @@ public CompletableFuture<List<String>> getOwnedTopicListForNamespaceBundle(Names
});
}

public CompletableFuture<TopicExistsInfo> checkTopicExists(TopicName topic) {
return checkTopicExists(topic, false);
}

/***
* Check topic exists( partitioned or non-partitioned ).
* @param assumedNonPartitionedNonPersistentTopicAlwaysExists Currently, it's hard to check the
* non-persistent-non-partitioned topic, because it only exists in the broker, it doesn't have metadata.
* If the topic is non-persistent and non-partitioned, we'll return the true flag.
*/
public CompletableFuture<TopicExistsInfo> checkTopicExists(TopicName topic,
boolean assumedNonPartitionedNonPersistentTopicAlwaysExists) {
public CompletableFuture<TopicExistsInfo> checkTopicExists(TopicName topic) {
return pulsar.getBrokerService()
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.toString()))
.thenCompose(metadata -> {
if (metadata.partitions > 0) {
return CompletableFuture.completedFuture(
TopicExistsInfo.newPartitionedTopicExists(metadata.partitions));
}
return checkNonPartitionedTopicExists(topic, assumedNonPartitionedNonPersistentTopicAlwaysExists)
return checkNonPartitionedTopicExists(topic)
.thenApply(b -> b ? TopicExistsInfo.newNonPartitionedTopicExists()
: TopicExistsInfo.newTopicNotExists());
});
}

/***
* Check non-partitioned topic exists.
* @param assumedNonPartitionedNonPersistentTopicAlwaysExists Currently, it's hard to check the
* non-persistent-non-partitioned topic, because it only exists in the broker, it doesn't have metadata.
* If the topic is non-persistent and non-partitioned, we'll return the true flag.
*/
public CompletableFuture<Boolean> checkNonPartitionedTopicExists(TopicName topic,
boolean assumedNonPartitionedNonPersistentTopicAlwaysExists) {
public CompletableFuture<Boolean> checkNonPartitionedTopicExists(TopicName topic) {
if (topic.isPersistent()) {
return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
} else {
if (assumedNonPartitionedNonPersistentTopicAlwaysExists) {
return CompletableFuture.completedFuture(true);
} else {
return checkNonPersistentNonPartitionedTopicExists(topic.toString());
}
return checkNonPersistentNonPartitionedTopicExists(topic.toString());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,10 +361,9 @@ public void testLookUpWithException() throws Exception {
existFuture.complete(TopicExistsInfo.newNonPartitionedTopicExists());
doReturn(future).when(nameSpaceService).getBrokerServiceUrlAsync(any(), any());
doReturn(existFuture).when(nameSpaceService).checkTopicExists(any());
doReturn(existFuture).when(nameSpaceService).checkTopicExists(any(), anyBoolean());
CompletableFuture existBooleanFuture = new CompletableFuture();
existBooleanFuture.complete(false);
doReturn(existBooleanFuture).when(nameSpaceService).checkNonPartitionedTopicExists(any(), anyBoolean());
doReturn(existBooleanFuture).when(nameSpaceService).checkNonPartitionedTopicExists(any());
doReturn(nameSpaceService).when(pulsar).getNamespaceService();
AsyncResponse asyncResponse = mock(AsyncResponse.class);
ProducerMessages producerMessages = new ProducerMessages();
Expand All @@ -387,7 +386,7 @@ public void testLookUpTopicNotExist() throws Exception {
CompletableFuture existBooleanFuture = new CompletableFuture();
existBooleanFuture.complete(false);
doReturn(existFuture).when(nameSpaceService).checkTopicExists(any());
doReturn(existBooleanFuture).when(nameSpaceService).checkNonPartitionedTopicExists(any(), anyBoolean());
doReturn(existBooleanFuture).when(nameSpaceService).checkNonPartitionedTopicExists(any());
doReturn(nameSpaceService).when(pulsar).getNamespaceService();
AsyncResponse asyncResponse = mock(AsyncResponse.class);
ProducerMessages producerMessages = new ProducerMessages();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,9 @@ public void testLookupTopicNotExist() throws Exception {
CompletableFuture<TopicExistsInfo> future = new CompletableFuture<>();
future.complete(TopicExistsInfo.newTopicNotExists());
doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class));
doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class), anyBoolean());
CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>();
booleanFuture.complete(false);
doReturn(booleanFuture).when(namespaceService).checkNonPartitionedTopicExists(any(TopicName.class),
anyBoolean());
doReturn(booleanFuture).when(namespaceService).checkNonPartitionedTopicExists(any(TopicName.class));

AsyncResponse asyncResponse1 = mock(AsyncResponse.class);
destLookup.lookupTopicAsync(asyncResponse1, TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic_not_exist", false, null, null);
Expand Down Expand Up @@ -270,10 +268,9 @@ public void testValidateReplicationSettingsOnNamespace() throws Exception {
CompletableFuture<TopicExistsInfo> future = new CompletableFuture<>();
future.complete(TopicExistsInfo.newTopicNotExists());
doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class));
doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class), anyBoolean());
CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>();
booleanFuture.complete(false);
doReturn(future).when(namespaceService).checkNonPartitionedTopicExists(any(TopicName.class), anyBoolean());
doReturn(future).when(namespaceService).checkNonPartitionedTopicExists(any(TopicName.class));
destLookup.lookupTopicAsync(asyncResponse, TopicDomain.persistent.value(), property, cluster, ns2,
"invalid-localCluster", false, null, null);
verify(asyncResponse).resume(arg.capture());
Expand Down
Loading