Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,21 @@ check: $(STATICCHECK)
$(STATICCHECK) ./pkg/stream

test: vet fmt check
go test --tags=debug -v ./pkg/stream -race -coverprofile=coverage.txt -covermode=atomic #-ginkgo.v
go test --tags=debug -v ./pkg/stream -coverprofile=coverage.txt -covermode=atomic #-ginkgo.v

build-all: vet fmt check build-darwin build-windows build-linux
go test --tags=debug -v -race ./pkg/stream -coverprofile=coverage.txt -covermode=atomic #-ginkgo.v

integration-test: vet fmt check
cd ./pkg/system_integration && go test -v . -race -coverprofile=coverage.txt -covermode=atomic -tags debug -timeout 99999s

build-%: vet fmt check
GOOS=$(*) GOARCH=amd64 go build -ldflags=$(LDFLAGS) -v ./...

build: vet fmt check
go build -ldflags=$(LDFLAGS) -v ./...

PERFTEST_FLAGS ?= --producers 1 --consumers 1
PERFTEST_FLAGS ?= --publishers 1 --consumers 1
perf-test-run: perf-test-build
go run perfTest/perftest.go silent $(PERFTEST_FLAGS)

Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
# GO stream client for RabbitMQ streaming queues
---
![Build](https://github.com/rabbitmq/rabbitmq-stream-go-client/workflows/Build/badge.svg)
[![codecov](https://codecov.io/gh/rabbitmq/rabbitmq-stream-go-client/branch/main/graph/badge.svg?token=HZD4S71QIM)](https://codecov.io/gh/rabbitmq/rabbitmq-stream-go-client)

Experimental client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-server/tree/master/deps/rabbitmq_stream)

### Download
---

```
go get -u github.com/rabbitmq/rabbitmq-stream-go-client@v0.9-alpha
go get -u github.com/rabbitmq/rabbitmq-stream-go-client@v0.10-alpha
```

### Getting started
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.9-alpha
0.10-alpha
49 changes: 19 additions & 30 deletions examples/publishersError/publisherError.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bufio"
"fmt"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"

//"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
Expand All @@ -20,14 +19,6 @@ func CheckErr(err error) {
}
}

func CreateArrayMessagesForTesting(bacthMessages int) []message.StreamMessage {
var arr []message.StreamMessage
for z := 0; z < bacthMessages; z++ {
arr = append(arr, amqp.NewMessage([]byte("hello_world_"+strconv.Itoa(z))))
}
return arr
}

func main() {
reader := bufio.NewReader(os.Stdin)

Expand All @@ -41,45 +32,43 @@ func main() {
SetUser("test").
SetPassword("test"))
CheckErr(err)
streamName := "no"
//err = env.DeclareStream(streamName,
// &stream.StreamOptions{
// MaxLengthBytes: stream.ByteCapacity{}.GB(2),
// },
//)
//CheckErr(err)
streamName := "pub-error"
err = env.DeclareStream(streamName,
&stream.StreamOptions{
MaxLengthBytes: stream.ByteCapacity{}.GB(2),
},
)

producer, err := env.NewProducer(streamName, &stream.ProducerOptions{Name: "myProducer"})
CheckErr(err)

producer, err := env.NewProducer(streamName, stream.NewProducerOptions().SetProducerName("myProducer"))
CheckErr(err)

// This channel receives the callback in case the is some error during the
// publisher.
chPublishError := producer.NotifyPublishError()
handlePublishError(chPublishError)

go func() {
for i := 0; i < 100; i++ {
err := producer.BatchSend(CreateArrayMessagesForTesting(2))
CheckErr(err)
}
}()
// Here we close the producer during the publishing
// so an publish error is raised

err = producer.Close()
CheckErr(err)
for i := 0; i < 100; i++ {
msg := amqp.NewMessage([]byte("hello_world_" + strconv.Itoa(i)))
err := producer.Send(msg)
CheckErr(err)
}

fmt.Println("Press any key to stop ")
_, _ = reader.ReadString('\n')
CheckErr(err)
err = env.DeleteStream(streamName)
CheckErr(err)
err = env.Close()
CheckErr(err)

}

func handlePublishError(publishError stream.ChannelPublishError) {
go func() {
var totalMessages int32
for {
pError := <-publishError
for pError := range publishError {
atomic.AddInt32(&totalMessages, 1)
var data [][]byte
if pError.UnConfirmedMessage != nil {
Expand Down
8 changes: 0 additions & 8 deletions pkg/ha/ha_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,6 @@ func NewHAProducer(env *stream.Environment, streamName string, producerOptions *

func (p *ReliableProducer) newProducer() error {

//if p.producer != nil && len(p.producer.GetUnConfirmed()) > 0 {
// for _, msg := range p.producer.GetUnConfirmed() {
// msg.Confirmed = false
// p.channelPublishConfirm <- []*stream.UnConfirmedMessage{msg}
// }
//
//}

producer, err := p.env.NewProducer(p.streamName, p.producerOptions)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/stream/brokers.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (br *Broker) mergeWithDefault() {
if br.Password == "" {
br.Password = broker.Password
}
if br.Port == "" {
if br.Port == "" || br.Port == "0" {
br.Port = broker.Port
}
if br.Scheme == "" {
Expand Down
15 changes: 15 additions & 0 deletions pkg/stream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,17 @@ func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (
if options == nil {
options = NewProducerOptions()
}

if options.QueueSize < minQueuePublisherSize || options.QueueSize > maxQueuePublisherSize {
return nil, fmt.Errorf("QueueSize values must be between %d and %d",
minQueuePublisherSize, maxQueuePublisherSize)
}

if options.BatchSize < minBatchSize || options.BatchSize > maxBatchSize {
return nil, fmt.Errorf("BatchSize values must be between %d and %d",
minBatchSize, maxBatchSize)
}

producer, err := c.coordinator.NewProducer(&ProducerOptions{
client: c,
streamName: streamName,
Expand Down Expand Up @@ -621,6 +632,10 @@ func (c *Client) DeclareSubscriber(streamName string,
options = NewConsumerOptions()
}

if options.Offset.typeOfs <= 0 || options.Offset.typeOfs > 6 {
return nil, fmt.Errorf("specify a valid Offset")
}

options.client = c
options.streamName = streamName
consumer := c.coordinator.NewConsumer(messagesHandler, options)
Expand Down
18 changes: 13 additions & 5 deletions pkg/stream/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
// for example the producer status

const (
running = iota
closed = iota
open = iota
closed = iota
)

const initBufferPublishSize = 2 + 2 + 1 + 4
Expand Down Expand Up @@ -76,14 +76,22 @@ const (
LocalhostUriConnection = "rabbitmq-stream://guest:guest@localhost:5552/%2f"

///
defaultSocketBuffer = 4096

defaultSocketBuffer = 4096
defaultQueuePublisherSize = 10000
minQueuePublisherSize = 100
maxQueuePublisherSize = 1_000_000

minBatchSize = 1
maxBatchSize = 10_000
defaultBatchSize = 100
//
ClientVersion = "0.9-alpha"
ClientVersion = "0.10-alpha"

StreamTcpPort = "5552"
)

var AlreadyClosed = errors.New("Already Closed")

var PreconditionFailed = errors.New("Precondition Failed")
var AuthenticationFailure = errors.New("Authentication Failure")
var StreamDoesNotExist = errors.New("Stream Does Not Exist")
Expand Down
24 changes: 21 additions & 3 deletions pkg/stream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,21 @@ type Consumer struct {
// different form ConsumerOptions.offset. ConsumerOptions.offset is just the configuration
// and won't change. currentOffset is the status of the offset
currentOffset int64
CloseHandler chan Event
closeHandler chan Event

status int
}

func (consumer *Consumer) setStatus(status int) {
consumer.mutex.Lock()
defer consumer.mutex.Unlock()
consumer.status = status
}

func (consumer *Consumer) getStatus() int {
consumer.mutex.Lock()
defer consumer.mutex.Unlock()
return consumer.status
}

func (consumer *Consumer) GetStreamName() string {
Expand Down Expand Up @@ -50,7 +64,7 @@ func (consumer *Consumer) GetOffset() int64 {

func (consumer *Consumer) NotifyClose() ChannelClose {
ch := make(chan Event, 1)
consumer.CloseHandler = ch
consumer.closeHandler = ch
return ch
}

Expand All @@ -60,7 +74,7 @@ type ConsumerContext struct {

type MessagesHandler func(consumerContext ConsumerContext, message *amqp.Message)

type ConsumerOptions struct {
type /**/ ConsumerOptions struct {
client *Client
ConsumerName string
streamName string
Expand Down Expand Up @@ -105,6 +119,10 @@ func (c *Client) credit(subscriptionId byte, credit int16) {
}

func (consumer *Consumer) Close() error {
if consumer.getStatus() == closed {
return AlreadyClosed
}
consumer.setStatus(closed)
_, errGet := consumer.options.client.coordinator.GetConsumerById(consumer.ID)
if errGet != nil {
return nil
Expand Down
37 changes: 37 additions & 0 deletions pkg/stream/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,4 +239,41 @@ var _ = Describe("Streaming Consumers", func() {
Expect(err).NotTo(HaveOccurred())
})

It("Check already closed", func() {
producer, err := env.NewProducer(streamName, nil)
Expect(err).NotTo(HaveOccurred())
err = producer.BatchSend(CreateArrayMessagesForTesting(500)) // batch send
Expect(err).NotTo(HaveOccurred())
defer func(producer *Producer) {
err := producer.Close()
Expect(err).NotTo(HaveOccurred())
}(producer)

var messagesCount int32 = 0
consumer, err := env.NewConsumer(streamName,
func(consumerContext ConsumerContext, message *amqp.Message) {
if atomic.AddInt32(&messagesCount, 1) >= 250 {
err := consumerContext.Consumer.Close()
if err != nil {
return
}
}
}, NewConsumerOptions().SetOffset(OffsetSpecification{}.First()).SetConsumerName("consumer_test"))
Expect(err).NotTo(HaveOccurred())
time.Sleep(500 * time.Millisecond)
err = consumer.Close()
Expect(err).To(Equal(AlreadyClosed))

})

It("Validation", func() {
_, err := env.NewConsumer(streamName,
func(consumerContext ConsumerContext, message *amqp.Message) {
}, &ConsumerOptions{
Offset: OffsetSpecification{},
})
Expect(err).To(HaveOccurred())

})

})
12 changes: 7 additions & 5 deletions pkg/stream/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (coordinator *Coordinator) NewProducer(
options: parameters,
mutex: &sync.Mutex{},
unConfirmedMessages: map[int64]*UnConfirmedMessage{},
status: running,
status: open,
messageSequenceCh: make(chan messageSequence, size),
pendingMessages: pendingMessagesSequence{
messages: make([]messageSequence, 0),
Expand All @@ -74,8 +74,8 @@ func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event)
reason.StreamName = consumer.GetStreamName()
reason.Name = consumer.GetName()

if consumer.CloseHandler != nil {
consumer.CloseHandler <- reason
if consumer.closeHandler != nil {
consumer.closeHandler <- reason
}
return coordinator.removeById(id, coordinator.consumers)
}
Expand Down Expand Up @@ -121,7 +121,7 @@ func (coordinator *Coordinator) ProducersCount() int {
func newResponse(commandDescription string) *Response {
res := &Response{}
res.commandDescription = commandDescription
res.code = make(chan Code)
res.code = make(chan Code, 1)
res.data = make(chan interface{})
res.messages = make(chan []*amqp.Message, 100)
return res
Expand Down Expand Up @@ -188,7 +188,9 @@ func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler,
defer coordinator.mutex.Unlock()
var lastId, _ = coordinator.getNextConsumerItem()
var item = &Consumer{ID: lastId, options: parameters,
response: newResponse(lookUpCommand(commandSubscribe)), mutex: &sync.Mutex{},
response: newResponse(lookUpCommand(commandSubscribe)),
status: open,
mutex: &sync.Mutex{},
MessagesHandler: messagesHandler,
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/stream/dialer_posix.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ func init() {

})
}
}
}
2 changes: 1 addition & 1 deletion pkg/stream/dialer_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ func init() {

})
}
}
}
Loading