Skip to content

Commit dd20421

Browse files
rsperlRichard SuggGsantomaggio
authored
do not panic during chunk dispatching if consumer suddenly closed (#393)
* do not panic during chunk dispatching if consumer suddenly closed --------- Co-authored-by: Richard Sugg <richard.sugg@sas.com> Co-authored-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent fa4a8a1 commit dd20421

File tree

1 file changed

+10
-4
lines changed

1 file changed

+10
-4
lines changed

pkg/stream/server_frame.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@ package stream
33
import (
44
"bufio"
55
"bytes"
6-
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
7-
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
86
"hash/crc32"
97
"io"
108
"time"
9+
10+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
11+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
1112
)
1213

1314
type ReaderProtocol struct {
@@ -281,7 +282,6 @@ func (c *Client) queryPublisherSequenceFrameHandler(readProtocol *ReaderProtocol
281282
res.data <- sequence
282283
}
283284
func (c *Client) handleDeliver(r *bufio.Reader) {
284-
285285
subscriptionId := readByte(r)
286286
consumer, err := c.coordinator.GetConsumerById(subscriptionId)
287287
consumerFound := err == nil
@@ -407,7 +407,13 @@ func (c *Client) handleDeliver(r *bufio.Reader) {
407407
// dispatch the messages with offset to the consumer
408408
chunk.offsetMessages = batchConsumingMessages
409409
if consumer.getStatus() == open {
410-
consumer.chunkForConsumer <- chunk
410+
select {
411+
case consumer.chunkForConsumer <- chunk:
412+
default:
413+
logs.LogDebug("The consumer %s for the stream %s reports as open but is probably "+
414+
"closed during chunk dispatching. Messages won't be dispatched. ",
415+
consumer.GetName(), consumer.GetStreamName())
416+
}
411417
} else {
412418
logs.LogDebug("The consumer %s for the stream %s is closed during the chunk dispatching. "+
413419
"Messages won't dispatched", consumer.GetName(), consumer.GetStreamName())

0 commit comments

Comments
 (0)