From 7fa72021754212766465198559f462b4eba68d95 Mon Sep 17 00:00:00 2001 From: Rajan Dhabalia Date: Fri, 30 Mar 2018 11:35:35 -0700 Subject: [PATCH] Avoid creating failure-domain for cluster which doesn't exist (#1474) * Avoid creating failure-domain for cluster which doesn't exist * Fix admin api test --- .../broker/cache/ConfigurationCacheService.java | 4 +++- .../apache/pulsar/broker/SLAMonitoringTest.java | 2 +- .../apache/pulsar/broker/admin/AdminApiTest.java | 14 ++++++-------- .../apache/pulsar/broker/admin/AdminApiTest2.java | 2 +- .../org/apache/pulsar/broker/admin/AdminTest.java | 8 ++++---- .../apache/pulsar/broker/admin/NamespacesTest.java | 2 +- .../pulsar/broker/auth/AuthorizationTest.java | 2 +- .../loadbalance/ModularLoadManagerImplTest.java | 2 +- .../broker/service/BacklogQuotaManagerTest.java | 2 +- .../broker/service/BrokerBkEnsemblesTests.java | 2 +- .../pulsar/broker/service/ReplicatorTestBase.java | 6 +++--- .../api/AuthenticatedProducerConsumerTest.java | 6 +++--- .../pulsar/client/api/NonPersistentTopicTest.java | 6 +++--- .../pulsar/client/api/TlsProducerConsumerBase.java | 2 +- .../websocket/proxy/ProxyAuthorizationTest.java | 2 +- .../ProxyAuthenticatedProducerConsumerTest.java | 2 +- 16 files changed, 32 insertions(+), 32 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java index 4ee28650afe22..e063e325bff07 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.cache; +import java.nio.file.Paths; import java.util.Map; import org.apache.bookkeeper.util.ZkUtils; @@ -124,7 +125,8 @@ public FailureDomain deserialize(String path, byte[] content) throws Exception { private void createFailureDomainRoot(ZooKeeper zk, String path) { try { - if (zk.exists(path, false) == null) { + final String clusterZnodePath = Paths.get(path).getParent().toString(); + if (zk.exists(clusterZnodePath, false) != null && zk.exists(path, false) == null) { try { byte[] data = "".getBytes(); ZkUtils.createFullPathOptimistic(zk, path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java index e28b2261ec4e8..1e2452ff5566b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java @@ -117,7 +117,7 @@ private void createProperty(PulsarAdmin pulsarAdmin) throws PulsarClientException, MalformedURLException, PulsarAdminException { ClusterData clusterData = new ClusterData(); clusterData.setServiceUrl(pulsarAdmin.getServiceUrl().toString()); - pulsarAdmins[0].clusters().updateCluster("my-cluster", clusterData); + pulsarAdmins[0].clusters().createCluster("my-cluster", clusterData); Set allowedClusters = new HashSet<>(); allowedClusters.add("my-cluster"); PropertyAdmin adminConfig = new PropertyAdmin(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index a49faa288286f..ffc5f616d0919 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -193,21 +193,21 @@ public void clusters() throws Exception { new ClusterData("http://broker.messaging.use.example.com" + ":" + BROKER_WEBSERVICE_PORT)); // "test" cluster is part of config-default cluster and it's znode gets created when PulsarService creates // failure-domain znode of this default cluster - assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test", "use", "usw")); + assertEquals(admin.clusters().getClusters(), Lists.newArrayList("use", "usw")); assertEquals(admin.clusters().getCluster("use"), new ClusterData("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT)); admin.clusters().updateCluster("usw", new ClusterData("http://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT)); - assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test", "use", "usw")); + assertEquals(admin.clusters().getClusters(), Lists.newArrayList("use", "usw")); assertEquals(admin.clusters().getCluster("usw"), new ClusterData("http://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT)); admin.clusters().updateCluster("usw", new ClusterData("http://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT, "https://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT_TLS)); - assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test", "use", "usw")); + assertEquals(admin.clusters().getClusters(), Lists.newArrayList("use", "usw")); assertEquals(admin.clusters().getCluster("usw"), new ClusterData("http://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT, "https://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT_TLS)); @@ -215,11 +215,11 @@ public void clusters() throws Exception { admin.clusters().deleteCluster("usw"); Thread.sleep(300); - assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test", "use")); + assertEquals(admin.clusters().getClusters(), Lists.newArrayList("use")); admin.namespaces().deleteNamespace("prop-xyz/use/ns1"); admin.clusters().deleteCluster("use"); - assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test")); + assertEquals(admin.clusters().getClusters(), Lists.newArrayList()); // Check name validation try { @@ -409,9 +409,7 @@ public void brokers() throws Exception { admin.namespaces().deleteNamespace("prop-xyz/use/ns1"); admin.clusters().deleteCluster("use"); - // "test" cluster is part of config-default cluster and it's znode gets created when PulsarService creates - // failure-domain znode of this default cluster - assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test")); + assertEquals(admin.clusters().getClusters(), Lists.newArrayList()); } /** diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java index dbc36f3934669..44b7bf60021a6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java @@ -636,7 +636,7 @@ public void testReplicationPeerCluster() throws Exception { public void clusterFailureDomain() throws PulsarAdminException { final String cluster = pulsar.getConfiguration().getClusterName(); - admin.clusters().updateCluster(cluster, + admin.clusters().createCluster(cluster, new ClusterData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls())); // create FailureDomain domain = new FailureDomain(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index 8e66acaf9455e..953fa683f4e1c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -204,10 +204,10 @@ void internalConfiguration() throws Exception { @Test void clusters() throws Exception { - assertEquals(clusters.getClusters(), Lists.newArrayList(configClusterName)); + assertEquals(clusters.getClusters(), Lists.newArrayList()); verify(clusters, never()).validateSuperUserAccess(); - clusters.updateCluster("use", new ClusterData("http://broker.messaging.use.example.com")); + clusters.createCluster("use", new ClusterData("http://broker.messaging.use.example.com")); verify(clusters, times(1)).validateSuperUserAccess(); // ensure to read from ZooKeeper directly clusters.clustersListCache().clear(); @@ -465,7 +465,7 @@ void properties() throws Exception { assertEquals(properties.getProperties(), Lists.newArrayList()); // Create a namespace to test deleting a non-empty property - clusters.updateCluster("use", new ClusterData()); + clusters.createCluster("use", new ClusterData()); newPropertyAdmin = new PropertyAdmin(Lists.newArrayList("role1", "other-role"), Sets.newHashSet("use")); properties.createProperty("my-property", newPropertyAdmin); @@ -492,7 +492,7 @@ void properties() throws Exception { @Test void brokers() throws Exception { - clusters.updateCluster("use", new ClusterData("http://broker.messaging.use.example.com", + clusters.createCluster("use", new ClusterData("http://broker.messaging.use.example.com", "https://broker.messaging.use.example.com:4443")); URI requestUri = new URI( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 16ca86c707f81..0fc54b10fe1ba 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -137,7 +137,7 @@ public void setup() throws Exception { doNothing().when(namespaces).validateAdminAccessOnProperty("other-property"); doNothing().when(namespaces).validateAdminAccessOnProperty("new-property"); - admin.clusters().updateCluster("use", new ClusterData("http://broker-use.com:" + BROKER_WEBSERVICE_PORT)); + admin.clusters().createCluster("use", new ClusterData("http://broker-use.com:" + BROKER_WEBSERVICE_PORT)); admin.clusters().createCluster("usw", new ClusterData("http://broker-usw.com:" + BROKER_WEBSERVICE_PORT)); admin.clusters().createCluster("usc", new ClusterData("http://broker-usc.com:" + BROKER_WEBSERVICE_PORT)); admin.properties().createProperty(this.testProperty, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java index 4f0baf60986c7..c4375e2562cfe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java @@ -65,7 +65,7 @@ void simple() throws Exception { assertEquals(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), false); - admin.clusters().updateCluster("c1", new ClusterData()); + admin.clusters().createCluster("c1", new ClusterData()); admin.properties().createProperty("p1", new PropertyAdmin(Lists.newArrayList("role1"), Sets.newHashSet("c1"))); waitForChange(); admin.namespaces().createNamespace("p1/c1/ns1"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java index 0502d7837c4cd..37e3899833e2a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java @@ -498,7 +498,7 @@ public void testNamespaceIsolationPoliciesForPrimaryAndSecondaryBrokers() throws final String broker1Address = pulsar1.getAdvertisedAddress() + "0"; final String broker2Address = pulsar2.getAdvertisedAddress() + "1"; final String sharedBroker = "broker3"; - admin1.clusters().updateCluster(cluster, new ClusterData("http://" + pulsar1.getAdvertisedAddress())); + admin1.clusters().createCluster(cluster, new ClusterData("http://" + pulsar1.getAdvertisedAddress())); admin1.properties().createProperty(property, new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet(cluster))); admin1.namespaces().createNamespace(property + "/" + cluster + "/" + namespace); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index 80a870c9fcb38..cbf94f009a36d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -94,7 +94,7 @@ void setup() throws Exception { adminUrl = new URL("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT); admin = new PulsarAdmin(adminUrl, (Authentication) null); - admin.clusters().updateCluster("usc", new ClusterData(adminUrl.toString())); + admin.clusters().createCluster("usc", new ClusterData(adminUrl.toString())); admin.properties().createProperty("prop", new PropertyAdmin(Lists.newArrayList("appid1"), Sets.newHashSet("usc"))); admin.namespaces().createNamespace("prop/usc/ns-quota"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index 7f640824b317f..32b31be2c543e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ -99,7 +99,7 @@ void setup() throws Exception { adminUrl = new URL("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT); admin = new PulsarAdmin(adminUrl, (Authentication) null); - admin.clusters().updateCluster("usc", new ClusterData(adminUrl.toString())); + admin.clusters().createCluster("usc", new ClusterData(adminUrl.toString())); admin.properties().createProperty("prop", new PropertyAdmin(Lists.newArrayList("appid1"), Sets.newHashSet("usc"))); } catch (Throwable t) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java index abde142559323..0d30f7ff9f77f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java @@ -207,11 +207,11 @@ void setup() throws Exception { admin3 = new PulsarAdmin(url3, (Authentication) null); // Provision the global namespace - admin1.clusters().updateCluster("r1", new ClusterData(url1.toString(), urlTls1.toString(), + admin1.clusters().createCluster("r1", new ClusterData(url1.toString(), urlTls1.toString(), pulsar1.getBrokerServiceUrl(), pulsar1.getBrokerServiceUrlTls())); - admin1.clusters().updateCluster("r2", new ClusterData(url2.toString(), urlTls2.toString(), + admin1.clusters().createCluster("r2", new ClusterData(url2.toString(), urlTls2.toString(), pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls())); - admin1.clusters().updateCluster("r3", new ClusterData(url3.toString(), urlTls3.toString(), + admin1.clusters().createCluster("r3", new ClusterData(url3.toString(), urlTls3.toString(), pulsar3.getBrokerServiceUrl(), pulsar3.getBrokerServiceUrlTls())); admin1.clusters().createCluster("global", new ClusterData("http://global:8080", "https://global:8443")); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java index eafe9c8fcda82..a1fe85efdc0e6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java @@ -220,7 +220,7 @@ public void testAnonymousSyncProducerAndConsumer(int batchMessageDelayMs) throws authTls.configure(authParams); internalSetup(authTls); - admin.clusters().updateCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), + admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS)); admin.properties().createProperty("my-property", new PropertyAdmin(Lists.newArrayList("anonymousUser"), Sets.newHashSet("use"))); @@ -276,7 +276,7 @@ public void testAuthenticationFilterNegative() throws Exception { // this will cause NPE and it should throw 500 doReturn(null).when(pulsar).getGlobalZkCache(); try { - admin.clusters().updateCluster(cluster, clusterData); + admin.clusters().createCluster(cluster, clusterData); } catch (PulsarAdminException e) { Assert.assertTrue(e.getCause() instanceof InternalServerErrorException); } @@ -301,7 +301,7 @@ public void testInternalServerExceptionOnLookup() throws Exception { authTls.configure(authParams); internalSetup(authTls); - admin.clusters().updateCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), + admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS)); admin.properties().createProperty("my-property", new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use"))); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index 41dfb7cbffee8..5c8932ab15fdc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -941,11 +941,11 @@ void setupReplicationCluster() throws Exception { admin3 = new PulsarAdmin(url3, (Authentication) null); // Provision the global namespace - admin1.clusters().updateCluster("r1", new ClusterData(url1.toString(), null, pulsar1.getBrokerServiceUrl(), + admin1.clusters().createCluster("r1", new ClusterData(url1.toString(), null, pulsar1.getBrokerServiceUrl(), pulsar1.getBrokerServiceUrlTls())); - admin1.clusters().updateCluster("r2", new ClusterData(url2.toString(), null, pulsar2.getBrokerServiceUrl(), + admin1.clusters().createCluster("r2", new ClusterData(url2.toString(), null, pulsar2.getBrokerServiceUrl(), pulsar1.getBrokerServiceUrlTls())); - admin1.clusters().updateCluster("r3", new ClusterData(url3.toString(), null, pulsar3.getBrokerServiceUrl(), + admin1.clusters().createCluster("r3", new ClusterData(url3.toString(), null, pulsar3.getBrokerServiceUrl(), pulsar1.getBrokerServiceUrlTls())); admin1.clusters().createCluster("global", new ClusterData("http://global:8080")); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java index f1e1308918077..c06984ba86a5c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java @@ -99,7 +99,7 @@ protected void internalSetUpForNamespace() throws Exception { authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH); clientConf.setAuthentication(AuthenticationTls.class.getName(), authParams); admin = spy(new PulsarAdmin(brokerUrlTls, clientConf)); - admin.clusters().updateCluster(clusterName, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), + admin.clusters().createCluster(clusterName, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS)); admin.properties().createProperty("my-property", new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use"))); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java index 46ef7115c5ec4..025ccb21c7fcd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java @@ -81,7 +81,7 @@ public void test() throws Exception { assertEquals(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), false); - admin.clusters().updateCluster(configClusterName, new ClusterData()); + admin.clusters().createCluster(configClusterName, new ClusterData()); admin.properties().createProperty("p1", new PropertyAdmin(Lists.newArrayList("role1"), Sets.newHashSet("c1"))); waitForChange(); admin.namespaces().createNamespace("p1/c1/ns1"); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java index 4b3a914e1ab70..a6598e5ed9c11 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java @@ -160,7 +160,7 @@ public void testTlsSyncProducerAndConsumer() throws Exception { // create a client which connects to proxy over tls and pass authData PulsarClient proxyClient = createPulsarClient(authTls, proxyServiceUrl); - admin.clusters().updateCluster(configClusterName, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), + admin.clusters().createCluster(configClusterName, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS)); admin.properties().createProperty("my-property", new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));