Skip to content

Commit

Permalink
PIP-180 Part III : Add shadow topic in TopicPolicy (apache#17242)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jason918 authored Aug 31, 2022
1 parent 4facef2 commit 4434bf2
Show file tree
Hide file tree
Showing 9 changed files with 314 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
Expand Down Expand Up @@ -5382,4 +5383,65 @@ protected CompletableFuture<Void> internalRemoveEntryFilters(boolean isGlobal) {
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
}));
}

protected CompletableFuture<Void> validateShadowTopics(List<String> shadowTopics) {
List<CompletableFuture<Void>> futures = new ArrayList<>(shadowTopics.size());
for (String shadowTopic : shadowTopics) {
try {
TopicName shadowTopicName = TopicName.get(shadowTopic);
if (!shadowTopicName.isPersistent()) {
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Only persistent topic can be set as shadow topic"));
}
futures.add(pulsar().getNamespaceService().checkTopicExists(shadowTopicName)
.thenAccept(isExists -> {
if (!isExists) {
throw new RestException(Status.PRECONDITION_FAILED,
"Shadow topic [" + shadowTopic + "] not exists.");
}
}));
} catch (IllegalArgumentException e) {
return FutureUtil.failedFuture(new RestException(Status.FORBIDDEN,
"Invalid shadow topic name: " + shadowTopic));
}
}
return FutureUtil.waitForAll(futures);
}

protected CompletableFuture<Void> internalSetShadowTopic(List<String> shadowTopics) {
if (!topicName.isPersistent()) {
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Only persistent source topic is supported with shadow topics."));
}
if (CollectionUtils.isEmpty(shadowTopics)) {
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Cannot specify empty shadow topics, please use remove command instead."));
}
return validateTopicPolicyOperationAsync(topicName, PolicyName.SHADOW_TOPIC, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> validateShadowTopics(shadowTopics))
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setShadowTopics(shadowTopics);
return pulsar().getTopicPoliciesService().
updateTopicPoliciesAsync(topicName, topicPolicies);
});
}

protected CompletableFuture<Void> internalDeleteShadowTopics() {
return validateTopicPolicyOperationAsync(topicName, PolicyName.SHADOW_TOPIC, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(shadowTopicName -> getTopicPoliciesAsyncWithRetry(topicName))
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
List<String> shadowTopics = topicPolicies.getShadowTopics();
if (CollectionUtils.isEmpty(shadowTopics)) {
return CompletableFuture.completedFuture(null);
}
topicPolicies.setShadowTopics(null);
return pulsar().getTopicPoliciesService().
updateTopicPoliciesAsync(topicName, topicPolicies);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4363,5 +4363,84 @@ public void removeEntryFilters(@Suspended final AsyncResponse asyncResponse,
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/shadowTopics")
@ApiOperation(value = "Get the shadow topic list for a topic")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message =
"Topic level policy is disabled, enable the topic level policy and retry")})
public void getShadowTopics(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> validateTopicPolicyOperationAsync(topicName, PolicyName.SHADOW_TOPIC,
PolicyOperation.READ))
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
.thenAccept(op -> asyncResponse.resume(op.map(TopicPolicies::getShadowTopics).orElse(null)))
.exceptionally(ex -> {
handleTopicPolicyException("getShadowTopics", ex, asyncResponse);
return null;
});
}

@PUT
@Path("/{tenant}/{namespace}/{topic}/shadowTopics")
@ApiOperation(value = "Set shadow topic list for a topic")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message =
"Topic level policy is disabled, enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification"),
})
public void setShadowTopics(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "List of shadow topics", required = true) List<String> shadowTopics) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalSetShadowTopic(shadowTopics))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("setShadowTopic", ex, asyncResponse);
return null;
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/shadowTopics")
@ApiOperation(value = "Delete shadow topics for a topic")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message =
"Topic level policy is disabled, enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification"),
})
public void deleteShadowTopics(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalDeleteShadowTopics())
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("deleteShadowTopic", ex, asyncResponse);
return null;
});
}

private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import org.testng.collections.Lists;

@Slf4j
@Test(groups = "broker-admin")
Expand Down Expand Up @@ -3059,4 +3060,38 @@ public void testMaxMessageSizeWithChunking() throws Exception {
producer.send(new byte[2000]);
}

@Test(timeOut = 30000)
public void testShadowTopics() throws Exception {
final String sourceTopic = "persistent://" + myNamespace + "/source-test-" + UUID.randomUUID();
final String shadowTopic1 = "persistent://" + myNamespace + "/shadow-test1-" + UUID.randomUUID();
final String shadowTopic2 = "persistent://" + myNamespace + "/shadow-test2-" + UUID.randomUUID();

pulsarClient.newProducer().topic(sourceTopic).create().close();

Awaitility.await().untilAsserted(() ->
Assert.assertNull(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(sourceTopic))));

//shadow topic must exist
Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, ()->
admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic1)));

//shadow topic must be persistent topic
Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, ()->
admin.topics().setShadowTopics(sourceTopic,
Lists.newArrayList("non-persistent://" + myNamespace + "/shadow-test1-" + UUID.randomUUID())));

pulsarClient.newProducer().topic(shadowTopic1).create().close();
pulsarClient.newProducer().topic(shadowTopic2).create().close();

admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic1));
Awaitility.await().untilAsserted(() -> Assert.assertEquals(admin.topics().getShadowTopics(sourceTopic),
Lists.newArrayList(shadowTopic1)));
admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic1, shadowTopic2));
Awaitility.await().untilAsserted(() -> Assert.assertEquals(admin.topics().getShadowTopics(sourceTopic),
Lists.newArrayList(shadowTopic1, shadowTopic2)));

admin.topics().removeShadowTopics(sourceTopic);
Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getShadowTopics(sourceTopic)));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4335,4 +4335,48 @@ CompletableFuture<Message<byte[]>> examineMessageAsync(String topic, String init
* @param topic topic name
*/
CompletableFuture<Void> setSchemaValidationEnforcedAsync(String topic, boolean enable);

/**
* Set shadow topic list for a source topic.
*
* @param sourceTopic source topic name
* @param shadowTopics list of shadow topic name
*/
void setShadowTopics(String sourceTopic, List<String> shadowTopics) throws PulsarAdminException;

/**
* Remove all shadow topics for a source topic.
*
* @param sourceTopic source topic name
*/
void removeShadowTopics(String sourceTopic) throws PulsarAdminException;

/**
* Get shadow topic list of the source topic.
*
* @param sourceTopic source topic name
* @return shadow topic list
*/
List<String> getShadowTopics(String sourceTopic) throws PulsarAdminException;

/**
* Set shadow topic list for a source topic asynchronously.
*
* @param sourceTopic source topic name
*/
CompletableFuture<Void> setShadowTopicsAsync(String sourceTopic, List<String> shadowTopics);

/**
* Remove all shadow topics for a source topic asynchronously.
*
* @param sourceTopic source topic name
*/
CompletableFuture<Void> removeShadowTopicsAsync(String sourceTopic);

/**
* Get shadow topic list of the source topic asynchronously.
*
* @param sourceTopic source topic name
*/
CompletableFuture<List<String>> getShadowTopicsAsync(String sourceTopic);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2714,5 +2714,41 @@ public CompletableFuture<Void> removeReplicationClustersAsync(String topic) {
return asyncDeleteRequest(path);
}

@Override
public void setShadowTopics(String sourceTopic, List<String> shadowTopics) throws PulsarAdminException {
sync(() -> setShadowTopicsAsync(sourceTopic, shadowTopics));
}

@Override
public void removeShadowTopics(String sourceTopic) throws PulsarAdminException {
sync(() -> removeShadowTopicsAsync(sourceTopic));
}

@Override
public List<String> getShadowTopics(String sourceTopic) throws PulsarAdminException {
return sync(() -> getShadowTopicsAsync(sourceTopic));
}

@Override
public CompletableFuture<Void> setShadowTopicsAsync(String sourceTopic, List<String> shadowTopics) {
TopicName tn = validateTopic(sourceTopic);
WebTarget path = topicPath(tn, "shadowTopics");
return asyncPutRequest(path, Entity.entity(shadowTopics, MediaType.APPLICATION_JSON));
}

@Override
public CompletableFuture<Void> removeShadowTopicsAsync(String sourceTopic) {
TopicName tn = validateTopic(sourceTopic);
WebTarget path = topicPath(tn, "shadowTopics");
return asyncDeleteRequest(path);
}

@Override
public CompletableFuture<List<String>> getShadowTopicsAsync(String sourceTopic) {
TopicName tn = validateTopic(sourceTopic);
WebTarget path = topicPath(tn, "shadowTopics");
return asyncGetRequest(path, new FutureCallback<List<String>>(){});
}

private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1850,6 +1850,16 @@ public boolean matches(Long timestamp) {

cmdTopics.run(split("remove-replication-clusters persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeReplicationClusters("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-shadow-topics persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getShadowTopics("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("set-shadow-topics persistent://myprop/clust/ns1/ds1 -t test"));
verify(mockTopics).setShadowTopics("persistent://myprop/clust/ns1/ds1", Lists.newArrayList("test"));

cmdTopics.run(split("remove-shadow-topics persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeShadowTopics("persistent://myprop/clust/ns1/ds1");

}

private static LedgerInfo newLedger(long id, long entries, long size) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ public CmdTopics(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("set-replication-clusters", new SetReplicationClusters());
jcommander.addCommand("remove-replication-clusters", new RemoveReplicationClusters());

jcommander.addCommand("get-shadow-topics", new GetShadowTopics());
jcommander.addCommand("set-shadow-topics", new SetShadowTopics());
jcommander.addCommand("remove-shadow-topics", new RemoveShadowTopics());

jcommander.addCommand("get-schema-validation-enforce", new GetSchemaValidationEnforced());
jcommander.addCommand("set-schema-validation-enforce", new SetSchemaValidationEnforced());

Expand Down Expand Up @@ -1653,6 +1657,47 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Get the shadow topics for a topic")
private class GetShadowTopics extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopics().getShadowTopics(persistentTopic));
}
}

@Parameters(commandDescription = "Set the shadow topics for a topic")
private class SetShadowTopics extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "--topics",
"-t" }, description = "Shadow topic list (comma separated values)", required = true)
private String shadowTopics;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
List<String> topics = Lists.newArrayList(shadowTopics.split(","));
getTopics().setShadowTopics(persistentTopic, topics);
}
}

@Parameters(commandDescription = "Remove the shadow topics for a topic")
private class RemoveShadowTopics extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopics().removeShadowTopics(persistentTopic);
}
}

@Parameters(commandDescription = "Get the delayed delivery policy for a topic")
private class GetDelayedDelivery extends CliCommand {
@Parameter(description = "tenant/namespace/topic", required = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,6 @@ public enum PolicyName {
TTL,
MAX_TOPICS,
RESOURCEGROUP,
ENTRY_FILTERS
ENTRY_FILTERS,
SHADOW_TOPIC
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class TopicPolicies {
@Builder.Default
private List<SubType> subscriptionTypesEnabled = new ArrayList<>();
private List<String> replicationClusters;
private List<String> shadowTopics;
private Boolean isGlobal = false;
private PersistencePolicies persistence;
private RetentionPolicies retentionPolicies;
Expand Down

0 comments on commit 4434bf2

Please sign in to comment.