Skip to content

Commit

Permalink
[improve][broker] Make AutoSubscriptionCreation async (apache#16329)
Browse files Browse the repository at this point in the history
  • Loading branch information
gaozhangmin authored Jul 13, 2022
1 parent 67cfb22 commit 7b0d489
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -922,44 +922,35 @@ protected CompletableFuture<Void> internalSetAutoTopicCreationAsync(
}));
}

protected void internalSetAutoSubscriptionCreation(
AsyncResponse asyncResponse, AutoSubscriptionCreationOverride autoSubscriptionCreationOverride) {
validateNamespacePolicyOperation(namespaceName, PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();

protected CompletableFuture<Void> internalSetAutoSubscriptionCreationAsync(AutoSubscriptionCreationOverride
autoSubscriptionCreationOverride) {
// Force to read the data s.t. the watch to the cache content is setup.
namespaceResources().setPoliciesAsync(namespaceName, policies -> {
policies.autoSubscriptionCreationOverride = autoSubscriptionCreationOverride;
return policies;
}).thenApply(r -> {
if (autoSubscriptionCreationOverride != null) {
String autoOverride = autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation() ? "enabled"
: "disabled";
log.info("[{}] Successfully {} autoSubscriptionCreation on namespace {}", clientAppId(),
autoOverride != null ? autoOverride : "removed", namespaceName);
}
asyncResponse.resume(Response.noContent().build());
return null;
}).exceptionally(e -> {
log.error("[{}] Failed to modify autoSubscriptionCreation status on namespace {}", clientAppId(),
namespaceName, e.getCause());
if (e.getCause() instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
return null;
}
asyncResponse.resume(new RestException(e.getCause()));
return null;
});
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.AUTO_SUBSCRIPTION_CREATION,
PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(unused -> namespaceResources().setPoliciesAsync(namespaceName, policies -> {
policies.autoSubscriptionCreationOverride = autoSubscriptionCreationOverride;
return policies;
}))
.thenAccept(r -> {
if (autoSubscriptionCreationOverride != null) {
String autoOverride = autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation()
? "enabled" : "disabled";
log.info("[{}] Successfully {} autoSubscriptionCreation on namespace {}", clientAppId(),
autoOverride, namespaceName);
} else {
log.info("[{}] Successfully remove autoSubscriptionCreation on namespace {}",
clientAppId(), namespaceName);
}
});
}

protected AutoSubscriptionCreationOverride internalGetAutoSubscriptionCreation() {
validateNamespacePolicyOperation(namespaceName, PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
return policies.autoSubscriptionCreationOverride;
}
protected CompletableFuture<AutoSubscriptionCreationOverride> internalGetAutoSubscriptionCreationAsync() {

protected void internalRemoveAutoSubscriptionCreation(AsyncResponse asyncResponse) {
internalSetAutoSubscriptionCreation(asyncResponse, null);
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.AUTO_SUBSCRIPTION_CREATION,
PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenApply(policies -> policies.autoSubscriptionCreationOverride);
}

protected CompletableFuture<Void> internalModifyDeduplicationAsync(Boolean enableDeduplication) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -724,26 +724,48 @@ public void setAutoSubscriptionCreation(
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
AutoSubscriptionCreationOverride autoSubscriptionCreationOverride) {
try {
validateNamespaceName(property, cluster, namespace);
internalSetAutoSubscriptionCreation(asyncResponse, autoSubscriptionCreationOverride);
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
validateNamespaceName(property, cluster, namespace);
internalSetAutoSubscriptionCreationAsync(autoSubscriptionCreationOverride)
.thenAccept(__ -> {
log.info("[{}] Successfully set autoSubscriptionCreation on namespace {}",
clientAppId(), namespaceName);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(e -> {
Throwable ex = FutureUtil.unwrapCompletionException(e);
log.error("[{}] Failed to set autoSubscriptionCreation on namespace {}", clientAppId(),
namespaceName, ex);
if (ex instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
} else {
resumeAsyncResponseExceptionally(asyncResponse, ex);
}
return null;
});
}

@GET
@Path("/{property}/{cluster}/{namespace}/autoSubscriptionCreation")
@ApiOperation(value = "Get autoSubscriptionCreation info in a namespace")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist")})
public AutoSubscriptionCreationOverride getAutoSubscriptionCreation(@PathParam("property") String property,
public void getAutoSubscriptionCreation(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
return internalGetAutoSubscriptionCreation();
internalGetAutoSubscriptionCreationAsync()
.thenAccept(asyncResponse::resume)
.exceptionally(e -> {
Throwable ex = FutureUtil.unwrapCompletionException(e);
log.error("Failed to get autoSubscriptionCreation for namespace {}", namespaceName, ex);
if (ex instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
} else {
resumeAsyncResponseExceptionally(asyncResponse, ex);
}
return null;
});
}

@DELETE
Expand All @@ -754,14 +776,24 @@ public AutoSubscriptionCreationOverride getAutoSubscriptionCreation(@PathParam("
public void removeAutoSubscriptionCreation(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
try {
validateNamespaceName(property, cluster, namespace);
internalRemoveAutoSubscriptionCreation(asyncResponse);
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
validateNamespaceName(property, cluster, namespace);
internalSetAutoSubscriptionCreationAsync(null)
.thenAccept(__ -> {
log.info("[{}] Successfully set autoSubscriptionCreation on namespace {}",
clientAppId(), namespaceName);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(e -> {
Throwable ex = FutureUtil.unwrapCompletionException(e);
log.error("[{}] Failed to remove autoSubscriptionCreation on namespace {}", clientAppId(),
namespaceName, ex);
if (ex instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
} else {
resumeAsyncResponseExceptionally(asyncResponse, ex);
}
return null;
});
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -655,25 +655,42 @@ public void setAutoSubscriptionCreation(
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@ApiParam(value = "Settings for automatic subscription creation")
AutoSubscriptionCreationOverride autoSubscriptionCreationOverride) {
try {
validateNamespaceName(tenant, namespace);
internalSetAutoSubscriptionCreation(asyncResponse, autoSubscriptionCreationOverride);
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
validateNamespaceName(tenant, namespace);
internalSetAutoSubscriptionCreationAsync(autoSubscriptionCreationOverride)
.thenAccept(__ -> {
log.info("[{}] Successfully set autoSubscriptionCreation on namespace {}",
clientAppId(), namespaceName);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(e -> {
Throwable ex = FutureUtil.unwrapCompletionException(e);
log.error("[{}] Failed to set autoSubscriptionCreation on namespace {}", clientAppId(),
namespaceName, ex);
if (ex instanceof MetadataStoreException.NotFoundException) {
asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Namespace does not exist"));
} else {
resumeAsyncResponseExceptionally(asyncResponse, ex);
}
return null;
});
}

@GET
@Path("/{tenant}/{namespace}/autoSubscriptionCreation")
@ApiOperation(value = "Get autoSubscriptionCreation info in a namespace")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or namespace doesn't exist")})
public AutoSubscriptionCreationOverride getAutoSubscriptionCreation(@PathParam("tenant") String tenant,
public void getAutoSubscriptionCreation(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetAutoSubscriptionCreation();
internalGetAutoSubscriptionCreationAsync()
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error("Failed to get autoSubscriptionCreation for namespace {}", namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@DELETE
Expand All @@ -683,14 +700,24 @@ public AutoSubscriptionCreationOverride getAutoSubscriptionCreation(@PathParam("
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
public void removeAutoSubscriptionCreation(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
try {
validateNamespaceName(tenant, namespace);
internalRemoveAutoSubscriptionCreation(asyncResponse);
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
validateNamespaceName(tenant, namespace);
internalSetAutoSubscriptionCreationAsync(null)
.thenAccept(__ -> {
log.info("[{}] Successfully set autoSubscriptionCreation on namespace {}",
clientAppId(), namespaceName);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(e -> {
Throwable ex = FutureUtil.unwrapCompletionException(e);
log.error("[{}] Failed to set autoSubscriptionCreation on namespace {}", clientAppId(),
namespaceName, ex);
if (ex instanceof MetadataStoreException.NotFoundException) {
asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Namespace does not exist"));
} else {
resumeAsyncResponseExceptionally(asyncResponse, ex);
}
return null;
});
}

@GET
Expand Down

0 comments on commit 7b0d489

Please sign in to comment.