Skip to content

Commit

Permalink
Add subscription properties for ConsumerOptions (apache#671)
Browse files Browse the repository at this point in the history
Signed-off-by: xiaolongran <xiaolongran@tencent.com>

### Motivation

In apache/pulsar#12869, we introduce pluggable entry filter in Dispatcher, the pull request is the Go SDK implementation of this PIP


### Modifications

*Describe the modifications you've done.*

- Add subscription properties for ConsumerOptions 
- Update `pulsarApi.proto` file
  • Loading branch information
wolfstudy authored Nov 29, 2021
1 parent 7773d27 commit 6385727
Show file tree
Hide file tree
Showing 5 changed files with 1,274 additions and 461 deletions.
6 changes: 6 additions & 0 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ type ConsumerOptions struct {
// Those properties will be visible in the topic stats
Properties map[string]string

// SubscriptionProperties specify the subscription properties for this subscription.
//
// > Notice: SubscriptionProperties are immutable, and consumers under the same subscription will fail to create a
// > subscription if they use different properties.
SubscriptionProperties map[string]string

// Type specifies the subscription type to be used when subscribing to a topic.
// Default is `Exclusive`
Type SubscriptionType
Expand Down
2 changes: 2 additions & 0 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {

receiverQueueSize := c.options.ReceiverQueueSize
metadata := c.options.Properties
subProperties := c.options.SubscriptionProperties

startPartition := oldNumPartitions
partitionsToAdd := newNumPartitions - oldNumPartitions
Expand Down Expand Up @@ -333,6 +334,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
nackRedeliveryDelay: nackRedeliveryDelay,
nackBackoffPolicy: c.options.NackBackoffPolicy,
metadata: metadata,
subProperties: subProperties,
replicateSubscriptionState: c.options.ReplicateSubscriptionState,
startMessageID: trackingMessageID{},
subscriptionMode: durable,
Expand Down
6 changes: 6 additions & 0 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type partitionConsumerOpts struct {
nackRedeliveryDelay time.Duration
nackBackoffPolicy NackBackoffPolicy
metadata map[string]string
subProperties map[string]string
replicateSubscriptionState bool
startMessageID trackingMessageID
startMessageIDInclusive bool
Expand Down Expand Up @@ -1058,6 +1059,7 @@ func (pc *partitionConsumer) grabConn() error {
PriorityLevel: nil,
Durable: proto.Bool(pc.options.subscriptionMode == durable),
Metadata: internal.ConvertFromStringMap(pc.options.metadata),
SubscriptionProperties: internal.ConvertFromStringMap(pc.options.subProperties),
ReadCompacted: proto.Bool(pc.options.readCompacted),
Schema: pbSchema,
InitialPosition: initialPosition.Enum(),
Expand All @@ -1075,6 +1077,10 @@ func (pc *partitionConsumer) grabConn() error {
cmdSubscribe.Metadata = toKeyValues(pc.options.metadata)
}

if len(pc.options.subProperties) > 0 {
cmdSubscribe.SubscriptionProperties = toKeyValues(pc.options.subProperties)
}

// force topic creation is enabled by default so
// we only need to set the flag when disabling it
if pc.options.disableForceTopicCreation {
Expand Down
Loading

0 comments on commit 6385727

Please sign in to comment.