Skip to content

Commit 498297d

Browse files
committed
Add limits check
fixes: #60
1 parent 765a42b commit 498297d

File tree

9 files changed

+109
-47
lines changed

9 files changed

+109
-47
lines changed

Makefile

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,21 @@ check: $(STATICCHECK)
2323
$(STATICCHECK) ./pkg/stream
2424

2525
test: vet fmt check
26-
go test --tags=debug -v ./pkg/stream -race -coverprofile=coverage.txt -covermode=atomic #-ginkgo.v
26+
go test --tags=debug -v ./pkg/stream -coverprofile=coverage.txt -covermode=atomic #-ginkgo.v
27+
28+
build-all: vet fmt check build-darwin build-windows build-linux
29+
go test --tags=debug -v -race ./pkg/stream -coverprofile=coverage.txt -covermode=atomic #-ginkgo.v
2730

2831
integration-test: vet fmt check
2932
cd ./pkg/system_integration && go test -v . -race -coverprofile=coverage.txt -covermode=atomic -tags debug -timeout 99999s
3033

34+
build-%: vet fmt check
35+
GOOS=$(*) GOARCH=amd64 go build -ldflags=$(LDFLAGS) -v ./...
36+
3137
build: vet fmt check
3238
go build -ldflags=$(LDFLAGS) -v ./...
3339

34-
PERFTEST_FLAGS ?= --producers 1 --consumers 1
40+
PERFTEST_FLAGS ?= --publishers 1 --consumers 1
3541
perf-test-run: perf-test-build
3642
go run perfTest/perftest.go silent $(PERFTEST_FLAGS)
3743

examples/publishersError/publisherError.go

Lines changed: 19 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"bufio"
55
"fmt"
66
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
7-
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
87
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
98

109
//"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
@@ -20,14 +19,6 @@ func CheckErr(err error) {
2019
}
2120
}
2221

23-
func CreateArrayMessagesForTesting(bacthMessages int) []message.StreamMessage {
24-
var arr []message.StreamMessage
25-
for z := 0; z < bacthMessages; z++ {
26-
arr = append(arr, amqp.NewMessage([]byte("hello_world_"+strconv.Itoa(z))))
27-
}
28-
return arr
29-
}
30-
3122
func main() {
3223
reader := bufio.NewReader(os.Stdin)
3324

@@ -41,45 +32,43 @@ func main() {
4132
SetUser("test").
4233
SetPassword("test"))
4334
CheckErr(err)
44-
streamName := "no"
45-
//err = env.DeclareStream(streamName,
46-
// &stream.StreamOptions{
47-
// MaxLengthBytes: stream.ByteCapacity{}.GB(2),
48-
// },
49-
//)
50-
//CheckErr(err)
35+
streamName := "pub-error"
36+
err = env.DeclareStream(streamName,
37+
&stream.StreamOptions{
38+
MaxLengthBytes: stream.ByteCapacity{}.GB(2),
39+
},
40+
)
5141

52-
producer, err := env.NewProducer(streamName, &stream.ProducerOptions{Name: "myProducer"})
42+
CheckErr(err)
43+
44+
producer, err := env.NewProducer(streamName, stream.NewProducerOptions().SetProducerName("myProducer"))
5345
CheckErr(err)
5446

5547
// This channel receives the callback in case the is some error during the
5648
// publisher.
5749
chPublishError := producer.NotifyPublishError()
5850
handlePublishError(chPublishError)
5951

60-
go func() {
61-
for i := 0; i < 100; i++ {
62-
err := producer.BatchSend(CreateArrayMessagesForTesting(2))
63-
CheckErr(err)
64-
}
65-
}()
66-
// Here we close the producer during the publishing
67-
// so an publish error is raised
68-
69-
err = producer.Close()
70-
CheckErr(err)
52+
for i := 0; i < 100; i++ {
53+
msg := amqp.NewMessage([]byte("hello_world_" + strconv.Itoa(i)))
54+
err := producer.Send(msg)
55+
CheckErr(err)
56+
}
7157

7258
fmt.Println("Press any key to stop ")
7359
_, _ = reader.ReadString('\n')
7460
CheckErr(err)
61+
err = env.DeleteStream(streamName)
62+
CheckErr(err)
63+
err = env.Close()
64+
CheckErr(err)
7565

7666
}
7767

7868
func handlePublishError(publishError stream.ChannelPublishError) {
7969
go func() {
8070
var totalMessages int32
81-
for {
82-
pError := <-publishError
71+
for pError := range publishError {
8372
atomic.AddInt32(&totalMessages, 1)
8473
var data [][]byte
8574
if pError.UnConfirmedMessage != nil {

pkg/stream/client.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,17 @@ func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (
440440
if options == nil {
441441
options = NewProducerOptions()
442442
}
443+
444+
if options.QueueSize < minQueuePublisherSize || options.QueueSize > maxQueuePublisherSize {
445+
return nil, fmt.Errorf("QueueSize values must be between %d and %d",
446+
minQueuePublisherSize, maxQueuePublisherSize)
447+
}
448+
449+
if options.BatchSize < minBatchSize || options.BatchSize > maxBatchSize {
450+
return nil, fmt.Errorf("BatchSize values must be between %d and %d",
451+
minBatchSize, maxBatchSize)
452+
}
453+
443454
producer, err := c.coordinator.NewProducer(&ProducerOptions{
444455
client: c,
445456
streamName: streamName,
@@ -621,6 +632,10 @@ func (c *Client) DeclareSubscriber(streamName string,
621632
options = NewConsumerOptions()
622633
}
623634

635+
if options.Offset.typeOfs <= 0 || options.Offset.typeOfs > 6 {
636+
return nil, fmt.Errorf("specify a valid Offset")
637+
}
638+
624639
options.client = c
625640
options.streamName = streamName
626641
consumer := c.coordinator.NewConsumer(messagesHandler, options)

pkg/stream/constants.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,14 @@ const (
7676
LocalhostUriConnection = "rabbitmq-stream://guest:guest@localhost:5552/%2f"
7777

7878
///
79-
defaultSocketBuffer = 4096
80-
79+
defaultSocketBuffer = 4096
80+
defaultQueuePublisherSize = 10000
81+
minQueuePublisherSize = 100
82+
maxQueuePublisherSize = 1_000_000
83+
84+
minBatchSize = 1
85+
maxBatchSize = 10_000
86+
defaultBatchSize = 100
8187
//
8288
ClientVersion = "0.10-alpha"
8389

pkg/stream/consumer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ type ConsumerContext struct {
7474

7575
type MessagesHandler func(consumerContext ConsumerContext, message *amqp.Message)
7676

77-
type ConsumerOptions struct {
77+
type /**/ ConsumerOptions struct {
7878
client *Client
7979
ConsumerName string
8080
streamName string

pkg/stream/consumer_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,4 +266,14 @@ var _ = Describe("Streaming Consumers", func() {
266266

267267
})
268268

269+
It("Validation", func() {
270+
_, err := env.NewConsumer(streamName,
271+
func(consumerContext ConsumerContext, message *amqp.Message) {
272+
}, &ConsumerOptions{
273+
Offset: OffsetSpecification{},
274+
})
275+
Expect(err).To(HaveOccurred())
276+
277+
})
278+
269279
})

pkg/stream/enviroment.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -189,14 +189,6 @@ func (env *Environment) NewConsumer(streamName string,
189189
return env.consumers.NewSubscriber(client, streamName, messagesHandler, options)
190190
}
191191

192-
//func (env *Environment) NewStreamMessage(data []byte) message.StreamMessage {
193-
// env.options.Codec
194-
// return &AMQP10{
195-
// message: newMessage(data),
196-
// publishingId: -1,
197-
// }
198-
//}
199-
200192
func (env *Environment) Close() error {
201193
_ = env.producers.close()
202194
_ = env.consumers.close()

pkg/stream/producer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ func (po *ProducerOptions) SetBatchSize(size int) *ProducerOptions {
7171

7272
func NewProducerOptions() *ProducerOptions {
7373
return &ProducerOptions{
74-
QueueSize: 10000,
75-
BatchSize: 100,
74+
QueueSize: defaultQueuePublisherSize,
75+
BatchSize: defaultBatchSize,
7676
}
7777
}
7878

pkg/stream/producer_test.go

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ var _ = Describe("Streaming Producers", func() {
183183
Expect(err).To(HaveOccurred())
184184
})
185185

186-
It("Smart Send Split frame", func() {
186+
It("Smart Send Split frame/BatchSize", func() {
187187
producer, err := testEnvironment.NewProducer(testProducerStream,
188188
NewProducerOptions().SetBatchSize(50))
189189
Expect(err).NotTo(HaveOccurred())
@@ -206,9 +206,32 @@ var _ = Describe("Streaming Producers", func() {
206206
s := make([]byte, 1148576)
207207
err = producer.Send(amqp.NewMessage(s))
208208
Expect(err).To(HaveOccurred())
209+
err = producer.Close()
210+
Expect(err).NotTo(HaveOccurred())
211+
212+
producer, err = testEnvironment.NewProducer(testProducerStream,
213+
NewProducerOptions().SetBatchSize(2))
214+
Expect(err).NotTo(HaveOccurred())
215+
var messagesCountBatch int32
216+
chConfirmBatch := producer.NotifyPublishConfirmation()
217+
go func(ch ChannelPublishConfirm) {
218+
for ids := range ch {
219+
atomic.AddInt32(&messagesCountBatch, int32(len(ids)))
220+
}
221+
}(chConfirmBatch)
222+
223+
for i := 0; i < 100; i++ {
224+
s := make([]byte, 11)
225+
err = producer.Send(amqp.NewMessage(s))
226+
Expect(err).NotTo(HaveOccurred())
227+
}
228+
229+
time.Sleep(800 * time.Millisecond)
230+
Expect(atomic.LoadInt32(&messagesCountBatch)).To(Equal(int32(100)))
209231

210232
err = producer.Close()
211233
Expect(err).NotTo(HaveOccurred())
234+
212235
})
213236

214237
It("Smart Send send after", func() {
@@ -244,7 +267,7 @@ var _ = Describe("Streaming Producers", func() {
244267
Expect(err).NotTo(HaveOccurred())
245268
})
246269

247-
It("Already Closed", func() {
270+
It("Already Closed/Limits", func() {
248271
env, err := NewEnvironment(NewEnvironmentOptions().SetMaxProducersPerClient(5))
249272
Expect(err).NotTo(HaveOccurred())
250273
producer, err := env.NewProducer(testProducerStream, nil)
@@ -254,6 +277,27 @@ var _ = Describe("Streaming Producers", func() {
254277

255278
err = producer.Close()
256279
Expect(err).To(Equal(AlreadyClosed))
280+
281+
/// validation limits
282+
/// options.QueueSize and options.BatchSize
283+
_, err = env.NewProducer(testProducerStream, &ProducerOptions{
284+
QueueSize: 1,
285+
})
286+
Expect(err).To(HaveOccurred())
287+
288+
_, err = env.NewProducer(testProducerStream, NewProducerOptions().SetQueueSize(5000000))
289+
Expect(err).To(HaveOccurred())
290+
291+
_, err = env.NewProducer(testProducerStream, &ProducerOptions{
292+
BatchSize: 0,
293+
})
294+
Expect(err).To(HaveOccurred())
295+
296+
_, err = env.NewProducer(testProducerStream, &ProducerOptions{
297+
BatchSize: 5000000,
298+
})
299+
Expect(err).To(HaveOccurred())
300+
257301
err = env.Close()
258302
Expect(err).NotTo(HaveOccurred())
259303
})

0 commit comments

Comments
 (0)