Skip to content

Commit

Permalink
[feat] Add startPaused setting to consumer (apache#416)
Browse files Browse the repository at this point in the history
### Motivation

The Java client consumer has a setting called `startPaused`. If this is set to true, the created consumer will not fetch messages from the broker until resume is called. Currently, this setting does not seem to exist in the C++ client consumer, so I will add it.
  • Loading branch information
massakam authored Mar 14, 2024
1 parent ee1d7b9 commit 234a55d
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 3 deletions.
15 changes: 15 additions & 0 deletions include/pulsar/ConsumerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,21 @@ class PULSAR_PUBLIC ConsumerConfiguration {
*/
bool isAckReceiptEnabled() const;

/**
* Starts the consumer in a paused state.
*
* When enabled, the consumer does not immediately fetch messages when the consumer is created.
* Instead, the consumer waits to fetch messages until Consumer::resumeMessageListener is called.
*
* Default: false
*/
ConsumerConfiguration& setStartPaused(bool startPaused);

/**
* The associated getter of setStartPaused.
*/
bool isStartPaused() const;

friend class PulsarWrapper;
friend class PulsarFriend;

Expand Down
6 changes: 6 additions & 0 deletions include/pulsar/c/consumer_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,12 @@ PULSAR_PUBLIC pulsar_consumer_regex_subscription_mode
pulsar_consumer_configuration_get_regex_subscription_mode(
pulsar_consumer_configuration_t *consumer_configuration);

PULSAR_PUBLIC void pulsar_consumer_configuration_set_start_paused(
pulsar_consumer_configuration_t *consumer_configuration, int start_paused);

PULSAR_PUBLIC int pulsar_consumer_configuration_is_start_paused(
pulsar_consumer_configuration_t *consumer_configuration);

/**
* Set batch receive policy.
*
Expand Down
7 changes: 7 additions & 0 deletions lib/ConsumerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,13 @@ ConsumerConfiguration& ConsumerConfiguration::setAckReceiptEnabled(bool ackRecei

bool ConsumerConfiguration::isAckReceiptEnabled() const { return impl_->ackReceiptEnabled; }

ConsumerConfiguration& ConsumerConfiguration::setStartPaused(bool startPaused) {
impl_->startPaused = startPaused;
return *this;
}

bool ConsumerConfiguration::isStartPaused() const { return impl_->startPaused; }

ConsumerConfiguration& ConsumerConfiguration::setRegexSubscriptionMode(
RegexSubscriptionMode regexSubscriptionMode) {
impl_->regexSubscriptionMode = regexSubscriptionMode;
Expand Down
1 change: 1 addition & 0 deletions lib/ConsumerConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ struct ConsumerConfigurationImpl {
bool batchIndexAckEnabled{false};
std::vector<ConsumerInterceptorPtr> interceptors;
bool ackReceiptEnabled{false};
bool startPaused{false};
};
} // namespace pulsar
#endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
2 changes: 1 addition & 1 deletion lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
receiverQueueRefillThreshold_(config_.getReceiverQueueSize() / 2),
consumerId_(client->newConsumerId()),
consumerStr_("[" + topic + ", " + subscriptionName + ", " + std::to_string(consumerId_) + "] "),
messageListenerRunning_(true),
messageListenerRunning_(!conf.isStartPaused()),
negativeAcksTracker_(std::make_shared<NegativeAcksTracker>(client, *this, conf)),
readCompacted_(conf.isReadCompacted()),
startMessageId_(startMessageId),
Expand Down
9 changes: 9 additions & 0 deletions lib/c/c_ConsumerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,15 @@ pulsar_consumer_regex_subscription_mode pulsar_consumer_configuration_get_regex_
consumer_configuration->consumerConfiguration.getRegexSubscriptionMode();
}

void pulsar_consumer_configuration_set_start_paused(pulsar_consumer_configuration_t *consumer_configuration,
int start_paused) {
consumer_configuration->consumerConfiguration.setStartPaused(start_paused);
}

int pulsar_consumer_configuration_is_start_paused(pulsar_consumer_configuration_t *consumer_configuration) {
return consumer_configuration->consumerConfiguration.isStartPaused();
}

int pulsar_consumer_configuration_set_batch_receive_policy(
pulsar_consumer_configuration_t *consumer_configuration,
const pulsar_consumer_batch_receive_policy_t *batch_receive_policy_t) {
Expand Down
56 changes: 54 additions & 2 deletions tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -917,8 +917,7 @@ TEST(BasicEndToEndTest, testMessageListenerPause) {
std::string topicName = "partition-testMessageListenerPause";

// call admin api to make it partitioned
std::string url =
adminUrl + "admin/v2/persistent/public/default/partition-testMessageListener-pauses/partitions";
std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions";
int res = makePutRequest(url, "5");

LOG_INFO("res = " << res);
Expand Down Expand Up @@ -968,6 +967,59 @@ TEST(BasicEndToEndTest, testMessageListenerPause) {
client.close();
}

void testStartPaused(bool isPartitioned) {
Client client(lookupUrl);
std::string topicName =
isPartitioned ? "testStartPausedWithPartitionedTopic" : "testStartPausedWithNonPartitionedTopic";
std::string subName = "sub";

if (isPartitioned) {
// Call admin api to make it partitioned
std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions";
int res = makePutRequest(url, "5");
LOG_INFO("res = " << res);
ASSERT_FALSE(res != 204 && res != 409);
}

Producer producer;
Result result = client.createProducer(topicName, producer);

// Initializing global Count
globalCount = 0;

ConsumerConfiguration consumerConfig;
consumerConfig.setMessageListener(
std::bind(messageListenerFunction, std::placeholders::_1, std::placeholders::_2));
consumerConfig.setStartPaused(true);
Consumer consumer;
// Removing dangling subscription from previous test failures
result = client.subscribe(topicName, subName, consumerConfig, consumer);
consumer.unsubscribe();

result = client.subscribe(topicName, subName, consumerConfig, consumer);
ASSERT_EQ(ResultOk, result);

int numOfMessages = 50;
for (int i = 0; i < numOfMessages; i++) {
std::string messageContent = "msg-" + std::to_string(i);
Message msg = MessageBuilder().setContent(messageContent).build();
ASSERT_EQ(ResultOk, producer.send(msg));
}

std::this_thread::sleep_for(std::chrono::microseconds(2 * 1000 * 1000));
ASSERT_EQ(globalCount, 0);
consumer.resumeMessageListener();
ASSERT_TRUE(waitUntil(std::chrono::seconds(5), [&]() -> bool { return globalCount >= numOfMessages; }));

consumer.unsubscribe();
producer.close();
client.close();
}

TEST(BasicEndToEndTest, testStartPausedWithNonPartitionedTopic) { testStartPaused(false); }

TEST(BasicEndToEndTest, testStartPausedWithPartitionedTopic) { testStartPaused(true); }

TEST(BasicEndToEndTest, testResendViaSendCallback) {
ClientConfiguration clientConfiguration;
clientConfiguration.setIOThreads(1);
Expand Down

0 comments on commit 234a55d

Please sign in to comment.