Skip to content

Commit 765a42b

Browse files
committed
handle closing
1 parent 89ba0a8 commit 765a42b

File tree

12 files changed

+85
-25
lines changed

12 files changed

+85
-25
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ Experimental client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rab
88
---
99

1010
```
11-
go get -u github.com/rabbitmq/rabbitmq-stream-go-client@v0.9-alpha
11+
go get -u github.com/rabbitmq/rabbitmq-stream-go-client@v0.10-alpha
1212
```
1313

1414
### Getting started

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.9-alpha
1+
0.10-alpha

pkg/ha/ha_publisher.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -73,14 +73,6 @@ func NewHAProducer(env *stream.Environment, streamName string, producerOptions *
7373

7474
func (p *ReliableProducer) newProducer() error {
7575

76-
//if p.producer != nil && len(p.producer.GetUnConfirmed()) > 0 {
77-
// for _, msg := range p.producer.GetUnConfirmed() {
78-
// msg.Confirmed = false
79-
// p.channelPublishConfirm <- []*stream.UnConfirmedMessage{msg}
80-
// }
81-
//
82-
//}
83-
8476
producer, err := p.env.NewProducer(p.streamName, p.producerOptions)
8577
if err != nil {
8678
return err

pkg/stream/constants.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ import (
1111
// for example the producer status
1212

1313
const (
14-
running = iota
15-
closed = iota
14+
open = iota
15+
closed = iota
1616
)
1717

1818
const initBufferPublishSize = 2 + 2 + 1 + 4
@@ -79,11 +79,13 @@ const (
7979
defaultSocketBuffer = 4096
8080

8181
//
82-
ClientVersion = "0.9-alpha"
82+
ClientVersion = "0.10-alpha"
8383

8484
StreamTcpPort = "5552"
8585
)
8686

87+
var AlreadyClosed = errors.New("Already Closed")
88+
8789
var PreconditionFailed = errors.New("Precondition Failed")
8890
var AuthenticationFailure = errors.New("Authentication Failure")
8991
var StreamDoesNotExist = errors.New("Stream Does Not Exist")

pkg/stream/consumer.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,21 @@ type Consumer struct {
1818
// different form ConsumerOptions.offset. ConsumerOptions.offset is just the configuration
1919
// and won't change. currentOffset is the status of the offset
2020
currentOffset int64
21-
CloseHandler chan Event
21+
closeHandler chan Event
22+
23+
status int
24+
}
25+
26+
func (consumer *Consumer) setStatus(status int) {
27+
consumer.mutex.Lock()
28+
defer consumer.mutex.Unlock()
29+
consumer.status = status
30+
}
31+
32+
func (consumer *Consumer) getStatus() int {
33+
consumer.mutex.Lock()
34+
defer consumer.mutex.Unlock()
35+
return consumer.status
2236
}
2337

2438
func (consumer *Consumer) GetStreamName() string {
@@ -50,7 +64,7 @@ func (consumer *Consumer) GetOffset() int64 {
5064

5165
func (consumer *Consumer) NotifyClose() ChannelClose {
5266
ch := make(chan Event, 1)
53-
consumer.CloseHandler = ch
67+
consumer.closeHandler = ch
5468
return ch
5569
}
5670

@@ -105,6 +119,10 @@ func (c *Client) credit(subscriptionId byte, credit int16) {
105119
}
106120

107121
func (consumer *Consumer) Close() error {
122+
if consumer.getStatus() == closed {
123+
return AlreadyClosed
124+
}
125+
consumer.setStatus(closed)
108126
_, errGet := consumer.options.client.coordinator.GetConsumerById(consumer.ID)
109127
if errGet != nil {
110128
return nil

pkg/stream/consumer_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,4 +239,31 @@ var _ = Describe("Streaming Consumers", func() {
239239
Expect(err).NotTo(HaveOccurred())
240240
})
241241

242+
It("Check already closed", func() {
243+
producer, err := env.NewProducer(streamName, nil)
244+
Expect(err).NotTo(HaveOccurred())
245+
err = producer.BatchSend(CreateArrayMessagesForTesting(500)) // batch send
246+
Expect(err).NotTo(HaveOccurred())
247+
defer func(producer *Producer) {
248+
err := producer.Close()
249+
Expect(err).NotTo(HaveOccurred())
250+
}(producer)
251+
252+
var messagesCount int32 = 0
253+
consumer, err := env.NewConsumer(streamName,
254+
func(consumerContext ConsumerContext, message *amqp.Message) {
255+
if atomic.AddInt32(&messagesCount, 1) >= 250 {
256+
err := consumerContext.Consumer.Close()
257+
if err != nil {
258+
return
259+
}
260+
}
261+
}, NewConsumerOptions().SetOffset(OffsetSpecification{}.First()).SetConsumerName("consumer_test"))
262+
Expect(err).NotTo(HaveOccurred())
263+
time.Sleep(500 * time.Millisecond)
264+
err = consumer.Close()
265+
Expect(err).To(Equal(AlreadyClosed))
266+
267+
})
268+
242269
})

pkg/stream/coordinator.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func (coordinator *Coordinator) NewProducer(
5656
options: parameters,
5757
mutex: &sync.Mutex{},
5858
unConfirmedMessages: map[int64]*UnConfirmedMessage{},
59-
status: running,
59+
status: open,
6060
messageSequenceCh: make(chan messageSequence, size),
6161
pendingMessages: pendingMessagesSequence{
6262
messages: make([]messageSequence, 0),
@@ -74,8 +74,8 @@ func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event)
7474
reason.StreamName = consumer.GetStreamName()
7575
reason.Name = consumer.GetName()
7676

77-
if consumer.CloseHandler != nil {
78-
consumer.CloseHandler <- reason
77+
if consumer.closeHandler != nil {
78+
consumer.closeHandler <- reason
7979
}
8080
return coordinator.removeById(id, coordinator.consumers)
8181
}
@@ -121,7 +121,7 @@ func (coordinator *Coordinator) ProducersCount() int {
121121
func newResponse(commandDescription string) *Response {
122122
res := &Response{}
123123
res.commandDescription = commandDescription
124-
res.code = make(chan Code)
124+
res.code = make(chan Code, 1)
125125
res.data = make(chan interface{})
126126
res.messages = make(chan []*amqp.Message, 100)
127127
return res
@@ -188,7 +188,9 @@ func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler,
188188
defer coordinator.mutex.Unlock()
189189
var lastId, _ = coordinator.getNextConsumerItem()
190190
var item = &Consumer{ID: lastId, options: parameters,
191-
response: newResponse(lookUpCommand(commandSubscribe)), mutex: &sync.Mutex{},
191+
response: newResponse(lookUpCommand(commandSubscribe)),
192+
status: open,
193+
mutex: &sync.Mutex{},
192194
MessagesHandler: messagesHandler,
193195
}
194196

pkg/stream/dialer_posix.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,4 @@ func init() {
3030

3131
})
3232
}
33-
}
33+
}

pkg/stream/dialer_windows.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,4 @@ func init() {
3030

3131
})
3232
}
33-
}
33+
}

pkg/stream/producer.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,9 +302,12 @@ func (producer *Producer) FlushUnConfirmedMessages() {
302302
}
303303

304304
func (producer *Producer) Close() error {
305+
if producer.getStatus() == closed {
306+
return AlreadyClosed
307+
}
305308
producer.setStatus(closed)
306309
if !producer.options.client.socket.isOpen() {
307-
return fmt.Errorf("connection already closed")
310+
return fmt.Errorf("tcp connection is closed")
308311
}
309312

310313
err := producer.options.client.deletePublisher(producer.ID)

0 commit comments

Comments
 (0)