Skip to content

Commit

Permalink
[improve][broker]add NamespacePolicies check before unload bundle whe…
Browse files Browse the repository at this point in the history
…n doLoadShedding (#16476)

* add NamespacePolicies check before unload bundle when doLoadShedding

* fix typos

Co-authored-by: nicklixinyang <nicklixinyang@didiglobal.com>
  • Loading branch information
Nicklee007 and nicklixinyang authored Jul 21, 2022
1 parent 52da03f commit 0b315eb
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,10 @@ public synchronized void doLoadShedding() {
bundles.forEach(bundle -> {
final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
if (!shouldNamespacePoliciesUnload(namespaceName, bundleRange, broker)) {
return;
}

if (!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
return;
}
Expand Down Expand Up @@ -696,6 +700,20 @@ private void updateBundleUnloadingMetrics(Multimap<String, String> bundlesToUnlo
this.bundleUnloadMetrics.set(metrics);
}

public boolean shouldNamespacePoliciesUnload(String namespace, String bundle, String currentBroker) {
synchronized (brokerCandidateCache) {
brokerCandidateCache.clear();
ServiceUnitId serviceUnit = pulsar.getNamespaceService().getNamespaceBundleFactory()
.getBundle(namespace, bundle);
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(), brokerTopicLoadingPredicate);

// if only current broker satisfy the NamespacePolicies should not unload.
brokerCandidateCache.remove(currentBroker);
return !brokerCandidateCache.isEmpty();
}
}

public boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle, String currentBroker) {
try {
Optional<LocalPolicies> nsPolicies = pulsar.getPulsarResources().getLocalPolicies()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
Expand All @@ -58,6 +59,8 @@
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
Expand Down Expand Up @@ -578,6 +581,54 @@ public boolean isEnableNonPersistentTopics(String brokerUrl) {

}

@Test
public void testLoadSheddingWithNamespaceIsolationPolicies() throws Exception {

final String cluster = "use";
final String tenant = "my-tenant";
final String namespace = "my-tenant/use/my-ns";
final String bundle = "0x00000000_0xffffffff";
final String brokerAddress = pulsar1.getAdvertisedAddress();
final String broker1Address = pulsar1.getAdvertisedAddress() + 1;

admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl("http://" + pulsar1.getAdvertisedAddress()).build());
admin1.tenants().createTenant(tenant,
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster)));
admin1.namespaces().createNamespace(namespace);

@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar1.getSafeWebServiceAddress()).build();
Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://" + namespace + "/my-topic1")
.create();
ModularLoadManagerImpl loadManager = (ModularLoadManagerImpl) ((ModularLoadManagerWrapper) pulsar1
.getLoadManager().get()).getLoadManager();
pulsar1.getBrokerService().updateRates();
loadManager.updateAll();

// test1: no isolation policy
assertTrue(loadManager.shouldNamespacePoliciesUnload(namespace, bundle, primaryHost));

// test2: as isolation policy, there are not another broker to load the bundle.
String newPolicyJsonTemplate = "{\"namespaces\":[\"%s.*\"],\"primary\":[\"%s\"],"
+ "\"secondary\":[\"%s\"],\"auto_failover_policy\":{\"policy_type\":\"min_available\",\"parameters\":{\"min_limit\":%s,\"usage_threshold\":80}}}";
String newPolicyJson = String.format(newPolicyJsonTemplate, namespace, broker1Address,broker1Address, 1);
String newPolicyName = "my-ns-isolation-policies";
ObjectMapper jsonMapper = ObjectMapperFactory.create();
NamespaceIsolationDataImpl nsPolicyData = jsonMapper.readValue(newPolicyJson.getBytes(),
NamespaceIsolationDataImpl.class);
admin1.clusters().createNamespaceIsolationPolicy(cluster, newPolicyName, nsPolicyData);
assertFalse(loadManager.shouldNamespacePoliciesUnload(namespace, bundle, broker1Address));

// test3: as isolation policy, there are another can load the bundle.
String newPolicyJson1 = String.format(newPolicyJsonTemplate, namespace, brokerAddress,brokerAddress, 1);
NamespaceIsolationDataImpl nsPolicyData1 = jsonMapper.readValue(newPolicyJson1.getBytes(),
NamespaceIsolationDataImpl.class);
admin1.clusters().updateNamespaceIsolationPolicy(cluster, newPolicyName, nsPolicyData1);
assertTrue(loadManager.shouldNamespacePoliciesUnload(namespace, bundle, primaryHost));

producer.close();
}

/**
* It verifies that pulsar-service fails if load-manager tries to create ephemeral znode for broker which is already
* created by other zk-session-id.
Expand Down

0 comments on commit 0b315eb

Please sign in to comment.