Skip to content

Commit

Permalink
[improve] [broker] PIP-299-part-5: Add namespace-level policy: dispat…
Browse files Browse the repository at this point in the history
…cherPauseOnAckStatePersistent (apache#21926)
  • Loading branch information
Technoboy- authored Jan 21, 2024
1 parent 40eebc0 commit 17bb322
Show file tree
Hide file tree
Showing 12 changed files with 240 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2678,4 +2678,27 @@ protected Policies getDefaultPolicesIfNull(Policies policies) {
}
return policies;
}

protected CompletableFuture<Void> internalSetDispatcherPauseOnAckStatePersistentAsync(
boolean dispatcherPauseOnAckStatePersistentEnabled) {
return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.dispatcherPauseOnAckStatePersistentEnabled = dispatcherPauseOnAckStatePersistentEnabled;
return policies;
}));
}

protected CompletableFuture<Object> internalGetDispatcherPauseOnAckStatePersistentAsync() {
return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT, PolicyOperation.READ)
.thenCompose(__ -> namespaceResources().getPoliciesAsync(namespaceName))
.thenApply(policiesOpt -> {
if (!policiesOpt.isPresent()) {
throw new RestException(Response.Status.NOT_FOUND, "Namespace policies does not exist");
}
return policiesOpt.map(p -> p.dispatcherPauseOnAckStatePersistentEnabled).orElse(false);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2777,5 +2777,66 @@ public void enableMigration(@PathParam("tenant") String tenant,
internalEnableMigration(migrated);
}

@POST
@Path("/{tenant}/{namespace}/dispatcherPauseOnAckStatePersistent")
@ApiOperation(value = "Set dispatcher pause on ack state persistent configuration for specified namespace.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void setDispatcherPauseOnAckStatePersistent(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalSetDispatcherPauseOnAckStatePersistentAsync(true)
.thenRun(() -> {
log.info("[{}] Successfully enabled dispatcherPauseOnAckStatePersistent: namespace={}",
clientAppId(), namespaceName);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@DELETE
@Path("/{tenant}/{namespace}/dispatcherPauseOnAckStatePersistent")
@ApiOperation(value = "Remove dispatcher pause on ack state persistent configuration for specified namespace.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removeDispatcherPauseOnAckStatePersistent(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalSetDispatcherPauseOnAckStatePersistentAsync(false)
.thenRun(() -> {
log.info("[{}] Successfully remove dispatcherPauseOnAckStatePersistent: namespace={}",
clientAppId(), namespaceName);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@GET
@Path("/{tenant}/{namespace}/dispatcherPauseOnAckStatePersistent")
@ApiOperation(value = "Get dispatcher pause on ack state persistent config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
public void getDispatcherPauseOnAckStatePersistent(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalGetDispatcherPauseOnAckStatePersistentAsync()
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,9 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
topicPolicies.getSchemaValidationEnforced().updateNamespaceValue(namespacePolicies.schema_validation_enforced);
topicPolicies.getEntryFilters().updateNamespaceValue(namespacePolicies.entryFilters);

topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled().updateNamespaceValue(
namespacePolicies.dispatcherPauseOnAckStatePersistentEnabled);

updateEntryFilters();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2179,4 +2179,20 @@ private void createTestNamespaces(List<NamespaceName> nsnames, Policies policies
asyncRequests(ctx -> namespaces.createNamespace(ctx, nsName.getTenant(), nsName.getCluster(), nsName.getLocalName(), policies));
}
}

@Test
public void testDispatcherPauseOnAckStatePersistent() throws Exception {
String namespace = BrokerTestUtil.newUniqueName(this.testTenant + "/namespace");

admin.namespaces().createNamespace(namespace, Set.of(testLocalCluster));

assertFalse(admin.namespaces().getDispatcherPauseOnAckStatePersistent(namespace));
// should pass
admin.namespaces().setDispatcherPauseOnAckStatePersistent(namespace);
assertTrue(admin.namespaces().getDispatcherPauseOnAckStatePersistent(namespace));
admin.namespaces().removeDispatcherPauseOnAckStatePersistent(namespace);
assertFalse(admin.namespaces().getDispatcherPauseOnAckStatePersistent(namespace));

admin.namespaces().deleteNamespace(namespace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,14 @@ public boolean hasAckedMessage(String v) {
public Object[][] typesOfSetDispatcherPauseOnAckStatePersistent() {
return new Object[][]{
{TypeOfUpdateTopicConfig.BROKER_CONF},
//{TypeOfUpdateTopicConfig.NAMESPACE_LEVEL_POLICY},
{TypeOfUpdateTopicConfig.NAMESPACE_LEVEL_POLICY},
{TypeOfUpdateTopicConfig.TOPIC_LEVEL_POLICY}
};
}

public enum TypeOfUpdateTopicConfig {
BROKER_CONF,
NAMESPACE_LEVEL_POLICY,
TOPIC_LEVEL_POLICY;
}

Expand All @@ -235,6 +236,9 @@ private void enableDispatcherPauseOnAckStatePersistentAndCreateTopic(String tpNa
} else if (type == TypeOfUpdateTopicConfig.TOPIC_LEVEL_POLICY) {
admin.topics().createNonPartitionedTopic(tpName);
admin.topicPolicies().setDispatcherPauseOnAckStatePersistent(tpName).join();
} else if (type == TypeOfUpdateTopicConfig.NAMESPACE_LEVEL_POLICY) {
admin.topics().createNonPartitionedTopic(tpName);
admin.namespaces().setDispatcherPauseOnAckStatePersistent(TopicName.get(tpName).getNamespace());
}
Awaitility.await().untilAsserted(() -> {
PersistentTopic persistentTopic =
Expand All @@ -256,6 +260,8 @@ private void disableDispatcherPauseOnAckStatePersistent(String tpName, TypeOfUpd
admin.brokers().updateDynamicConfiguration("dispatcherPauseOnAckStatePersistentEnabled", "false");
} else if (type == TypeOfUpdateTopicConfig.TOPIC_LEVEL_POLICY) {
admin.topicPolicies().removeDispatcherPauseOnAckStatePersistent(tpName).join();
} else if (type == TypeOfUpdateTopicConfig.NAMESPACE_LEVEL_POLICY) {
admin.namespaces().removeDispatcherPauseOnAckStatePersistent(TopicName.get(tpName).getNamespace());
}
Awaitility.await().untilAsserted(() -> {
PersistentTopic persistentTopic =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4648,4 +4648,36 @@ void setIsAllowAutoUpdateSchema(String namespace, boolean isAllowAutoUpdateSchem
*/
void updateMigrationState(String namespace, boolean migrated) throws PulsarAdminException;

/**
* Set DispatcherPauseOnAckStatePersistent for a namespace asynchronously.
*/
CompletableFuture<Void> setDispatcherPauseOnAckStatePersistentAsync(String namespace);

/**
* Remove entry filters of a namespace.
* @param namespace Namespace name
* @throws PulsarAdminException
*/
void setDispatcherPauseOnAckStatePersistent(String namespace) throws PulsarAdminException;

/**
* Removes the dispatcherPauseOnAckStatePersistentEnabled policy for a given namespace asynchronously.
*/
CompletableFuture<Void> removeDispatcherPauseOnAckStatePersistentAsync(String namespace);

/**
* Removes the dispatcherPauseOnAckStatePersistentEnabled policy for a given namespace.
*/
void removeDispatcherPauseOnAckStatePersistent(String namespace) throws PulsarAdminException;

/**
* Get the dispatcherPauseOnAckStatePersistentEnabled policy for a given namespace asynchronously.
*/
CompletableFuture<Boolean> getDispatcherPauseOnAckStatePersistentAsync(String namespace);

/**
* Get the dispatcherPauseOnAckStatePersistentEnabled policy for a given namespace.
*/
boolean getDispatcherPauseOnAckStatePersistent(String namespace) throws PulsarAdminException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ public class Policies {

public boolean migrated;

public Boolean dispatcherPauseOnAckStatePersistentEnabled;

public enum BundleType {
LARGEST, HOT;
}
Expand Down Expand Up @@ -158,7 +160,8 @@ public int hashCode() {
offload_policies,
subscription_types_enabled,
properties,
resource_group_name, entryFilters, migrated);
resource_group_name, entryFilters, migrated,
dispatcherPauseOnAckStatePersistentEnabled);
}

@Override
Expand Down Expand Up @@ -206,7 +209,9 @@ public boolean equals(Object obj) {
&& Objects.equals(properties, other.properties)
&& Objects.equals(migrated, other.migrated)
&& Objects.equals(resource_group_name, other.resource_group_name)
&& Objects.equals(entryFilters, other.entryFilters);
&& Objects.equals(entryFilters, other.entryFilters)
&& Objects.equals(dispatcherPauseOnAckStatePersistentEnabled,
other.dispatcherPauseOnAckStatePersistentEnabled);
}

return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1880,7 +1880,6 @@ public void removeNamespaceResourceGroup(String namespace) throws PulsarAdminExc
@Override
public CompletableFuture<Void> clearPropertiesAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
final CompletableFuture<String> future = new CompletableFuture<>();
WebTarget path = namespacePath(ns, "properties");
return asyncDeleteRequest(path);
}
Expand Down Expand Up @@ -1958,4 +1957,40 @@ public CompletableFuture<Void> removeNamespaceEntryFiltersAsync(String namespace
WebTarget path = namespacePath(ns, "entryFilters");
return asyncDeleteRequest(path);
}

@Override
public CompletableFuture<Void> setDispatcherPauseOnAckStatePersistentAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "dispatcherPauseOnAckStatePersistent");
return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}

@Override
public void setDispatcherPauseOnAckStatePersistent(String namespace) throws PulsarAdminException {
sync(() -> setDispatcherPauseOnAckStatePersistentAsync(namespace));
}

@Override
public CompletableFuture<Void> removeDispatcherPauseOnAckStatePersistentAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "dispatcherPauseOnAckStatePersistent");
return asyncDeleteRequest(path);
}

@Override
public void removeDispatcherPauseOnAckStatePersistent(String namespace) throws PulsarAdminException {
sync(() -> removeDispatcherPauseOnAckStatePersistentAsync(namespace));
}

@Override
public CompletableFuture<Boolean> getDispatcherPauseOnAckStatePersistentAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "dispatcherPauseOnAckStatePersistent");
return asyncGetRequest(path, new FutureCallback<Boolean>(){});
}

@Override
public boolean getDispatcherPauseOnAckStatePersistent(String namespace) throws PulsarAdminException {
return sync(() -> getDispatcherPauseOnAckStatePersistentAsync(namespace));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,15 @@ public void namespaces() throws Exception {
namespaces.run(split("remove-deduplication-snapshot-interval myprop/clust/ns1"));
verify(mockNamespaces).removeDeduplicationSnapshotInterval("myprop/clust/ns1");

namespaces.run(split("set-dispatcher-pause-on-ack-state-persistent myprop/clust/ns1"));
verify(mockNamespaces).setDispatcherPauseOnAckStatePersistent("myprop/clust/ns1");

namespaces.run(split("get-dispatcher-pause-on-ack-state-persistent myprop/clust/ns1"));
verify(mockNamespaces).getDispatcherPauseOnAckStatePersistent("myprop/clust/ns1");

namespaces.run(split("remove-dispatcher-pause-on-ack-state-persistent myprop/clust/ns1"));
verify(mockNamespaces).removeDispatcherPauseOnAckStatePersistent("myprop/clust/ns1");

}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2592,6 +2592,42 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Enable dispatcherPauseOnAckStatePersistent for a namespace")
private class SetDispatcherPauseOnAckStatePersistent extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
getAdmin().namespaces().setDispatcherPauseOnAckStatePersistent(namespace);
}
}

@Parameters(commandDescription = "Get the dispatcherPauseOnAckStatePersistent for a namespace")
private class GetDispatcherPauseOnAckStatePersistent extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
print(getAdmin().namespaces().getDispatcherPauseOnAckStatePersistent(namespace));
}
}

@Parameters(commandDescription = "Remove dispatcherPauseOnAckStatePersistent for a namespace")
private class RemoveDispatcherPauseOnAckStatePersistent extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
getAdmin().namespaces().removeDispatcherPauseOnAckStatePersistent(namespace);
}
}

public CmdNamespaces(Supplier<PulsarAdmin> admin) {
super("namespaces", admin);
jcommander.addCommand("list", new GetNamespacesPerProperty());
Expand Down Expand Up @@ -2778,5 +2814,12 @@ public CmdNamespaces(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("remove-entry-filters", new RemoveEntryFiltersPerTopic());

jcommander.addCommand("update-migration-state", new UpdateMigrationState());

jcommander.addCommand("set-dispatcher-pause-on-ack-state-persistent",
new SetDispatcherPauseOnAckStatePersistent());
jcommander.addCommand("get-dispatcher-pause-on-ack-state-persistent",
new GetDispatcherPauseOnAckStatePersistent());
jcommander.addCommand("remove-dispatcher-pause-on-ack-state-persistent",
new RemoveDispatcherPauseOnAckStatePersistent());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,6 @@ public enum PolicyName {
MAX_TOPICS,
RESOURCEGROUP,
ENTRY_FILTERS,
SHADOW_TOPIC
SHADOW_TOPIC,
DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class TopicPolicies {
private Integer maxUnackedMessagesOnSubscription;
private Long delayedDeliveryTickTimeMillis;
private Boolean delayedDeliveryEnabled;
private Boolean dispatcherPauseOnAckStatePersistentEnabled;;
private Boolean dispatcherPauseOnAckStatePersistentEnabled;
private OffloadPoliciesImpl offloadPolicies;
private InactiveTopicPolicies inactiveTopicPolicies;
private DispatchRateImpl dispatchRate;
Expand Down

0 comments on commit 17bb322

Please sign in to comment.