Skip to content
This repository was archived by the owner on Apr 1, 2024. It is now read-only.

Commit 6ec473e

Browse files
authored
[improve][broker] Add fine-grain authorization to retention admin API (apache#22163)
1 parent 72cedb7 commit 6ec473e

File tree

5 files changed

+246
-10
lines changed

5 files changed

+246
-10
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2435,7 +2435,8 @@ public void getRetention(@Suspended final AsyncResponse asyncResponse,
24352435
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
24362436
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
24372437
validateTopicName(tenant, namespace, encodedTopic);
2438-
preValidation(authoritative)
2438+
validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, PolicyOperation.READ)
2439+
.thenCompose(__ -> preValidation(authoritative))
24392440
.thenCompose(__ -> internalGetRetention(applied, isGlobal))
24402441
.thenAccept(asyncResponse::resume)
24412442
.exceptionally(ex -> {
@@ -2462,7 +2463,8 @@ public void setRetention(@Suspended final AsyncResponse asyncResponse,
24622463
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
24632464
@ApiParam(value = "Retention policies for the specified topic") RetentionPolicies retention) {
24642465
validateTopicName(tenant, namespace, encodedTopic);
2465-
preValidation(authoritative)
2466+
validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, PolicyOperation.WRITE)
2467+
.thenCompose(__ -> preValidation(authoritative))
24662468
.thenCompose(__ -> internalSetRetention(retention, isGlobal))
24672469
.thenRun(() -> {
24682470
try {
@@ -2498,7 +2500,8 @@ public void removeRetention(@Suspended final AsyncResponse asyncResponse,
24982500
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
24992501
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
25002502
validateTopicName(tenant, namespace, encodedTopic);
2501-
preValidation(authoritative)
2503+
validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, PolicyOperation.WRITE)
2504+
.thenCompose(__ -> preValidation(authoritative))
25022505
.thenCompose(__ -> internalRemoveRetention(isGlobal))
25032506
.thenRun(() -> {
25042507
log.info("[{}] Successfully remove retention: namespace={}, topic={}",
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.admin;
20+
21+
import io.jsonwebtoken.Jwts;
22+
import java.util.Set;
23+
import java.util.UUID;
24+
import lombok.Cleanup;
25+
import lombok.SneakyThrows;
26+
import org.apache.pulsar.client.admin.PulsarAdmin;
27+
import org.apache.pulsar.client.admin.PulsarAdminException;
28+
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
29+
import org.apache.pulsar.common.policies.data.AuthAction;
30+
import org.apache.pulsar.common.policies.data.RetentionPolicies;
31+
import org.apache.pulsar.common.policies.data.TenantInfo;
32+
import org.apache.pulsar.security.MockedPulsarStandalone;
33+
import org.testng.Assert;
34+
import org.testng.annotations.AfterClass;
35+
import org.testng.annotations.BeforeClass;
36+
import org.testng.annotations.Test;
37+
38+
import static org.awaitility.Awaitility.await;
39+
40+
41+
public final class TopicPoliciesAuthZTest extends MockedPulsarStandalone {
42+
43+
private PulsarAdmin superUserAdmin;
44+
45+
private PulsarAdmin tenantManagerAdmin;
46+
47+
private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString();
48+
private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
49+
.claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
50+
51+
@SneakyThrows
52+
@BeforeClass
53+
public void before() {
54+
configureTokenAuthentication();
55+
configureDefaultAuthorization();
56+
start();
57+
this.superUserAdmin =PulsarAdmin.builder()
58+
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
59+
.authentication(new AuthenticationToken(SUPER_USER_TOKEN))
60+
.build();
61+
final TenantInfo tenantInfo = superUserAdmin.tenants().getTenantInfo("public");
62+
tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT);
63+
superUserAdmin.tenants().updateTenant("public", tenantInfo);
64+
this.tenantManagerAdmin = PulsarAdmin.builder()
65+
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
66+
.authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
67+
.build();
68+
}
69+
70+
71+
@SneakyThrows
72+
@AfterClass
73+
public void after() {
74+
close();
75+
}
76+
77+
78+
@SneakyThrows
79+
@Test
80+
public void testRetention() {
81+
final String random = UUID.randomUUID().toString();
82+
final String topic = "persistent://public/default/" + random;
83+
final String subject = UUID.randomUUID().toString();
84+
final String token = Jwts.builder()
85+
.claim("sub", subject).signWith(SECRET_KEY).compact();
86+
superUserAdmin.topics().createNonPartitionedTopic(topic);
87+
88+
@Cleanup
89+
final PulsarAdmin subAdmin = PulsarAdmin.builder()
90+
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
91+
.authentication(new AuthenticationToken(token))
92+
.build();
93+
final RetentionPolicies definedRetentionPolicy = new RetentionPolicies(1, 1);
94+
// test superuser
95+
superUserAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy);
96+
97+
// because the topic policies is eventual consistency, we should wait here
98+
await().untilAsserted(() -> {
99+
final RetentionPolicies receivedRetentionPolicy = superUserAdmin.topicPolicies().getRetention(topic);
100+
Assert.assertEquals(receivedRetentionPolicy, definedRetentionPolicy);
101+
});
102+
superUserAdmin.topicPolicies().removeRetention(topic);
103+
104+
await().untilAsserted(() -> {
105+
final RetentionPolicies retention = superUserAdmin.topicPolicies().getRetention(topic);
106+
Assert.assertNull(retention);
107+
});
108+
109+
// test tenant manager
110+
111+
tenantManagerAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy);
112+
await().untilAsserted(() -> {
113+
final RetentionPolicies receivedRetentionPolicy = tenantManagerAdmin.topicPolicies().getRetention(topic);
114+
Assert.assertEquals(receivedRetentionPolicy, definedRetentionPolicy);
115+
});
116+
tenantManagerAdmin.topicPolicies().removeRetention(topic);
117+
await().untilAsserted(() -> {
118+
final RetentionPolicies retention = tenantManagerAdmin.topicPolicies().getRetention(topic);
119+
Assert.assertNull(retention);
120+
});
121+
122+
// test nobody
123+
124+
try {
125+
subAdmin.topicPolicies().getRetention(topic);
126+
Assert.fail("unexpected behaviour");
127+
} catch (PulsarAdminException ex) {
128+
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
129+
}
130+
131+
try {
132+
133+
subAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy);
134+
Assert.fail("unexpected behaviour");
135+
} catch (PulsarAdminException ex) {
136+
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
137+
}
138+
139+
try {
140+
subAdmin.topicPolicies().removeRetention(topic);
141+
Assert.fail("unexpected behaviour");
142+
} catch (PulsarAdminException ex) {
143+
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
144+
}
145+
146+
// test sub user with permissions
147+
for (AuthAction action : AuthAction.values()) {
148+
superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
149+
subject, Set.of(action));
150+
try {
151+
subAdmin.topicPolicies().getRetention(topic);
152+
Assert.fail("unexpected behaviour");
153+
} catch (PulsarAdminException ex) {
154+
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
155+
}
156+
157+
try {
158+
159+
subAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy);
160+
Assert.fail("unexpected behaviour");
161+
} catch (PulsarAdminException ex) {
162+
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
163+
}
164+
165+
try {
166+
subAdmin.topicPolicies().removeRetention(topic);
167+
Assert.fail("unexpected behaviour");
168+
} catch (PulsarAdminException ex) {
169+
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
170+
}
171+
superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", subject);
172+
}
173+
}
174+
175+
}
Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,36 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
package org.apache.pulsar.security.tls;
19+
package org.apache.pulsar.security;
2020

2121
import static org.apache.pulsar.utils.ResourceUtils.getAbsolutePath;
22+
import io.jsonwebtoken.Jwts;
23+
import io.jsonwebtoken.SignatureAlgorithm;
2224
import com.fasterxml.jackson.databind.ObjectMapper;
2325
import com.google.common.collect.Sets;
2426
import java.util.HashMap;
2527
import java.util.Map;
2628
import java.util.Optional;
29+
import java.util.Properties;
30+
import java.util.Set;
31+
import javax.crypto.SecretKey;
2732
import lombok.Getter;
2833
import lombok.SneakyThrows;
2934
import org.apache.pulsar.broker.PulsarService;
3035
import org.apache.pulsar.broker.ServiceConfiguration;
3136
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
37+
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
38+
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
39+
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
3240
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
3341
import org.apache.pulsar.client.admin.PulsarAdmin;
3442
import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
3543
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
44+
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
3645
import org.apache.pulsar.common.policies.data.ClusterData;
3746
import org.apache.pulsar.common.policies.data.TenantInfo;
47+
import org.apache.pulsar.common.util.ObjectMapperFactory;
48+
3849

3950

4051
public abstract class MockedPulsarStandalone implements AutoCloseable {
@@ -60,6 +71,50 @@ public abstract class MockedPulsarStandalone implements AutoCloseable {
6071
serviceConfiguration.setExposeBundlesMetricsInPrometheus(true);
6172
}
6273

74+
75+
protected static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
76+
77+
private static final String BROKER_INTERNAL_CLIENT_SUBJECT = "broker_internal";
78+
private static final String BROKER_INTERNAL_CLIENT_TOKEN = Jwts.builder()
79+
.claim("sub", BROKER_INTERNAL_CLIENT_SUBJECT).signWith(SECRET_KEY).compact();
80+
protected static final String SUPER_USER_SUBJECT = "super-user";
81+
protected static final String SUPER_USER_TOKEN = Jwts.builder()
82+
.claim("sub", SUPER_USER_SUBJECT).signWith(SECRET_KEY).compact();
83+
protected static final String NOBODY_SUBJECT = "nobody";
84+
protected static final String NOBODY_TOKEN = Jwts.builder()
85+
.claim("sub", NOBODY_SUBJECT).signWith(SECRET_KEY).compact();
86+
87+
88+
@SneakyThrows
89+
protected void configureTokenAuthentication() {
90+
serviceConfiguration.setAuthenticationEnabled(true);
91+
serviceConfiguration.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
92+
// internal client
93+
serviceConfiguration.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
94+
final Map<String, String> brokerClientAuthParams = new HashMap<>();
95+
brokerClientAuthParams.put("token", BROKER_INTERNAL_CLIENT_TOKEN);
96+
final String brokerClientAuthParamStr = MAPPER.writeValueAsString(brokerClientAuthParams);
97+
serviceConfiguration.setBrokerClientAuthenticationParameters(brokerClientAuthParamStr);
98+
99+
Properties properties = serviceConfiguration.getProperties();
100+
if (properties == null) {
101+
properties = new Properties();
102+
serviceConfiguration.setProperties(properties);
103+
}
104+
properties.put("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
105+
106+
}
107+
108+
109+
110+
protected void configureDefaultAuthorization() {
111+
serviceConfiguration.setAuthorizationEnabled(true);
112+
serviceConfiguration.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
113+
serviceConfiguration.setSuperUserRoles(Set.of(SUPER_USER_SUBJECT, BROKER_INTERNAL_CLIENT_SUBJECT));
114+
}
115+
116+
117+
63118
@SneakyThrows
64119
protected void loadECTlsCertificateWithFile() {
65120
serviceConfiguration.setTlsEnabled(true);
@@ -176,4 +231,7 @@ public void close() throws Exception {
176231
protected static final String TLS_EC_KS_TRUSTED_STORE =
177232
getAbsolutePath("certificate-authority/ec/jks/ca.truststore.jks");
178233
protected static final String TLS_EC_KS_TRUSTED_STORE_PASS = "rootpw";
234+
235+
236+
private static final ObjectMapper MAPPER = ObjectMapperFactory.getMapper().getObjectMapper();
179237
}

pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECCertificateFileTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import java.util.UUID;
2626
import lombok.Cleanup;
2727
import lombok.SneakyThrows;
28-
import org.apache.pulsar.security.tls.MockedPulsarStandalone;
28+
import org.apache.pulsar.security.MockedPulsarStandalone;
2929
import org.apache.pulsar.client.admin.PulsarAdmin;
3030
import org.apache.pulsar.client.api.Consumer;
3131
import org.apache.pulsar.client.api.Message;

pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECKeyStoreTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,13 @@
2121

2222
import static org.testng.Assert.assertEquals;
2323
import static org.testng.Assert.assertNotNull;
24+
import java.nio.charset.StandardCharsets;
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
import java.util.UUID;
2428
import lombok.Cleanup;
2529
import lombok.SneakyThrows;
26-
import org.apache.pulsar.security.tls.MockedPulsarStandalone;
30+
import org.apache.pulsar.security.MockedPulsarStandalone;
2731
import org.apache.pulsar.client.admin.PulsarAdmin;
2832
import org.apache.pulsar.client.api.Consumer;
2933
import org.apache.pulsar.client.api.Message;
@@ -35,10 +39,6 @@
3539
import org.testng.annotations.AfterClass;
3640
import org.testng.annotations.BeforeClass;
3741
import org.testng.annotations.Test;
38-
import java.nio.charset.StandardCharsets;
39-
import java.util.HashMap;
40-
import java.util.Map;
41-
import java.util.UUID;
4242

4343

4444
@Test

0 commit comments

Comments
 (0)