Skip to content

Commit

Permalink
[fix] Combination of autocreate + forced delete of partitioned topic …
Browse files Browse the repository at this point in the history
…with active consumer leaves topic metadata inconsistent. (apache#17308)
  • Loading branch information
dlg99 authored Sep 1, 2022
1 parent 5647109 commit 9529850
Show file tree
Hide file tree
Showing 6 changed files with 296 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1139,6 +1139,13 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
.map(PersistentSubscription::getName).toList();
return FutureUtil.failedFuture(
new TopicBusyException("Topic has subscriptions did not catch up: " + backlogSubs));
} else if (TopicName.get(topic).isPartitioned()
&& (getProducers().size() > 0 || getNumberOfConsumers() > 0)
&& getBrokerService().isAllowAutoTopicCreation(topic)) {
// to avoid inconsistent metadata as a result
return FutureUtil.failedFuture(
new TopicBusyException("Partitioned topic has active consumers or producers and "
+ "auto-creation of topic is allowed"));
}

fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.admin;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertTrue;

Expand All @@ -26,16 +28,25 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pulsar.broker.MultiBrokerBaseTest;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand All @@ -54,6 +65,7 @@ protected int numberOfAdditionalBrokers() {
@Override
protected void doInitConf() throws Exception {
super.doInitConf();
this.conf.setManagedLedgerMaxEntriesPerLedger(10);
}

@Override
Expand Down Expand Up @@ -122,4 +134,90 @@ public void testTopicLookup(TopicDomain topicDomain, boolean isPartition) throws
Assert.assertEquals(lookupResultSet.size(), 1);
}

@Test
public void testForceDeletePartitionedTopicWithSub() throws Exception {
final int numPartitions = 10;
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("tenant-xyz", tenantInfo);
admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test"));

admin.namespaces().setAutoTopicCreation("tenant-xyz/ns-abc",
AutoTopicCreationOverride.builder()
.allowAutoTopicCreation(true)
.topicType("partitioned")
.defaultNumPartitions(5)
.build());

RetentionPolicies retention = new RetentionPolicies(10, 10);
admin.namespaces().setRetention("tenant-xyz/ns-abc", retention);
final String topic = "persistent://tenant-xyz/ns-abc/topic-"
+ RandomStringUtils.randomAlphabetic(5)
+ "-testDeletePartitionedTopicWithSub";
final String subscriptionName = "sub";
((TopicsImpl) admin.topics()).createPartitionedTopicAsync(topic, numPartitions, true, null).get();

log.info("Creating producer and consumer");
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName(subscriptionName)
.subscribe();

Producer<String> producer = pulsarClient.newProducer(Schema.STRING).enableBatching(false).topic(topic).create();

log.info("producing messages");
for (int i = 0; i < numPartitions * 100; ++i) {
producer.newMessage()
.key("" + i)
.value("value-" + i)
.send();
}
producer.flush();
producer.close();

log.info("consuming some messages");
for (int i = 0; i < numPartitions * 5; i++) {
Message<byte[]> m = consumer.receive(1, TimeUnit.MINUTES);
}

log.info("trying to delete the topic");
try {
admin.topics().deletePartitionedTopic(topic, true);
fail("expected PulsarAdminException.NotFoundException");
} catch (PulsarAdminException e) {
assertTrue(e.getMessage().contains("Partitioned topic has active consumers or producers"));
}

// check that metadata is still consistent
assertEquals(numPartitions, admin.topics().getList("tenant-xyz/ns-abc")
.stream().filter(t -> t.contains(topic)).count());
assertEquals(numPartitions,
pulsar.getPulsarResources().getTopicResources()
.getExistingPartitions(TopicName.getPartitionedTopicName(topic))
.get()
.stream().filter(t -> t.contains(topic)).count());
assertTrue(admin.topics()
.getPartitionedTopicList("tenant-xyz/ns-abc")
.contains(topic));

log.info("closing producer and consumer");
producer.close();
consumer.close();

log.info("trying to delete the topic again");
admin.topics().deletePartitionedTopic(topic, true);

assertEquals(0, admin.topics().getList("tenant-xyz/ns-abc")
.stream().filter(t -> t.contains(topic)).count());
assertEquals(0,
pulsar.getPulsarResources().getTopicResources()
.getExistingPartitions(TopicName.getPartitionedTopicName(topic))
.get()
.stream().filter(t -> t.contains(topic)).count());
assertFalse(admin.topics()
.getPartitionedTopicList("tenant-xyz/ns-abc")
.contains(topic));

log.info("trying to create the topic again");
((TopicsImpl) admin.topics()).createPartitionedTopicAsync(topic, numPartitions, true, null).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1262,6 +1262,7 @@ public void testSubscribeRate() throws Exception {
pulsarClient.updateServiceUrl(lookupUrl.toString());
Awaitility.await().untilAsserted(() -> assertTrue(consumer.isConnected()));
pulsar.getConfiguration().setAuthorizationEnabled(true);
consumer.close();
admin.topics().deletePartitionedTopic(topicName, true);
admin.namespaces().deleteNamespace(namespace);
admin.tenants().deleteTenant("my-tenants");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.netty.util.HashedWheelTimer;
import lombok.Cleanup;

import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.PulsarClient;
Expand Down Expand Up @@ -331,7 +332,12 @@ public void topicDeleted(String ignored, boolean partitioned) throws Exception {
p1.send("msg-1");

if (partitioned) {
admin.topics().deletePartitionedTopic(topic, true);
try {
admin.topics().deletePartitionedTopic(topic, true);
fail("expected error because partitioned topic has active producer");
} catch (PulsarAdminException.ServerSideErrorException e) {
// expected
}
} else {
admin.topics().delete(topic, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1171,34 +1171,6 @@ public void testSubscriptionMustCompleteWhenOperationTimeoutOnMultipleTopics() t
}
}

@Test(timeOut = testTimeout)
public void testAutoDiscoverMultiTopicsPartitions() throws Exception {
final String topicName = "persistent://public/default/issue-9585";
admin.topics().createPartitionedTopic(topicName, 3);
PatternMultiTopicsConsumerImpl<String> consumer = (PatternMultiTopicsConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
.topicsPattern(topicName)
.subscriptionName("sub-issue-9585")
.subscribe();

Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 3);
Assert.assertEquals(consumer.getConsumers().size(), 3);

admin.topics().deletePartitionedTopic(topicName, true);
consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 0);
Assert.assertEquals(consumer.getConsumers().size(), 0);
});

admin.topics().createPartitionedTopic(topicName, 7);
consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 7);
Assert.assertEquals(consumer.getConsumers().size(), 7);
});
}


@Test(timeOut = testTimeout)
public void testPartitionsUpdatesForMultipleTopics() throws Exception {
final String topicName0 = "persistent://public/default/testPartitionsUpdatesForMultipleTopics-0";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.topics;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.testng.annotations.Test;

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.fail;

/**
* Test cases for compaction.
*/
@Slf4j
public class TestTopicDeletion extends PulsarTestSuite {

final private boolean unload = false;
final private int numBrokers = 2;

public void setupCluster() throws Exception {
brokerEnvs.put("managedLedgerMaxEntriesPerLedger", "10");
brokerEnvs.put("brokerDeleteInactivePartitionedTopicMetadataEnabled", "false");
brokerEnvs.put("brokerDeleteInactiveTopicsEnabled", "false");
this.setupCluster("");
}

protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
String clusterName,
PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
specBuilder.numBrokers(numBrokers);
specBuilder.enableContainerLog(true);
return specBuilder;
}

@Test(dataProvider = "ServiceUrls", timeOut=300_000)
public void testPartitionedTopicForceDeletion(Supplier<String> serviceUrl) throws Exception {

log.info("Creating tenant and namespace");

final String tenant = "test-partitioned-topic-" + randomName(4);
final String namespace = tenant + "/ns1";
final String topic = "persistent://" + namespace + "/partitioned-topic";
final int numPartitions = numBrokers * 3;
final int numKeys = numPartitions * 50;
final String subscriptionName = "sub1";

this.createTenantName(tenant, pulsarCluster.getClusterName(), "admin");

this.createNamespace(namespace);

pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
"set-clusters", "--clusters", pulsarCluster.getClusterName(), namespace);

pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
"set-retention", "--size", "100M", "--time", "100m", namespace);

this.createPartitionedTopic(topic, numPartitions);

try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build()) {

log.info("Creating consumer");
Consumer<byte[]> consumer = client.newConsumer()
.topic(topic)
.subscriptionName(subscriptionName)
.subscribe();

log.info("Producing messages");
try(Producer<byte[]> producer = client.newProducer()
.topic(topic)
.create()
) {
for (int i = 0; i < numKeys; i++) {
producer.newMessage()
.key("" + i)
.value(("value-" + i).getBytes(UTF_8))
.sendAsync();
}
producer.flush();
log.info("Successfully wrote {} values", numKeys);
}

log.info("Consuming half of the messages");
for (int i = 0; i < numKeys / 2; i++) {
Message<byte[]> m = consumer.receive(1, TimeUnit.MINUTES);
log.info("Read value {}", m.getKey());
}

if (unload) {
log.info("Unloading topic");
pulsarCluster.runAdminCommandOnAnyBroker("topics",
"unload", topic);
}

ContainerExecResult res;
log.info("Deleting the topic");
try {
res = pulsarCluster.runAdminCommandOnAnyBroker("topics",
"delete-partitioned-topic", "--force", topic);
assertNotEquals(0, res.getExitCode());
} catch (ContainerExecException e) {
log.info("Second delete failed with ContainerExecException, could be ok", e);
if (!e.getMessage().contains("with error code 1")) {
fail("Expected different error code");
}
}

log.info("Close the consumer and delete the topic again");
consumer.close();

res = pulsarCluster.runAdminCommandOnAnyBroker("topics",
"delete-partitioned-topic", "--force", topic);
assertNotEquals(0, res.getExitCode());

Thread.sleep(5000);
// should succeed
log.info("Creating the topic again");
this.createPartitionedTopic(topic, numBrokers * 2);
}
}


private ContainerExecResult createTenantName(final String tenantName,
final String allowedClusterName,
final String adminRoleName) throws Exception {
ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
"tenants", "create", "--allowed-clusters", allowedClusterName,
"--admin-roles", adminRoleName, tenantName);
assertEquals(0, result.getExitCode());
return result;
}

private ContainerExecResult createNamespace(final String Ns) throws Exception {
ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
"namespaces",
"create",
"--clusters",
pulsarCluster.getClusterName(), Ns);
assertEquals(0, result.getExitCode());
return result;
}

private ContainerExecResult createPartitionedTopic(final String partitionedTopicName, int numPartitions)
throws Exception {
ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
"topics",
"create-partitioned-topic",
"--partitions", "" + numPartitions,
partitionedTopicName);
assertEquals(0, result.getExitCode());
return result;
}


}

0 comments on commit 9529850

Please sign in to comment.