Skip to content

Commit

Permalink
[enh] Issue 15455: Pulsar Admin: create subscripion with Properties (…
Browse files Browse the repository at this point in the history
…PIP-105) (apache#15503)
  • Loading branch information
eolivelli authored May 20, 2022
1 parent a985936 commit e2fa189
Show file tree
Hide file tree
Showing 22 changed files with 266 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2127,7 +2127,7 @@ private void internalResetCursorForNonPartitionedTopic(AsyncResponse asyncRespon
}

protected void internalCreateSubscription(AsyncResponse asyncResponse, String subscriptionName,
MessageIdImpl messageId, boolean authoritative, boolean replicated) {
MessageIdImpl messageId, boolean authoritative, boolean replicated, Map<String, String> properties) {
CompletableFuture<Void> ret;
if (topicName.isGlobal()) {
ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
Expand All @@ -2136,12 +2136,12 @@ protected void internalCreateSubscription(AsyncResponse asyncResponse, String su
}
ret.thenAccept(__ -> {
final MessageIdImpl targetMessageId = messageId == null ? (MessageIdImpl) MessageId.latest : messageId;
log.info("[{}][{}] Creating subscription {} at message id {}", clientAppId(), topicName, subscriptionName,
targetMessageId);
log.info("[{}][{}] Creating subscription {} at message id {} with properties {}", clientAppId(),
topicName, subscriptionName, targetMessageId, properties);
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
internalCreateSubscriptionForNonPartitionedTopic(asyncResponse,
subscriptionName, targetMessageId, authoritative, replicated);
subscriptionName, targetMessageId, authoritative, replicated, properties);
} else {
boolean allowAutoTopicCreation = pulsar().getBrokerService().isAllowAutoTopicCreation(topicName);
getPartitionedTopicMetadataAsync(topicName,
Expand All @@ -2159,7 +2159,7 @@ protected void internalCreateSubscription(AsyncResponse asyncResponse, String su
try {
pulsar().getAdminClient().topics()
.createSubscriptionAsync(topicNamePartition.toString(),
subscriptionName, targetMessageId)
subscriptionName, targetMessageId, false, properties)
.handle((r, ex) -> {
if (ex != null) {
// fail the operation on unknown exception or
Expand Down Expand Up @@ -2213,7 +2213,7 @@ protected void internalCreateSubscription(AsyncResponse asyncResponse, String su
});
} else {
internalCreateSubscriptionForNonPartitionedTopic(asyncResponse,
subscriptionName, targetMessageId, authoritative, replicated);
subscriptionName, targetMessageId, authoritative, replicated, properties);
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
Expand All @@ -2238,7 +2238,8 @@ protected void internalCreateSubscription(AsyncResponse asyncResponse, String su

private void internalCreateSubscriptionForNonPartitionedTopic(
AsyncResponse asyncResponse, String subscriptionName,
MessageIdImpl targetMessageId, boolean authoritative, boolean replicated) {
MessageIdImpl targetMessageId, boolean authoritative, boolean replicated,
Map<String, String> properties) {

boolean isAllowAutoTopicCreation = pulsar().getBrokerService().isAllowAutoTopicCreation(topicName);

Expand All @@ -2258,7 +2259,7 @@ private void internalCreateSubscriptionForNonPartitionedTopic(
throw new RestException(Status.CONFLICT, "Subscription already exists for topic");
}

return topic.createSubscription(subscriptionName, InitialPosition.Latest, replicated);
return topic.createSubscription(subscriptionName, InitialPosition.Latest, replicated, properties);
}).thenCompose(subscription -> {
// Mark the cursor as "inactive" as it was created without a real consumer connected
((PersistentSubscription) subscription).deactivateCursor();
Expand Down Expand Up @@ -4022,7 +4023,7 @@ private Subscription getSubscriptionReference(String subName, PersistentTopic to
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
sub = topic.createSubscription(subName,
InitialPosition.Earliest, false).get();
InitialPosition.Earliest, false, null).get();
}

return checkNotNull(sub);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,8 @@ public void createSubscription(@Suspended final AsyncResponse asyncResponse, @Pa
throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic "
+ "can only be done through client");
}
internalCreateSubscription(asyncResponse, decode(encodedSubName), messageId, authoritative, replicated);
internalCreateSubscription(asyncResponse, decode(encodedSubName), messageId, authoritative, replicated,
null);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1473,10 +1473,13 @@ public void createSubscription(
throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic "
+ "can only be done through client");
}
Map<String, String> subscriptionProperties = resetCursorData == null ? null :
resetCursorData.getProperties();
MessageIdImpl messageId = resetCursorData == null ? null :
new MessageIdImpl(resetCursorData.getLedgerId(), resetCursorData.getEntryId(),
resetCursorData.getPartitionIndex());
internalCreateSubscription(asyncResponse, decode(encodedSubName), messageId, authoritative, replicated);
internalCreateSubscription(asyncResponse, decode(encodedSubName), messageId, authoritative,
replicated, subscriptionProperties);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {
}
createInitSubFuture =
topic.createSubscription(initialSubscriptionName, InitialPosition.Earliest,
false);
false, null);
} else {
createInitSubFuture = CompletableFuture.completedFuture(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ default long getNumberOfEntriesDelayed() {

void addUnAckedMessages(int unAckMessages);

Map<String, String> getSubscriptionProperties();

default void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
// Default is no-op
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ CompletableFuture<Consumer> subscribe(TransportCnx cnx, String subscriptionName,
CompletableFuture<Consumer> subscribe(SubscriptionOption option);

CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
boolean replicateSubscriptionState);
boolean replicateSubscriptionState, Map<String, String> properties);

CompletableFuture<Void> unsubscribe(String subName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,24 @@ public class NonPersistentSubscription implements Subscription {

private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
private final LongAdder msgOutFromRemovedConsumer = new LongAdder();
private final Map<String, String> subscriptionProperties;

// If isDurable is false(such as a Reader), remove subscription from topic when closing this subscription.
private final boolean isDurable;

private KeySharedMode keySharedMode = null;

public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName, boolean isDurable) {
public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName, boolean isDurable,
Map<String, String> properties) {
this.topic = topic;
this.topicName = topic.getName();
this.subName = subscriptionName;
this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString();
IS_FENCED_UPDATER.set(this, FALSE);
this.lastActive = System.currentTimeMillis();
this.isDurable = isDurable;
this.subscriptionProperties = properties != null
? Collections.unmodifiableMap(properties) : Collections.emptyMap();
}

@Override
Expand Down Expand Up @@ -518,4 +522,8 @@ public long getLastActive() {
public void updateLastActive() {
this.lastActive = System.currentTimeMillis();
}

public Map<String, String> getSubscriptionProperties() {
return subscriptionProperties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ public CompletableFuture<Consumer> subscribe(SubscriptionOption option) {
option.isDurable(), option.getStartMessageId(), option.getMetadata(),
option.isReadCompacted(),
option.getStartMessageRollbackDurationSec(), option.isReplicatedSubscriptionStateArg(),
option.getKeySharedMeta());
option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(null));
}

@Override
Expand All @@ -262,7 +262,7 @@ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subs
KeySharedMeta keySharedMeta) {
return internalSubscribe(cnx, subscriptionName, consumerId, subType, priorityLevel, consumerName,
isDurable, startMessageId, metadata, readCompacted, resetStartMessageBackInSec,
replicateSubscriptionState, keySharedMeta);
replicateSubscriptionState, keySharedMeta, null);
}

private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, String subscriptionName,
Expand All @@ -272,7 +272,8 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
boolean readCompacted,
long resetStartMessageBackInSec,
boolean replicateSubscriptionState,
KeySharedMeta keySharedMeta) {
KeySharedMeta keySharedMeta,
Map<String, String> subscriptionProperties) {

return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> {
final CompletableFuture<Consumer> future = new CompletableFuture<>();
Expand Down Expand Up @@ -312,7 +313,7 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
}

NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
name -> new NonPersistentSubscription(this, subscriptionName, isDurable));
name -> new NonPersistentSubscription(this, subscriptionName, isDurable, subscriptionProperties));

Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName,
false, cnx, cnx.getAuthRole(), metadata, readCompacted, keySharedMeta,
Expand Down Expand Up @@ -364,8 +365,9 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St

@Override
public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
boolean replicateSubscriptionState) {
return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName, true));
boolean replicateSubscriptionState, Map<String, String> properties) {
return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName, true,
properties));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1089,6 +1089,7 @@ void topicTerminated() {
}
}

@Override
public Map<String, String> getSubscriptionProperties() {
return subscriptionProperties;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1014,9 +1014,10 @@ private void resetSubscriptionCursor(Subscription subscription, CompletableFutur

@Override
public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
boolean replicateSubscriptionState) {
boolean replicateSubscriptionState,
Map<String, String> subscriptionProperties) {
return getDurableSubscription(subscriptionName, initialPosition,
0 /*avoid reseting cursor*/, replicateSubscriptionState, null);
0 /*avoid reseting cursor*/, replicateSubscriptionState, subscriptionProperties);
}

/**
Expand Down Expand Up @@ -1489,7 +1490,7 @@ public CompletableFuture<Void> preCreateSubscriptionForCompactionIfNeeded() {
return isCompactionEnabled()
// If a topic has a compaction policy setup, we must make sure that the compaction cursor
// is pre-created, in order to ensure all the data will be seen by the compactor.
? createSubscription(COMPACTION_SUBSCRIPTION, CommandSubscribe.InitialPosition.Earliest, false)
? createSubscription(COMPACTION_SUBSCRIPTION, CommandSubscribe.InitialPosition.Earliest, false, null)
.thenCompose(__ -> CompletableFuture.completedFuture(null))
: CompletableFuture.completedFuture(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) {
log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subcription",
topic, update.getSubscriptionName(), updatedMessageId.getLedgerId(), pos);
topic.createSubscription(update.getSubscriptionName(),
InitialPosition.Latest, true /* replicateSubscriptionState */);
InitialPosition.Latest, true /* replicateSubscriptionState */, null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,22 @@
package org.apache.pulsar.broker.admin;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
Expand Down Expand Up @@ -106,4 +113,66 @@ public void tesSkipMessageWithNonExistTopicAndNotExistSub() {
assertEquals(exception.getStatusCode(), Response.Status.NOT_FOUND.getStatusCode());
assertEquals(exception.getMessage(), String.format("Topic %s not found", topic));
}

@DataProvider(name = "partitioned")
public static Object[][] partitioned() {
return new Object[][] {
{true},
{false}
};
}

@Test(dataProvider = "partitioned")
public void testCreateSubscriptionWithProperties(boolean partitioned) throws Exception {
String uuid = UUID.randomUUID().toString();
String topic = uuid + "-" + partitioned;

if (partitioned) {
admin.topics().createPartitionedTopic(topic, 4);
} else {
admin.topics().createNonPartitionedTopic(topic);
}

String subscriptionName = "sub";
Map<String, String> properties = new HashMap<>();
// test characters that often have problems in query strings
String value = "bar{}€/&:#[] ?'\"";
properties.put("foo", value);
admin.topics().createSubscription(topic, subscriptionName,
MessageId.latest, false, properties);

// null properties (old clients)
String subscriptionName2 = "sub2";
admin.topics().createSubscription(topic, subscriptionName2,
MessageId.latest, false, null);

if (partitioned) {
PartitionedTopicMetadata partitionedTopicMetadata = admin.topics().getPartitionedTopicMetadata(topic);
for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
.getSubscriptions().get(subscriptionName);
assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
}

// properties are never null, but an empty map
for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
.getSubscriptions().get(subscriptionName2);
assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());
}

// aggregated properties
SubscriptionStats subscriptionStats = admin.topics().getPartitionedStats(topic, false)
.getSubscriptions().get(subscriptionName);
assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));

} else {
SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));

SubscriptionStats subscriptionStats2 = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName2);
assertTrue(subscriptionStats2.getSubscriptionProperties().isEmpty());
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{
persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
PersistentSubscription persistentSubscription = (PersistentSubscription) persistentTopic
.createSubscription("test",
CommandSubscribe.InitialPosition.Earliest, false).get();
CommandSubscribe.InitialPosition.Earliest, false, null).get();

ManagedCursorImpl managedCursor = mock(ManagedCursorImpl.class);
doReturn(true).when(managedCursor).hasMoreEntries();
Expand Down
Loading

0 comments on commit e2fa189

Please sign in to comment.