3535import com .google .common .collect .Sets ;
3636import java .lang .reflect .Field ;
3737import java .util .ArrayList ;
38+ import java .util .Collections ;
39+ import java .util .HashSet ;
3840import java .util .List ;
41+ import java .util .Set ;
3942import java .util .UUID ;
4043import java .util .concurrent .TimeUnit ;
4144import lombok .Data ;
4245import java .util .concurrent .CompletableFuture ;
4346import java .util .concurrent .ExecutionException ;
4447import java .util .concurrent .atomic .AtomicBoolean ;
48+ import java .util .function .Supplier ;
4549import org .apache .bookkeeper .client .LedgerHandle ;
50+ import org .apache .bookkeeper .mledger .ManagedCursor ;
4651import org .apache .bookkeeper .mledger .ManagedLedger ;
4752import org .apache .pulsar .broker .service .BrokerService ;
4853import org .apache .pulsar .broker .service .BrokerTestBase ;
4954import org .apache .pulsar .client .api .Consumer ;
5055import org .apache .pulsar .client .api .Message ;
56+ import org .apache .pulsar .client .api .MessageId ;
5157import org .apache .pulsar .client .api .MessageRoutingMode ;
5258import org .apache .pulsar .client .api .Producer ;
5359import org .apache .pulsar .client .api .PulsarClientException ;
5460import org .apache .pulsar .client .api .Schema ;
5561import org .apache .pulsar .client .api .SubscriptionType ;
5662import org .apache .pulsar .common .naming .NamespaceBundle ;
5763import org .apache .pulsar .common .naming .TopicName ;
64+ import org .apache .pulsar .common .policies .data .ClusterData ;
5865import org .apache .pulsar .common .policies .data .Policies ;
66+ import org .apache .pulsar .common .policies .data .TenantInfo ;
5967import org .apache .pulsar .common .policies .data .TopicStats ;
6068import org .awaitility .Awaitility ;
6169import org .testng .Assert ;
6270import org .testng .annotations .AfterMethod ;
6371import org .testng .annotations .BeforeMethod ;
72+ import org .testng .annotations .DataProvider ;
6473import org .testng .annotations .Test ;
6574
6675@ Test (groups = "broker" )
@@ -69,6 +78,8 @@ public class PersistentTopicTest extends BrokerTestBase {
6978 @ BeforeMethod (alwaysRun = true )
7079 @ Override
7180 protected void setup () throws Exception {
81+ conf .setSystemTopicEnabled (true );
82+ conf .setTopicLevelPoliciesEnabled (true );
7283 super .baseSetup ();
7384 }
7485
@@ -392,4 +403,59 @@ public void testDeleteTopicFail() throws Exception {
392403 makeDeletedFailed .set (false );
393404 persistentTopic .delete ().get ();
394405 }
406+
407+ @ DataProvider (name = "topicLevelPolicy" )
408+ public static Object [][] topicLevelPolicy () {
409+ return new Object [][] { { true }, { false } };
410+ }
411+
412+ @ Test (dataProvider = "topicLevelPolicy" )
413+ public void testCreateTopicWithZombieReplicatorCursor (boolean topicLevelPolicy ) throws Exception {
414+ final String namespace = "prop/ns-abc" ;
415+ final String topicName = "persistent://" + namespace
416+ + "/testCreateTopicWithZombieReplicatorCursor" + topicLevelPolicy ;
417+ final String remoteCluster = "remote" ;
418+ admin .topics ().createNonPartitionedTopic (topicName );
419+ admin .topics ().createSubscription (topicName , conf .getReplicatorPrefix () + "." + remoteCluster ,
420+ MessageId .earliest , true );
421+
422+ admin .clusters ().createCluster (remoteCluster , ClusterData .builder ()
423+ .serviceUrl ("http://localhost:11112" )
424+ .brokerServiceUrl ("pulsar://localhost:11111" )
425+ .build ());
426+ TenantInfo tenantInfo = admin .tenants ().getTenantInfo ("prop" );
427+ tenantInfo .getAllowedClusters ().add (remoteCluster );
428+ admin .tenants ().updateTenant ("prop" , tenantInfo );
429+
430+ if (topicLevelPolicy ) {
431+ admin .topics ().setReplicationClusters (topicName , Collections .singletonList (remoteCluster ));
432+ } else {
433+ admin .namespaces ().setNamespaceReplicationClustersAsync (
434+ namespace , Collections .singleton (remoteCluster )).get ();
435+ }
436+
437+ final PersistentTopic topic = (PersistentTopic ) pulsar .getBrokerService ().getTopic (topicName , false )
438+ .get (3 , TimeUnit .SECONDS ).orElse (null );
439+ assertNotNull (topic );
440+
441+ final Supplier <Set <String >> getCursors = () -> {
442+ final Set <String > cursors = new HashSet <>();
443+ final Iterable <ManagedCursor > iterable = topic .getManagedLedger ().getCursors ();
444+ iterable .forEach (c -> cursors .add (c .getName ()));
445+ return cursors ;
446+ };
447+ assertEquals (getCursors .get (), Collections .singleton (conf .getReplicatorPrefix () + "." + remoteCluster ));
448+
449+ if (topicLevelPolicy ) {
450+ admin .topics ().setReplicationClusters (topicName , Collections .emptyList ());
451+ } else {
452+ admin .namespaces ().setNamespaceReplicationClustersAsync (namespace , Collections .emptySet ()).get ();
453+ }
454+ admin .clusters ().deleteCluster (remoteCluster );
455+ // Now the cluster and its related policy has been removed but the replicator cursor still exists
456+
457+ topic .initialize ().get (3 , TimeUnit .SECONDS );
458+ Awaitility .await ().atMost (3 , TimeUnit .SECONDS )
459+ .until (() -> !topic .getManagedLedger ().getCursors ().iterator ().hasNext ());
460+ }
395461}
0 commit comments