From 6edc8f4ef954477eb36eed92e88464f56b4c48ee Mon Sep 17 00:00:00 2001 From: cckellogg Date: Sat, 25 Apr 2020 06:39:51 -0700 Subject: [PATCH] Add flag to disable forced topic creation. (#226) This flag is needed for the regex consumer. --- pulsar/consumer_impl.go | 32 +++++++++++++++++--------------- pulsar/consumer_partition.go | 7 +++++++ pulsar/consumer_regex.go | 2 +- 3 files changed, 25 insertions(+), 16 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index ba7d24d00c..3a27defcfd 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -42,11 +42,12 @@ type acker interface { type consumer struct { sync.Mutex - topic string - client *client - options ConsumerOptions - consumers []*partitionConsumer - consumerName string + topic string + client *client + options ConsumerOptions + consumers []*partitionConsumer + consumerName string + disableForceTopicCreation bool // channel used to deliver message to clients messageCh chan ConsumerMessage @@ -123,17 +124,18 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { } func newInternalConsumer(client *client, options ConsumerOptions, topic string, - messageCh chan ConsumerMessage, dlq *dlqRouter) (*consumer, error) { + messageCh chan ConsumerMessage, dlq *dlqRouter, disableForceTopicCreation bool) (*consumer, error) { consumer := &consumer{ - topic: topic, - client: client, - options: options, - messageCh: messageCh, - closeCh: make(chan struct{}), - errorCh: make(chan error), - dlq: dlq, - log: log.WithField("topic", topic), + topic: topic, + client: client, + options: options, + disableForceTopicCreation: disableForceTopicCreation, + messageCh: messageCh, + closeCh: make(chan struct{}), + errorCh: make(chan error), + dlq: dlq, + log: log.WithField("topic", topic), } if options.Name != "" { @@ -275,7 +277,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { func topicSubscribe(client *client, options ConsumerOptions, topic string, messageCh chan ConsumerMessage, dlqRouter *dlqRouter) (Consumer, error) { - return newInternalConsumer(client, options, topic, messageCh, dlqRouter) + return newInternalConsumer(client, options, topic, messageCh, dlqRouter, false) } func (c *consumer) Subscription() string { diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 33fc605124..498f2ae006 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -76,6 +76,7 @@ type partitionConsumerOpts struct { startMessageIDInclusive bool subscriptionMode subscriptionMode readCompacted bool + disableForceTopicCreation bool } type partitionConsumer struct { @@ -748,6 +749,12 @@ func (pc *partitionConsumer) grabConn() error { cmdSubscribe.Metadata = toKeyValues(pc.options.metadata) } + // force topic creation is enabled by default so + // we only need to set the flag when disabling it + if pc.options.disableForceTopicCreation { + cmdSubscribe.ForceTopicCreation = proto.Bool(false) + } + res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID, pb.BaseCommand_SUBSCRIBE, cmdSubscribe) diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index 00cbb88c3d..3d0aebe434 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -361,7 +361,7 @@ func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan Consum for _, t := range topics { go func(topic string) { defer wg.Done() - c, err := newInternalConsumer(c, opts, topic, ch, dlq) + c, err := newInternalConsumer(c, opts, topic, ch, dlq, true) consumerErrorCh <- consumerError{ err: err, topic: topic,