diff --git a/queue.go b/queue.go new file mode 100644 index 000000000000..eb36ca9e9179 --- /dev/null +++ b/queue.go @@ -0,0 +1,102 @@ +package servicebus + +import ( + "context" + mgmt "github.com/Azure/azure-sdk-for-go/services/servicebus/mgmt/2017-04-01/servicebus" + "github.com/Azure/go-autorest/autorest" + log "github.com/sirupsen/logrus" +) + +// QueueOption represents named options for assisting queue creation +type QueueOption func(queue *mgmt.SBQueue) error + +/* +QueueWithPartitioning ensure the created queue will be a partitioned queue. Partitioned queues offer increased +storage and availability compared to non-partitioned queues with the trade-off of requiring the following to ensure +FIFO message retreival: + +SessionId. If a message has the SessionId property set, then Service Bus uses the SessionId property as the +partition key. This way, all messages that belong to the same session are assigned to the same fragment and handled +by the same message broker. This allows Service Bus to guarantee message ordering as well as the consistency of +session states. + +PartitionKey. If a message has the PartitionKey property set but not the SessionId property, then Service Bus uses +the PartitionKey property as the partition key. Use the PartitionKey property to send non-sessionful transactional +messages. The partition key ensures that all messages that are sent within a transaction are handled by the same +messaging broker. + +MessageId. If the queue or topic has the RequiresDuplicationDetection property set to true, then the MessageId +property serves as the partition key if the SessionId or a PartitionKey properties are not set. This ensures that +all copies of the same message are handled by the same message broker and, thus, allows Service Bus to detect and +eliminate duplicate messages +*/ +func QueueWithPartitioning() QueueOption { + return func(queue *mgmt.SBQueue) error { + queue.SBQueueProperties.EnablePartitioning = ptrBool(true) + return nil + } +} + +// QueueWithDuplicateDetection will ensure the queue has the ability to detected duplicate messages based on +// the message's MessageID +func QueueWithDuplicateDetection() QueueOption { + return func(queue *mgmt.SBQueue) error { + queue.SBQueueProperties.RequiresDuplicateDetection = ptrBool(true) + return nil + } +} + +// QueueWithRequiredSessions will ensure the queue requires senders and receivers to have sessionIDs +func QueueWithRequiredSessions() QueueOption { + return func(queue *mgmt.SBQueue) error { + queue.SBQueueProperties.RequiresSession = ptrBool(true) + return nil + } +} + +// QueueWithMessageExpiration will ensure the queue sends expired messages to the dead letter queue +func QueueWithMessageExpiration() QueueOption { + return func(queue *mgmt.SBQueue) error { + queue.DeadLetteringOnMessageExpiration = ptrBool(true) + return nil + } +} + +// EnsureQueue makes sure a queue exists in the given namespace. If the queue doesn't exist, it will create it with +// the specified name and properties. If properties are not specified, it will build a default partitioned queue. +func (sb *serviceBus) EnsureQueue(ctx context.Context, queueName string, opts ...QueueOption) (*mgmt.SBQueue, error) { + log.Debugf("ensuring exists queue %s", queueName) + queueClient := sb.getQueueMgmtClient() + queue, err := queueClient.Get(ctx, sb.resourceGroup, sb.namespace, queueName) + + // TODO: check if the queue properties are the same as the requested. If not, throw error or build new queue?? + if err != nil { + newQueue := &mgmt.SBQueue{ + Name: &queueName, + SBQueueProperties: &mgmt.SBQueueProperties{}, + } + + for _, opt := range opts { + opt(newQueue) + } + + queue, err = queueClient.CreateOrUpdate(ctx, sb.resourceGroup, sb.namespace, queueName, *newQueue) + if err != nil { + return nil, err + } + } + return &queue, nil +} + +// DeleteQueue deletes an existing queue +func (sb *serviceBus) DeleteQueue(ctx context.Context, queueName string) error { + queueClient := sb.getQueueMgmtClient() + _, err := queueClient.Delete(ctx, sb.resourceGroup, sb.namespace, queueName) + return err +} + +func (sb *serviceBus) getQueueMgmtClient() mgmt.QueuesClient { + client := mgmt.NewQueuesClientWithBaseURI(sb.environment.ResourceManagerEndpoint, sb.subscriptionID) + client.Authorizer = autorest.NewBearerAuthorizer(sb.token) + return client +} diff --git a/receiver.go b/receiver.go index f80b8c07042d..f6df7f3c3cb0 100644 --- a/receiver.go +++ b/receiver.go @@ -105,15 +105,18 @@ func (r *Receiver) listenForMessages(msgChan chan *amqp.Message) { close(msgChan) return default: - log.Debug("attempting to receive messages") - waitCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + //log.Debug("attempting to receive messages") + waitCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) msg, err := r.receiver.Receive(waitCtx) cancel() + + // TODO: handle receive errors better. It's not sufficient to check only for timeout if err, ok := err.(net.Error); ok && err.Timeout() { log.Debug("attempting to receive messages timed out") continue } else if err != nil { log.Fatalln(err) + time.Sleep(10 * time.Second) } if msg != nil { id := interface{}("null") diff --git a/sender.go b/sender.go index 8ef334adc7d9..bfe1b601b4ae 100644 --- a/sender.go +++ b/sender.go @@ -58,10 +58,14 @@ func (s *Sender) Close() error { return nil } -// Send will send a message using the session and link -func (s *Sender) Send(ctx context.Context, msg *amqp.Message) error { +// Send will send a message to the entity path with options +func (s *Sender) Send(ctx context.Context, msg *amqp.Message, opts ...SendOption) error { // TODO: Add in recovery logic in case the link / session has gone down s.prepareMessage(msg) + for _, opt := range opts { + opt(msg) + } + err := s.sender.Send(ctx, msg) if err != nil { return err @@ -95,3 +99,24 @@ func (s *Sender) newSessionAndLink() error { s.sender = amqpSender return nil } + +// SendOption provides a way to customize a message on sending +type SendOption func(message *amqp.Message) error + +// SendWithMessageID provides an option of adding a message ID for the sent message +func SendWithMessageID(msgID interface{}) SendOption { + return func(msg *amqp.Message) error { + msg.Properties.MessageID = msgID + return nil + } +} + +// SendWithoutSessionID will set the SessionID to nil. If sending to a partitioned Service Bus queue, this will cause +// the queue distributed the message in a round robin fashion to the next available partition with the effect of not +// enforcing FIFO ordering of messages, but enabling more efficient distribution of messages across partitions. +func SendWithoutSessionID() SendOption { + return func(msg *amqp.Message) error { + msg.Properties.GroupID = "" + return nil + } +} diff --git a/servicebus.go b/servicebus.go index 70f5abed8aa2..8ce65acf0b3b 100644 --- a/servicebus.go +++ b/servicebus.go @@ -19,14 +19,14 @@ var ( // SenderReceiver provides the ability to send and receive messages type SenderReceiver interface { - Send(ctx context.Context, entityPath string, msg *amqp.Message) error + Send(ctx context.Context, entityPath string, msg *amqp.Message, opts ...SendOption) error Receive(entityPath string, handler Handler) error Close() error } // EntityManager provides the ability to manage Service Bus entities (Queues, Topics, Subscriptions, etc.) type EntityManager interface { - EnsureQueue(ctx context.Context, queueName string, properties *mgmt.SBQueueProperties) (*mgmt.SBQueue, error) + EnsureQueue(ctx context.Context, queueName string, opts ...QueueOption) (*mgmt.SBQueue, error) DeleteQueue(ctx context.Context, queueName string) error } @@ -215,14 +215,14 @@ func (sb *serviceBus) Receive(entityPath string, handler Handler) error { return nil } -// Send sends a message to a provided entity path. -func (sb *serviceBus) Send(ctx context.Context, entityPath string, msg *amqp.Message) error { +// Send sends a message to a provided entity path with options +func (sb *serviceBus) Send(ctx context.Context, entityPath string, msg *amqp.Message, opts ...SendOption) error { sender, err := sb.fetchSender(entityPath) if err != nil { return err } - return sender.Send(ctx, msg) + return sender.Send(ctx, msg, opts...) } func (sb *serviceBus) fetchSender(entityPath string) (*Sender, error) { @@ -231,7 +231,6 @@ func (sb *serviceBus) fetchSender(entityPath string) (*Sender, error) { entry, ok := sb.senders[entityPath] if ok { - log.Debugf("found sender for entity path %s", entityPath) return entry, nil } @@ -243,45 +242,3 @@ func (sb *serviceBus) fetchSender(entityPath string) (*Sender, error) { sb.senders[entityPath] = sender return sender, nil } - -// EnsureQueue makes sure a queue exists in the given namespace. If the queue doesn't exist, it will create it with -// the specified name and properties. If properties are not specified, it will build a default partitioned queue. -func (sb *serviceBus) EnsureQueue(ctx context.Context, queueName string, properties *mgmt.SBQueueProperties) (*mgmt.SBQueue, error) { - log.Debugf("ensuring exists queue %s", queueName) - queueClient := sb.getQueueMgmtClient() - queue, err := queueClient.Get(ctx, sb.resourceGroup, sb.namespace, queueName) - // TODO: check if the queue properties are the same as the requested. If not, throw error or build new queue?? - - if properties == nil { - log.Debugf("no properties specified, so using default partitioned queue for %s", queueName) - properties = &mgmt.SBQueueProperties{ - EnablePartitioning: ptrBool(true), - } - } - - if err != nil { - log.Debugf("building a new queue %s", queueName) - newQueue := mgmt.SBQueue{ - Name: &queueName, - SBQueueProperties: properties, - } - queue, err = queueClient.CreateOrUpdate(ctx, sb.resourceGroup, sb.namespace, queueName, newQueue) - if err != nil { - return nil, err - } - } - return &queue, nil -} - -// DeleteQueue deletes an existing queue -func (sb *serviceBus) DeleteQueue(ctx context.Context, queueName string) error { - queueClient := sb.getQueueMgmtClient() - _, err := queueClient.Delete(ctx, sb.resourceGroup, sb.namespace, queueName) - return err -} - -func (sb *serviceBus) getQueueMgmtClient() mgmt.QueuesClient { - client := mgmt.NewQueuesClientWithBaseURI(sb.environment.ResourceManagerEndpoint, sb.subscriptionID) - client.Authorizer = autorest.NewBearerAuthorizer(sb.token) - return client -} diff --git a/servicebus_test.go b/servicebus_test.go index c96526143e5a..ddccbf90639d 100644 --- a/servicebus_test.go +++ b/servicebus_test.go @@ -7,6 +7,7 @@ import ( "github.com/Azure/go-autorest/autorest" "github.com/Azure/go-autorest/autorest/adal" "github.com/Azure/go-autorest/autorest/azure" + "github.com/satori/go.uuid" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" @@ -69,8 +70,9 @@ func (suite *ServiceBusSuite) TearDownSuite() { func (suite *ServiceBusSuite) TestQueue() { tests := map[string]func(*testing.T, SenderReceiver, string){ - "SimpleSend": testQueueSend, - "SendAndReceive": testQueueSendAndReceive, + "SimpleSend": testQueueSend, + "SendAndReceive": testQueueSendAndReceive, + "DuplicateDetection": testDuplicateDetection, } spToken := suite.servicePrincipalToken() @@ -86,7 +88,11 @@ func (suite *ServiceBusSuite) TestQueue() { for name, testFunc := range tests { queueName := randomName("gosbtest", 10) - _, err := sb.EnsureQueue(context.Background(), queueName, nil) + _, err := sb.EnsureQueue( + context.Background(), + queueName, + QueueWithPartitioning(), + QueueWithDuplicateDetection()) if err != nil { log.Fatalln(err) } @@ -109,8 +115,6 @@ func testQueueSendAndReceive(t *testing.T, sb SenderReceiver, queueName string) numMessages := rand.Intn(100) + 20 var wg sync.WaitGroup wg.Add(numMessages + 1) - log.Debugf("SendAndReceive: sending and receiving %d messages", numMessages) - messages := make([]string, numMessages) for i := 0; i < numMessages; i++ { messages[i] = randomName("hello", 10) @@ -139,6 +143,49 @@ func testQueueSendAndReceive(t *testing.T, sb SenderReceiver, queueName string) wg.Wait() } +func testDuplicateDetection(t *testing.T, sb SenderReceiver, queueName string) { + dupID := uuid.NewV4().String() + messages := []struct { + ID string + Data string + }{ + { + ID: dupID, + Data: "hello 1!", + }, + { + ID: dupID, + Data: "hello duplicate!", + }, + { + ID: uuid.NewV4().String(), + Data: "hello 2!", + }, + } + + var wg sync.WaitGroup + wg.Add(3) + go func() { + for _, msg := range messages { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + err := sb.Send(ctx, queueName, &amqp.Message{Data: []byte(msg.Data)}, SendWithMessageID(msg.ID)) + cancel() + if err != nil { + log.Fatalln(err) + } + } + defer wg.Done() + }() + + sb.Receive(queueName, func(ctx context.Context, msg *amqp.Message) error { + // we should get 2 messages discarding the duplicate ID + assert.NotEqual(t, messages[1].Data, string(msg.Data)) + wg.Done() + return nil + }) + wg.Wait() +} + func TestServiceBusSuite(t *testing.T) { suite.Run(t, new(ServiceBusSuite)) } @@ -186,37 +233,6 @@ func BenchmarkSend(b *testing.B) { } } -//func BenchmarkReceive(b *testing.B) { -// sbSuite := &ServiceBusSuite{} -// sbSuite.SetupSuite() -// defer sbSuite.TearDownSuite() -// -// spToken := sbSuite.servicePrincipalToken() -// sb, err := NewWithSPToken(spToken, sbSuite.SubscriptionID, ResourceGroupName, sbSuite.Namespace, RootRuleName, sbSuite.Environment) -// if err != nil { -// log.Fatalln(err) -// } -// -// queueName := randomName("gosbbench", 10) -// _, err = sb.EnsureQueue(context.Background(), queueName, nil) -// if err != nil { -// log.Fatalln(err) -// } -// -// for i := 0; i < b.N; i++ { -// sb.Send(context.Background(), queueName, &amqp.Message{ -// Data: []byte("Hello!"), -// }) -// } -// -// b.ResetTimer() -// sb.Listen(queueName, func(ctx context.Context, msg *amqp.Message) error { -// -// }) -// -// b.StopTimer() -//} - func mustGetenv(key string) string { v := os.Getenv(key) if v == "" { @@ -264,7 +280,6 @@ func (suite *ServiceBusSuite) getServiceBusNamespaceClient() *sbmgmt.NamespacesC } func (suite *ServiceBusSuite) ensureProvisioned(tier sbmgmt.SkuTier) error { - log.Debug("ensuring test resource group is provisioned") groupsClient := suite.getRmGroupClient() location := Location _, err := groupsClient.CreateOrUpdate(context.Background(), ResourceGroupName, rm.Group{Location: &location}) @@ -275,7 +290,6 @@ func (suite *ServiceBusSuite) ensureProvisioned(tier sbmgmt.SkuTier) error { nsClient := suite.getServiceBusNamespaceClient() _, err = nsClient.Get(context.Background(), ResourceGroupName, suite.Namespace) if err != nil { - log.Debug("namespace is not there, create it") ns := sbmgmt.SBNamespace{ Sku: &sbmgmt.SBSku{ Name: sbmgmt.SkuName(tier), @@ -288,10 +302,8 @@ func (suite *ServiceBusSuite) ensureProvisioned(tier sbmgmt.SkuTier) error { return err } - log.Debug("waiting for namespace to provision") return res.WaitForCompletion(context.Background(), nsClient.Client) } - log.Debug("namespace was already provisioned") return nil }