Skip to content

Commit

Permalink
[improve][admin] PIP-369 Change default value of unload-scope in `n…
Browse files Browse the repository at this point in the history
…s-isolation-policy set` (#23253)

Co-authored-by: Zixuan Liu <nodeces@gmail.com>
  • Loading branch information
grssam and nodece authored Sep 18, 2024
1 parent 590e133 commit 4f00259
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -778,15 +778,16 @@ private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(String clus
} catch (PulsarServerException e) {
return FutureUtil.failedFuture(e);
}
// compile regex patterns once
List<Pattern> namespacePatterns = policyData.getNamespaces().stream().map(Pattern::compile).toList();
// TODO for 4.x, we should include both old and new namespace regex pattern for unload `all_matching` option
Set<String> combinedNamespaces = new HashSet<>(policyData.getNamespaces());
final List<String> oldNamespaces = new ArrayList<>();
if (oldPolicy != null) {
oldNamespaces.addAll(oldPolicy.getNamespaces());
combinedNamespaces.addAll(oldNamespaces);
}
return adminClient.tenants().getTenantsAsync().thenCompose(tenants -> {
List<CompletableFuture<List<String>>> filteredNamespacesForEachTenant = tenants.stream()
.map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant).thenCompose(namespaces -> {
List<CompletableFuture<String>> namespaceNamesInCluster = namespaces.stream()
.filter(namespaceName -> namespacePatterns.stream()
.anyMatch(pattern -> pattern.matcher(namespaceName).matches()))
.map(namespaceName -> adminClient.namespaces().getPoliciesAsync(namespaceName)
.thenApply(policies -> policies.replication_clusters.contains(cluster)
? namespaceName : null))
Expand All @@ -802,46 +803,44 @@ private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(String clus
.map(CompletableFuture::join)
.flatMap(List::stream)
.collect(Collectors.toList()));
}).thenCompose(shouldUnloadNamespaces -> {
if (CollectionUtils.isEmpty(shouldUnloadNamespaces)) {
}).thenCompose(clusterLocalNamespaces -> {
if (CollectionUtils.isEmpty(clusterLocalNamespaces)) {
return CompletableFuture.completedFuture(null);
}
// If unload type is 'changed', we need to figure out a further subset of namespaces whose placement might
// actually have been changed.

log.debug("Old policy: {} ; new policy: {}", oldPolicy, policyData);
if (oldPolicy != null && NamespaceIsolationPolicyUnloadScope.changed.equals(policyData.getUnloadScope())) {
// We also compare that the previous primary broker list is same as current, in case all namespaces need
// to be placed again anyway.
if (CollectionUtils.isEqualCollection(oldPolicy.getPrimary(), policyData.getPrimary())) {
// list is same, so we continue finding the changed namespaces.

// We create a union regex list contains old + new regexes
Set<String> combinedNamespaces = new HashSet<>(oldPolicy.getNamespaces());
combinedNamespaces.addAll(policyData.getNamespaces());
// We create a intersection of the old and new regexes. These won't need to be unloaded
Set<String> commonNamespaces = new HashSet<>(oldPolicy.getNamespaces());
commonNamespaces.retainAll(policyData.getNamespaces());
boolean unloadAllNamespaces = false;
// We also compare that the previous primary broker list is same as current, in case all namespaces need
// to be placed again anyway.
if (NamespaceIsolationPolicyUnloadScope.all_matching.equals(policyData.getUnloadScope())
|| (oldPolicy != null
&& !CollectionUtils.isEqualCollection(oldPolicy.getPrimary(), policyData.getPrimary()))) {
unloadAllNamespaces = true;
}
// list is same, so we continue finding the changed namespaces.

log.debug("combined regexes: {}; common regexes:{}", combinedNamespaces, combinedNamespaces);
// We create a intersection of the old and new regexes. These won't need to be unloaded.
Set<String> commonNamespaces = new HashSet<>(policyData.getNamespaces());
commonNamespaces.retainAll(oldNamespaces);

// Find the changed regexes (new - new ∩ old). TODO for 4.x, make this (new U old - new ∩ old)
combinedNamespaces.removeAll(commonNamespaces);
log.debug("combined regexes: {}; common regexes:{}", combinedNamespaces, commonNamespaces);

log.debug("changed regexes: {}", commonNamespaces);
if (!unloadAllNamespaces) {
// Find the changed regexes ((new U old) - (new ∩ old)).
combinedNamespaces.removeAll(commonNamespaces);
log.debug("changed regexes: {}", commonNamespaces);
}

// Now we further filter the filtered namespaces based on this combinedNamespaces set
shouldUnloadNamespaces = shouldUnloadNamespaces.stream()
.filter(name -> combinedNamespaces.stream()
.map(Pattern::compile)
.anyMatch(pattern -> pattern.matcher(name).matches())
).toList();
// Now we further filter the filtered namespaces based on this combinedNamespaces set
List<Pattern> namespacePatterns = combinedNamespaces.stream().map(Pattern::compile).toList();
clusterLocalNamespaces = clusterLocalNamespaces.stream()
.filter(name -> namespacePatterns.stream().anyMatch(pattern -> pattern.matcher(name).matches()))
.toList();

}
}
// unload type is either null or not in (changed, none), so we proceed to unload all namespaces
// TODO - default in 4.x should become `changed`
List<CompletableFuture<Void>> futures = shouldUnloadNamespaces.stream()
List<CompletableFuture<Void>> futures = clusterLocalNamespaces.stream()
.map(namespaceName -> adminClient.namespaces().unloadAsync(namespaceName))
.collect(Collectors.toList());
return FutureUtil.waitForAll(futures).thenAccept(__ -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3488,17 +3488,19 @@ private NamespaceIsolationData createPolicyData(NamespaceIsolationPolicyUnloadSc
parameters1.put("usage_threshold", "100");
List<String> nsRegexList = new ArrayList<>(namespaces);

return NamespaceIsolationData.builder()
NamespaceIsolationData.Builder build = NamespaceIsolationData.builder()
// "prop-ig/ns1" is present in test cluster, policy set on test2 should work
.namespaces(nsRegexList)
.primary(primaryBrokers)
.secondary(Collections.singletonList(""))
.autoFailoverPolicy(AutoFailoverPolicyData.builder()
.policyType(AutoFailoverPolicyType.min_available)
.parameters(parameters1)
.build())
.unloadScope(scope)
.build();
.build());
if (scope != null) {
build.unloadScope(scope);
}
return build.build();
}

private boolean allTopicsUnloaded(List<String> topics) {
Expand Down Expand Up @@ -3624,18 +3626,42 @@ public void testIsolationPolicyUnloadsNSWithAllScope(final String topicType) thr
testIsolationPolicyUnloadsNSWithScope(
topicType, "policy-all", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"),
all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"),
all_matching, List.of(".*-unload-test-c.*"), List.of("b1", "b2"),
Collections.singletonList(".*")
);
}

@Test(dataProvider = "topicType")
public void testIsolationPolicyUnloadsNSWithChangedScope1(final String topicType) throws Exception {
String nsPrefix1 = newUniqueName(defaultTenant + "/") + "-unload-test-";
// Addition case
testIsolationPolicyUnloadsNSWithScope(
topicType, "policy-changed1", nsPrefix1, List.of("a1", "a2", "b1", "b2", "c1"),
all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"),
changed, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("a1", "a2", "b1", "b2"),
Collections.singletonList(".*")
);
}

@Test(dataProvider = "topicType")
public void testIsolationPolicyUnloadsNSWithChangedScope2(final String topicType) throws Exception {
String nsPrefix2 = newUniqueName(defaultTenant + "/") + "-unload-test-";
// removal case
testIsolationPolicyUnloadsNSWithScope(
topicType, "policy-changed2", nsPrefix2, List.of("a1", "a2", "b1", "b2", "c1"),
all_matching, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("b1", "b2"),
changed, List.of(".*-unload-test-c.*"), List.of("b1", "b2", "c1"),
Collections.singletonList(".*")
);
}

@Test(dataProvider = "topicType")
public void testIsolationPolicyUnloadsNSWithChangedScope(final String topicType) throws Exception {
public void testIsolationPolicyUnloadsNSWithScopeMissing(final String topicType) throws Exception {
String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-";
testIsolationPolicyUnloadsNSWithScope(
topicType, "policy-changed", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"),
all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"),
changed, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("a1", "a2", "b1", "b2"),
null, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("a1", "a2", "b1", "b2"),
Collections.singletonList(".*")
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ private class SetPolicy extends CliCommand {
private Map<String, String> autoFailoverPolicyParams;

@Option(names = "--unload-scope", description = "configure the type of unload to do -"
+ " ['all_matching', 'none', 'changed'] namespaces. By default, all namespaces matching the namespaces"
+ " regex will be unloaded and placed again. You can choose to not unload any namespace while setting"
+ " this new policy by choosing `none` or choose to unload only the namespaces whose placement will"
+ " actually change. If you chose 'none', you will need to manually unload the namespaces for them to"
+ " be placed correctly, or wait till some namespaces get load balanced automatically based on load"
+ " shedding configurations.")
+ " ['all_matching', 'none', 'changed'] namespaces. By default, only namespaces whose placement will"
+ " actually change would be unloaded and placed again. You can choose to not unload any namespace"
+ " while setting this new policy by choosing `none` or choose to unload all namespaces matching"
+ " old (if any) and new namespace regex. If you chose 'none', you will need to manually unload the"
+ " namespaces for them to be placed correctly, or wait till some namespaces get load balanced"
+ " automatically based on load shedding configurations.")
private NamespaceIsolationPolicyUnloadScope unloadScope;

void run() throws PulsarAdminException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ public class NamespaceIsolationDataImpl implements NamespaceIsolationData {
@ApiModelProperty(
name = "unload_scope",
value = "The type of unload to perform while applying the new isolation policy.",
example = "'all_matching' (default) for unloading all matching namespaces. 'none' for not unloading "
+ "any namespace. 'changed' for unloading only the namespaces whose placement is actually changing"
example = "'changed' (default) for unloading only the namespaces whose placement is actually changing. "
+ "'all_matching' for unloading all matching namespaces. 'none' for not unloading any namespaces."
)
@JsonProperty("unload_scope")
private NamespaceIsolationPolicyUnloadScope unloadScope;
Expand Down

0 comments on commit 4f00259

Please sign in to comment.