Skip to content

Commit

Permalink
refactor grant,revoke,get permission module
Browse files Browse the repository at this point in the history
  • Loading branch information
fanjianye committed Aug 31, 2022
1 parent 7e4e134 commit 0e5003a
Showing 1 changed file with 58 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,24 @@ protected CompletableFuture<Map<String, Set<AuthAction>>> internalGetPermissions
}
}
}

// If topic is partitioned, add based topic permission
if (topicName.isPartitioned() && auth.getTopicAuthentication().containsKey(
topicName.getPartitionedTopicName())) {
for (Map.Entry<String, Set<AuthAction>> entry :
auth.getTopicAuthentication().get(topicName.getPartitionedTopicName()).entrySet()) {
String role = entry.getKey();
Set<AuthAction> topicPermissions = entry.getValue();

if (!permissions.containsKey(role)) {
permissions.put(role, topicPermissions);
} else {
// Do the union between namespace and topic level
Set<AuthAction> union = Sets.union(permissions.get(role), topicPermissions);
permissions.put(role, union);
}
}
}
return permissions;
}));
}
Expand Down Expand Up @@ -324,32 +342,41 @@ protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
getPartitionedTopicMetadataAsync(topicName, true, false)
.thenCompose(metadata -> {
int numPartitions = metadata.partitions;
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
if (numPartitions > 0) {
for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
future = future.thenCompose(unused -> grantPermissionsAsync(topicNamePartition, role,
actions));
}
}
return future.thenCompose(unused -> grantPermissionsAsync(topicName, role, actions))
.thenAccept(unused -> asyncResponse.resume(Response.noContent().build()));
}))).exceptionally(ex -> {
grantPermissionsAsync(TopicName.get(topicName.getPartitionedTopicName()), role, actions)
.thenAccept(unused -> asyncResponse.resume(Response.noContent().build()))))
.exceptionally(ex -> {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicName, realCause);
resumeAsyncResponseExceptionally(asyncResponse, realCause);
return null;
});
}

private CompletableFuture<Void> revokePermissionsAsync(String topicUri, String role) {
private CompletableFuture<Void> revokePermissionsAsync(TopicName topicName, String role, int numPartitions) {
String topicUri = topicName.toString();
return namespaceResources().getPoliciesAsync(namespaceName).thenCompose(
policiesOptional -> {
Policies policies = policiesOptional.orElseThrow(() ->
new RestException(Status.NOT_FOUND, "Namespace does not exist"));
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
// do compatible with previous pulsar version
// revoke all the partition permissions granted in previous version
future = future.thenComposeAsync(unused ->
namespaceResources().setPoliciesAsync(namespaceName, p -> {
if (numPartitions > 0) {
for (int i = 0; i < numPartitions; i++) {
p.auth_policies.getTopicAuthentication().computeIfPresent(
topicName.getPartition(i).toString(), (k, roles) -> {
roles.remove(role);
if (roles.isEmpty()) {
return null;
}
return roles;
});
}
}
return p;
}));
if (!policies.auth_policies.getTopicAuthentication().containsKey(topicUri)
|| !policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role)) {
log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level {}",
Expand All @@ -358,12 +385,18 @@ private CompletableFuture<Void> revokePermissionsAsync(String topicUri, String r
"Permissions are not set at the topic level"));
}
// Write the new policies to metadata store
return namespaceResources().setPoliciesAsync(namespaceName, p -> {
p.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
return future.thenComposeAsync(unused -> namespaceResources().setPoliciesAsync(namespaceName, p -> {
p.auth_policies.getTopicAuthentication().computeIfPresent(topicUri, (k, roles) -> {
roles.remove(role);
if (roles.isEmpty()) {
return null;
}
return roles;
});
return p;
}).thenAccept(__ ->
})).thenAccept(__ ->
log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), role,
topicUri)
topicUri)
);
}
);
Expand All @@ -372,22 +405,12 @@ private CompletableFuture<Void> revokePermissionsAsync(String topicUri, String r
protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) {
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
getPartitionedTopicMetadataAsync(topicName, true, false)
.thenCompose(metadata -> {
int numPartitions = metadata.partitions;
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
if (numPartitions > 0) {
for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
future = future.thenComposeAsync(unused ->
revokePermissionsAsync(topicNamePartition.toString(), role));
}
}
return future.thenComposeAsync(unused -> revokePermissionsAsync(topicName.toString(), role))
.thenAccept(unused -> asyncResponse.resume(Response.noContent().build()));
}))
).exceptionally(ex -> {
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 -> {
TopicName basedTopic = TopicName.get(topicName.getPartitionedTopicName());
return getPartitionedTopicMetadataAsync(basedTopic, true, false)
.thenCompose(metadata -> revokePermissionsAsync(basedTopic, role, metadata.partitions)
.thenAccept(unused -> asyncResponse.resume(Response.noContent().build())));
})).exceptionally(ex -> {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicName, realCause);
resumeAsyncResponseExceptionally(asyncResponse, realCause);
Expand Down

0 comments on commit 0e5003a

Please sign in to comment.