Skip to content

Commit ead0716

Browse files
authored
first implementation (#47)
* handle duplication
1 parent f5e8f51 commit ead0716

File tree

17 files changed

+213
-66
lines changed

17 files changed

+213
-66
lines changed

examples/getting_started.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ package main
22

33
import (
44
"bufio"
5-
"context"
65
"fmt"
76
"github.com/google/uuid"
87
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
8+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
99
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
1010
"os"
1111
"strconv"
@@ -19,8 +19,8 @@ func CheckErr(err error) {
1919
}
2020
}
2121

22-
func CreateArrayMessagesForTesting(bacthMessages int) []*amqp.Message {
23-
var arr []*amqp.Message
22+
func CreateArrayMessagesForTesting(bacthMessages int) []message.StreamMessage {
23+
var arr []message.StreamMessage
2424
for z := 0; z < bacthMessages; z++ {
2525
arr = append(arr, amqp.NewMessage([]byte("hello_world_"+strconv.Itoa(z))))
2626
}
@@ -32,9 +32,9 @@ func handlePublishConfirm(confirms stream.ChannelPublishConfirm) {
3232
for confirmed := range confirms {
3333
for _, msg := range confirmed {
3434
if msg.Confirmed {
35-
fmt.Printf("message %s stored \n ", msg.Message.Data)
35+
fmt.Printf("message %s stored \n ", msg.Message.GetData())
3636
} else {
37-
fmt.Printf("message %s failed \n ", msg.Message.Data)
37+
fmt.Printf("message %s failed \n ", msg.Message.GetData())
3838
}
3939

4040
}
@@ -102,7 +102,7 @@ func main() {
102102
fmt.Printf("consumer name: %s, text: %s \n ", consumerContext.Consumer.GetName(), message.Data)
103103
}
104104

105-
consumer, err := env.NewConsumer(context.TODO(),
105+
consumer, err := env.NewConsumer(
106106
streamName,
107107
handleMessages,
108108
stream.NewConsumerOptions().

examples/haProducer/producer.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ package main
22

33
import (
44
"bufio"
5-
"context"
65
"fmt"
76
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
87
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha"
8+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
99
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
1010
"os"
1111
"strconv"
@@ -22,8 +22,8 @@ func CheckErr(err error) {
2222

2323
var idx = 0
2424

25-
func CreateArrayMessagesForTesting(bacthMessages int) []*amqp.Message {
26-
var arr []*amqp.Message
25+
func CreateArrayMessagesForTesting(bacthMessages int) []message.StreamMessage {
26+
var arr []message.StreamMessage
2727
for z := 0; z < bacthMessages; z++ {
2828
idx++
2929
arr = append(arr, amqp.NewMessage([]byte(strconv.Itoa(idx))))
@@ -38,7 +38,7 @@ func handlePublishConfirm(confirms stream.ChannelPublishConfirm) {
3838
for _, m := range messagesIds {
3939
if !m.Confirmed {
4040
if atomic.AddInt32(&counter, 1)%10 == 0 {
41-
fmt.Printf("Confirmed %s message - status %t - %d \n ", m.Message.Data, m.Confirmed, atomic.LoadInt32(&counter))
41+
fmt.Printf("Confirmed %s message - status %t - %d \n ", m.Message.GetData(), m.Confirmed, atomic.LoadInt32(&counter))
4242
}
4343
}
4444
}
@@ -80,7 +80,7 @@ func main() {
8080
time.Sleep(4 * time.Second)
8181
for i := 0; i < 1000000; i++ {
8282
err := rProducer.BatchPublish(CreateArrayMessagesForTesting(10))
83-
time.Sleep(10 * time.Millisecond)
83+
time.Sleep(500 * time.Millisecond)
8484
if i%1000 == 0 {
8585
fmt.Println("sent.. " + strconv.Itoa(i))
8686
}
@@ -94,7 +94,7 @@ func main() {
9494
fmt.Printf("messages consumed: %s \n ", message.Data)
9595
}
9696

97-
consumer, err := env.NewConsumer(context.TODO(), streamName,
97+
consumer, err := env.NewConsumer(streamName,
9898
handleMessages,
9999
stream.NewConsumerOptions().
100100
SetConsumerName("my_consumer"))

examples/offset/offset.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ package main
22

33
import (
44
"bufio"
5-
"context"
65
"fmt"
76
"github.com/google/uuid"
87
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
8+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
99
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
1010
"os"
1111
"time"
@@ -18,8 +18,8 @@ func CheckErr(err error) {
1818
}
1919
}
2020

21-
func CreateArrayMessagesForTesting(batchMessages int) []*amqp.Message {
22-
var arr []*amqp.Message
21+
func CreateArrayMessagesForTesting(batchMessages int) []message.StreamMessage {
22+
var arr []message.StreamMessage
2323
for z := 0; z < batchMessages; z++ {
2424
arr = append(arr, amqp.NewMessage([]byte("1234567890")))
2525
}
@@ -70,7 +70,7 @@ func main() {
7070
fmt.Printf("messages consumed: %d \n ", counter)
7171
}
7272

73-
consumer, err := env.NewConsumer(context.TODO(), streamName,
73+
consumer, err := env.NewConsumer(streamName,
7474
handleMessages,
7575
stream.NewConsumerOptions().
7676
SetConsumerName("my_consumer"). // set a consumer name

examples/offsetTracking/offsetTracking.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ package main
22

33
import (
44
"bufio"
5-
"context"
65
"fmt"
76
"github.com/google/uuid"
87
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
8+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
99
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
1010
"os"
1111
"strconv"
@@ -20,8 +20,8 @@ func CheckErr(err error) {
2020
}
2121
}
2222

23-
func CreateArrayMessagesForTesting(bacthMessages int) []*amqp.Message {
24-
var arr []*amqp.Message
23+
func CreateArrayMessagesForTesting(bacthMessages int) []message.StreamMessage {
24+
var arr []message.StreamMessage
2525
for z := 0; z < bacthMessages; z++ {
2626
arr = append(arr, amqp.NewMessage([]byte("hello_world_"+strconv.Itoa(z))))
2727
}
@@ -78,7 +78,7 @@ func main() {
7878

7979
}
8080

81-
consumer, err := env.NewConsumer(context.TODO(),
81+
consumer, err := env.NewConsumer(
8282
streamName,
8383
handleMessages,
8484
stream.NewConsumerOptions().

examples/publishersError/publisherError.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"github.com/google/uuid"
77
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
8+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
89
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
910

1011
//"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
@@ -20,8 +21,8 @@ func CheckErr(err error) {
2021
}
2122
}
2223

23-
func CreateArrayMessagesForTesting(bacthMessages int) []*amqp.Message {
24-
var arr []*amqp.Message
24+
func CreateArrayMessagesForTesting(bacthMessages int) []message.StreamMessage {
25+
var arr []message.StreamMessage
2526
for z := 0; z < bacthMessages; z++ {
2627
arr = append(arr, amqp.NewMessage([]byte("hello_world_"+strconv.Itoa(z))))
2728
}
@@ -96,7 +97,7 @@ func handlePublishError(publishError stream.ChannelPublishError) {
9697
atomic.AddInt32(&totalMessages, 1)
9798
var data [][]byte
9899
if pError.UnConfirmedMessage != nil {
99-
data = pError.UnConfirmedMessage.Message.Data
100+
data = pError.UnConfirmedMessage.Message.GetData()
100101
}
101102
fmt.Printf("Error during publish, message:%s , error: %s. Total %d \n", data, pError.Err, totalMessages)
102103
}

perfTest/cmd/silent.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package cmd
22

33
import (
4-
"context"
54
"fmt"
65
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
76
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha"
7+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
88
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
99
"github.com/spf13/cobra"
1010
"math/rand"
@@ -204,7 +204,7 @@ func startPublisher(streamName string) error {
204204
chPublishError := rPublisher.NotifyPublishError()
205205
handlePublishError(chPublishError)
206206

207-
var arr []*amqp.Message
207+
var arr []message.StreamMessage
208208
var body string
209209
for z := 0; z < batchSize; z++ {
210210
if variableBody > 0 {
@@ -220,7 +220,7 @@ func startPublisher(streamName string) error {
220220
arr = append(arr, amqp.NewMessage([]byte(body)))
221221
}
222222

223-
go func(prod *ha.ReliableProducer, messages []*amqp.Message) {
223+
go func(prod *ha.ReliableProducer, messages []message.StreamMessage) {
224224
for {
225225
if rate > 0 {
226226
var v1 float64
@@ -318,7 +318,7 @@ func startConsumer(consumerName string, streamName string) error {
318318
}
319319
}
320320
}
321-
consumer, err := simulEnvironment.NewConsumer(context.TODO(),
321+
consumer, err := simulEnvironment.NewConsumer(
322322
streamName,
323323
handleMessages,
324324
stream.NewConsumerOptions().

pkg/amqp/types.go

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,12 +390,44 @@ type Message struct {
390390
doneSignal chan struct{}
391391
}
392392

393+
type AMQP10 struct {
394+
publishingId int64
395+
message *Message
396+
}
397+
398+
func NewMessage(data []byte) *AMQP10 {
399+
return &AMQP10{
400+
message: newMessage(data),
401+
publishingId: -1,
402+
}
403+
}
404+
405+
func (amqp *AMQP10) SetPublishingId(id int64) {
406+
amqp.publishingId = id
407+
}
408+
409+
func (amqp *AMQP10) GetPublishingId() int64 {
410+
return amqp.publishingId
411+
}
412+
413+
func (amqp *AMQP10) MarshalBinary() ([]byte, error) {
414+
return amqp.message.MarshalBinary()
415+
}
416+
417+
func (amqp *AMQP10) UnmarshalBinary(data []byte) error {
418+
return amqp.message.UnmarshalBinary(data)
419+
}
420+
421+
func (amqp *AMQP10) GetData() [][]byte {
422+
return amqp.message.Data
423+
}
424+
393425
// NewMessage returns a *Message with data as the payload.
394426
//
395427
// This constructor is intended as a helper for basic Messages with a
396428
// single data payload. It is valid to construct a Message directly for
397429
// more complex usages.
398-
func NewMessage(data []byte) *Message {
430+
func newMessage(data []byte) *Message {
399431
return &Message{
400432
Data: [][]byte{data},
401433
doneSignal: make(chan struct{}),

pkg/ha/ha_publisher.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"github.com/pkg/errors"
66
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
7+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
78
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
89
"net"
910
"sync"
@@ -69,7 +70,7 @@ func (p *ReliableProducer) NotifyPublishConfirmation() stream.ChannelPublishConf
6970

7071
}
7172

72-
func (p *ReliableProducer) BatchPublish(messages []*amqp.Message) error {
73+
func (p *ReliableProducer) BatchPublish(messages []message.StreamMessage) error {
7374
if p.getStatus() == StatusStreamDoesNotExist {
7475
return stream.StreamDoesNotExist
7576
}

pkg/message/interface.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package message
2+
3+
type StreamMessage interface {
4+
MarshalBinary() ([]byte, error)
5+
UnmarshalBinary(data []byte) error
6+
SetPublishingId(id int64)
7+
GetPublishingId() int64
8+
GetData() [][]byte
9+
}

pkg/stream/buffer_writer.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ func writeString(inputBuff *bytes.Buffer, value string) {
4545
inputBuff.Write([]byte(value))
4646
}
4747

48+
func writeBytes(inputBuff *bytes.Buffer, value []byte) {
49+
inputBuff.Write(value)
50+
}
51+
4852
// writeProtocolHeader protocol utils functions
4953
func writeProtocolHeader(inputBuff *bytes.Buffer,
5054
length int, command int16,

0 commit comments

Comments
 (0)