Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker]Create missed subscriptions for createMissedPartitions #17946

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
update comments
  • Loading branch information
huangzegui committed Nov 12, 2022
commit 0ee2f24e66404af4ca5dad317a095d71006c9b1e
Original file line number Diff line number Diff line change
Expand Up @@ -509,16 +509,15 @@ protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int numPar

protected void internalCreateMissedPartitions(AsyncResponse asyncResponse, List<String> subscriptions) {
getPartitionedTopicMetadataAsync(topicName, false, false)
.thenApply(metadata -> {
.thenCompose(metadata ->{
if (metadata == null) {
throw new RestException(Status.NOT_FOUND, String.format(
"Partitioned Topic not found: %s ,has no metadata", topicName.toString()));
}
return metadata;
return tryCreatePartitionsAsync(metadata.partitions)
.thenCompose(__ ->
createMissedSubscriptionsAsync(topicName, subscriptions, metadata.partitions));
})
.thenCompose(metadata ->
tryCreatePartitionsAsync(metadata.partitions).thenApply(ignore -> metadata.partitions))
.thenCompose(numPartitions -> createMissedSubscriptionsAsync(topicName, subscriptions, numPartitions))
.thenAccept(v -> {
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Expand Down Expand Up @@ -4574,7 +4573,8 @@ private CompletableFuture<Void> createSubscriptions(TopicName topicName, int num
* @param numPartitions : number partitions for the topics
*
*/
private CompletableFuture<Void> createMissedSubscriptionsAsync(TopicName topicName, List<String> subscriptions, int numPartitions) {
private CompletableFuture<Void> createMissedSubscriptionsAsync(TopicName topicName, List<String> subscriptions,
int numPartitions) {
CompletableFuture<Void> result = new CompletableFuture<>();
if (CollectionUtils.isEmpty(subscriptions)) {
result.complete(null);
Expand All @@ -4583,19 +4583,18 @@ private CompletableFuture<Void> createMissedSubscriptionsAsync(TopicName topicNa
PulsarAdmin admin;
try {
admin = pulsar().getAdminClient();
Jason918 marked this conversation as resolved.
Show resolved Hide resolved
} catch (PulsarServerException e1) {
result.completeExceptionally(e1);
} catch (PulsarServerException ex) {
result.completeExceptionally(ex);
return result;
}

List<CompletableFuture<Void>> subscriptionFutures = new ArrayList<>();
List<CompletableFuture<Void>> subscriptionFutures = new ArrayList<>(subscriptions.size());

subscriptions.forEach(subscription -> {
for (int i = 0; i < numPartitions; i++) {
final String topicNamePartition = topicName.getPartition(i).toString();
CompletableFuture<Void> future = new CompletableFuture<>();
admin.topics().createSubscriptionAsync(topicNamePartition,
subscription, MessageId.latest).whenComplete((__, ex) -> {
admin.topics().createSubscriptionAsync(topicName.getPartition(i).toString(), subscription,
MessageId.latest).whenComplete((__, ex) -> {
if (ex == null) {
future.complete(null);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.assertj.core.util.Lists;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -121,20 +120,20 @@ public Object[] restCreateMissedPartitions() {
}

@Test(timeOut = 60000, dataProvider = "restCreateMissedPartitions")
public void testCreateMissedPartitions(boolean useRestApi) throws PulsarAdminException, PulsarClientException, MetadataStoreException {
public void testCreateMissedPartitions(boolean useRestApi) throws Exception {
conf.setAllowAutoTopicCreation(false);
final String topic = "testCreateMissedPartitions-useRestApi-" + useRestApi;
final String group1 = "cg_testCreateMissedPartitions-1";
final String group2 = "cg_testCreateMissedPartitions-2";
final String sub1 = "sub-1";
final String sub2 = "sub-2";
final TopicName topicName = TopicName.get(topic);
int numPartitions = 3;
// simulate partitioned topic without partitions
pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.createPartitionedTopicAsync(TopicName.get(topic),
new PartitionedTopicMetadata(numPartitions));
new PartitionedTopicMetadata(numPartitions)).get();
Consumer<byte[]> consumer = null;
try {
consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(group1).subscribeAsync().get(3, TimeUnit.SECONDS);
consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(sub1).subscribeAsync().get(3, TimeUnit.SECONDS);
} catch (Exception e) {
//ok here, consumer will create failed with 'Topic does not exist'
}
Expand All @@ -146,7 +145,7 @@ public void testCreateMissedPartitions(boolean useRestApi) throws PulsarAdminExc
admin.topics().createNonPartitionedTopic(topicName.getPartition(i).toString());
}
}
consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(group1).subscribe();
consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(sub1).subscribe();
Assert.assertNotNull(consumer);
Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);
Assert.assertEquals(((MultiTopicsConsumerImpl)consumer).getConsumers().size(), numPartitions);
Expand All @@ -155,22 +154,22 @@ public void testCreateMissedPartitions(boolean useRestApi) throws PulsarAdminExc
int newNumPartitions = 5;
pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.updatePartitionedTopicAsync(TopicName.get(topic),p ->
new PartitionedTopicMetadata(newNumPartitions, p.properties));
new PartitionedTopicMetadata(newNumPartitions, p.properties)).get();

TopicStatsImpl topicStats;
for (int i = 0; i < newNumPartitions; i++) {
try {
topicStats = (TopicStatsImpl) admin.topics().getStats(topicName.getPartition(i).toString());
Assert.assertTrue(i < numPartitions);
Assert.assertTrue(topicStats.getSubscriptions().containsKey(group1));
Assert.assertTrue(topicStats.getSubscriptions().containsKey(sub1));
} catch (PulsarAdminException ex) {
Assert.assertTrue(numPartitions <= i);
Assert.assertEquals(ex.getStatusCode(), Response.Status.NOT_FOUND.getStatusCode());
}
}

if (useRestApi) {
admin.topics().createMissedPartitions(topic, Lists.newArrayList(group2));
admin.topics().createMissedPartitions(topic, Lists.newArrayList(sub2));
} else {
for (int i = numPartitions; i < newNumPartitions; i++) {
admin.topics().createNonPartitionedTopic(topicName.getPartition(i).toString());
Expand All @@ -181,9 +180,9 @@ public void testCreateMissedPartitions(boolean useRestApi) throws PulsarAdminExc
topicStats = (TopicStatsImpl) admin.topics().getStats(topicName.getPartition(i).toString());
Assert.assertNotNull(topicStats);
if (useRestApi) {
Assert.assertTrue(i < numPartitions && topicStats.getSubscriptions().containsKey(group1) ||
numPartitions <= i && !topicStats.getSubscriptions().containsKey(group1));
Assert.assertTrue(topicStats.getSubscriptions().containsKey(group2));
Assert.assertTrue(i < numPartitions && topicStats.getSubscriptions().containsKey(sub1) ||
numPartitions <= i && !topicStats.getSubscriptions().containsKey(sub1));
Assert.assertTrue(topicStats.getSubscriptions().containsKey(sub2));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1627,7 +1627,11 @@ public void topics() throws Exception {
verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32, null);

cmdTopics.run(split("create-missed-partitions persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).createMissedPartitions("persistent://myprop/clust/ns1/ds1");
verify(mockTopics).createMissedPartitions("persistent://myprop/clust/ns1/ds1", null);
Pomelongan marked this conversation as resolved.
Show resolved Hide resolved

cmdTopics = new CmdTopics(() -> admin);
cmdTopics.run(split("create-missed-partitions persistent://myprop/clust/ns1/ds1 -s sub1,sub2"));
verify(mockTopics).createMissedPartitions("persistent://myprop/clust/ns1/ds1", Lists.newArrayList("sub1", "sub2"));

cmdTopics.run(split("create persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).createNonPartitionedTopic("persistent://myprop/clust/ns1/ds1", null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,10 +559,15 @@ private class CreateMissedPartitionsCmd extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "-s", "--subscriptions" }, description = "List of subscriptions to create")
private String subscriptions;

@Override
void run() throws Exception {
String topic = validateTopicName(params);
getTopics().createMissedPartitions(topic);
List<String> subscriptionList =
isNotBlank(subscriptions) ? Lists.newArrayList(subscriptions.split(",")) : null;
getTopics().createMissedPartitions(topic, subscriptionList);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1887,20 +1887,22 @@ admin.topics().createPartitionedTopic(topicName, numPartitions);

### Create missed partitions

When topic auto-creation is disabled, and you have a partitioned topic without any partitions, you can use the [`create-missed-partitions`](reference-pulsar-admin.md#create-missed-partitions) command to create partitions for the topic.
When topic auto-creation is disabled, and you have a partitioned topic without any partitions, you can use the [`create-missed-partitions`](reference-pulsar-admin.md#create-missed-partitions) command to create partitions for the topic, and you can also specify the subscription list to create subscriptions for all partitions of the topic.

````mdx-code-block
<Tabs
defaultValue="pulsar-admin"
values={[{"label":"pulsar-admin","value":"pulsar-admin"},{"label":"REST API","value":"REST API"},{"label":"Java","value":"Java"}]}>
<TabItem value="pulsar-admin">

You can create missed partitions with the [`create-missed-partitions`](reference-pulsar-admin.md#create-missed-partitions) command and specify the topic name as an argument.
When you create missed partitions with the [`create-missed-partitions`](reference-pulsar-admin.md#create-missed-partitions)
command,you need to specify the topic name as an argument, and use the `-s` or `--subscriptions` flag to selectively specify the subscription list to create.

```shell

$ bin/pulsar-admin topics create-missed-partitions \
persistent://my-tenant/my-namespace/my-topic \
--subscriptions my-subscription-1,my-subscription-2

```

Expand All @@ -1915,7 +1917,8 @@ $ bin/pulsar-admin topics create-missed-partitions \
```java

String topicName = "persistent://my-tenant/my-namespace/my-topic";
admin.topics().createMissedPartitions(topicName);
String subscriptionName = "my-subscription";
admin.topics().createMissedPartitions(topicName, Lists.newArrayList(subscriptionName));

```

Expand Down