Skip to content

Commit

Permalink
[improve][broker] Support revoking permission for AuthorizationProvid…
Browse files Browse the repository at this point in the history
…er (#20456)
  • Loading branch information
Technoboy- authored Jun 2, 2023
1 parent 49174a9 commit e220a5d
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,18 @@ CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName, String
CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions, String role,
String authDataJson);

/**
* Revoke authorization-action permission on a namespace to the given client.
* @param namespace
* @param role
* @return CompletableFuture<Void>
*/
default CompletableFuture<Void> revokePermissionAsync(NamespaceName namespace, String role) {
return FutureUtil.failedFuture(new IllegalStateException(
String.format("revokePermissionAsync on namespace %s is not supported by the Authorization",
namespace)));
}

/**
* Grant permission to roles that can access subscription-admin api.
*
Expand All @@ -193,7 +205,7 @@ CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAc
* @param roles
* @param authDataJson
* additional authdata in json format
* @return
* @return CompletableFuture<Void>
*/
CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName,
Set<String> roles, String authDataJson);
Expand All @@ -203,7 +215,7 @@ CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName namespace
* @param namespace
* @param subscriptionName
* @param role
* @return
* @return CompletableFuture<Void>
*/
CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName,
String role, String authDataJson);
Expand All @@ -226,6 +238,19 @@ CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName namespac
CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions, String role,
String authDataJson);


/**
* Revoke authorization-action permission on a topic to the given client.
* @param topicName
* @param role
* @return CompletableFuture<Void>
*/
default CompletableFuture<Void> revokePermissionAsync(TopicName topicName, String role) {
return FutureUtil.failedFuture(new IllegalStateException(
String.format("revokePermissionAsync on topicName %s is not supported by the Authorization",
topicName)));
}

/**
* Check if a given <tt>role</tt> is allowed to execute a given <tt>operation</tt> on the tenant.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,17 @@ public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set
return provider.grantPermissionAsync(namespace, actions, role, authDataJson);
}

/**
*
* Revoke authorization-action permission on a namespace to the given client.
*
* @param namespace
* @param role
*/
public CompletableFuture<Void> revokePermissionAsync(NamespaceName namespace, String role) {
return provider.revokePermissionAsync(namespace, role);
}

/**
* Grant permission to roles that can access subscription-admin api.
*
Expand Down Expand Up @@ -157,16 +168,26 @@ public CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName n
* NOTE: used to complete with {@link IllegalArgumentException} when namespace not found or with
* {@link IllegalStateException} when failed to grant permission.
*
* @param topicname
* @param topicName
* @param role
* @param authDataJson
* additional authdata in json for targeted authorization provider
* @completesWith null when the permissions are updated successfully.
* @completesWith {@link MetadataStoreException} when the MetadataStore is not updated.
*/
public CompletableFuture<Void> grantPermissionAsync(TopicName topicname, Set<AuthAction> actions, String role,
public CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions, String role,
String authDataJson) {
return provider.grantPermissionAsync(topicname, actions, role, authDataJson);
return provider.grantPermissionAsync(topicName, actions, role, authDataJson);
}

/**
* Revoke authorization-action permission on a topic to the given client.
*
* @param topicName
* @param role
*/
public CompletableFuture<Void> revokePermissionAsync(TopicName topicName, String role) {
return provider.revokePermissionAsync(topicName, role);
}

/**
Expand Down Expand Up @@ -418,7 +439,7 @@ private boolean isValidOriginalPrincipal(AuthenticationParameters authParams) {
/**
* Whether the authenticatedPrincipal and the originalPrincipal form a valid pair. This method assumes that
* authenticatedPrincipal and originalPrincipal can be equal, as long as they are not a proxy role. This use
* case is relvant for the admin server because of the way the proxy handles authentication. The binary protocol
* case is relevant for the admin server because of the way the proxy handles authentication. The binary protocol
* should not use this method.
* @return true when roles are a valid combination and false when roles are an invalid combination
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,33 @@ public CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<Aut
});
}

@Override
public CompletableFuture<Void> revokePermissionAsync(TopicName topicName, String role) {
return getPoliciesReadOnlyAsync().thenCompose(readonly -> {
if (readonly) {
if (log.isDebugEnabled()) {
log.debug("Policies are read-only. Broker cannot do read-write operations");
}
throw new IllegalStateException("policies are in readonly mode");
}
return pulsarResources.getNamespaceResources()
.setPoliciesAsync(topicName.getNamespaceObject(), policies -> {
policies.auth_policies.getTopicAuthentication()
.computeIfPresent(topicName.toString(), (k, v) -> {
v.remove(role);
return null;
});
return policies;
}).whenComplete((__, ex) -> {
if (ex != null) {
log.error("Failed to revoke permissions for role {} on topic {}", role, topicName, ex);
} else {
log.info("Successfully revoke permissions for role {} on topic {}", role, topicName);
}
});
});
}

@Override
public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespaceName, Set<AuthAction> actions,
String role, String authDataJson) {
Expand All @@ -274,6 +301,29 @@ public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespaceName,
});
}

@Override
public CompletableFuture<Void> revokePermissionAsync(NamespaceName namespaceName, String role) {
return getPoliciesReadOnlyAsync().thenCompose(readonly -> {
if (readonly) {
if (log.isDebugEnabled()) {
log.debug("Policies are read-only. Broker cannot do read-write operations");
}
throw new IllegalStateException("policies are in readonly mode");
}
return pulsarResources.getNamespaceResources()
.setPoliciesAsync(namespaceName, policies -> {
policies.auth_policies.getNamespaceAuthentication().remove(role);
return policies;
}).whenComplete((__, ex) -> {
if (ex != null) {
log.error("Failed to revoke permissions for role {} namespace {}", role, namespaceName, ex);
} else {
log.info("Successfully revoke permissions for role {} namespace {}", role, namespaceName);
}
});
});
}

@Override
public CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName,
Set<String> roles, String authDataJson) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,54 @@ public void testGrantPermission() throws Exception {
log.info("-- Exiting {} test --", methodName);
}

@Test
public void testRevokePermission() throws Exception {
log.info("-- Starting {} test --", methodName);
cleanup();
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
setup();

Authentication adminAuthentication = new ClientAuthentication("superUser");

@Cleanup
PulsarAdmin admin = spy(
PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).authentication(adminAuthentication).build());

Authentication authentication = new ClientAuthentication(clientRole);

replacePulsarClient(PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.authentication(authentication));

admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());

admin.tenants().createTenant("public",
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
admin.namespaces().createNamespace("public/default", Sets.newHashSet("test"));

AuthorizationService authorizationService = new AuthorizationService(conf, pulsar.getPulsarResources());
TopicName topicName = TopicName.get("persistent://public/default/t1");
NamespaceName namespaceName = NamespaceName.get("public/default");
String role = "test-role";
Set<AuthAction> actions = Sets.newHashSet(AuthAction.produce, AuthAction.consume);
Assert.assertFalse(authorizationService.canProduce(topicName, role, null));
Assert.assertFalse(authorizationService.canConsume(topicName, role, null, "sub1"));
authorizationService.grantPermissionAsync(topicName, actions, role, "auth-json").get();
Assert.assertTrue(authorizationService.canProduce(topicName, role, null));
Assert.assertTrue(authorizationService.canConsume(topicName, role, null, "sub1"));

authorizationService.revokePermissionAsync(topicName, role).get();
Assert.assertFalse(authorizationService.canProduce(topicName, role, null));
Assert.assertFalse(authorizationService.canConsume(topicName, role, null, "sub1"));

authorizationService.grantPermissionAsync(namespaceName, actions, role, null).get();
Assert.assertTrue(authorizationService.allowNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPIC, role, null).get());
authorizationService.revokePermissionAsync(namespaceName, role).get();
Assert.assertFalse(authorizationService.allowNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPIC, role, null).get());

log.info("-- Exiting {} test --", methodName);
}

@Test
public void testAuthData() throws Exception {
log.info("-- Starting {} test --", methodName);
Expand Down

0 comments on commit e220a5d

Please sign in to comment.