Skip to content

Commit dd2f9a0

Browse files
authored
Send advertised host and port in open (#40)
* Send Send advertised host and port in open https://github.com/rabbitmq/rabbitmq-server/pull/3060/files fix offest client side and test * Change error handling remove pre-declared option on the perf test, in the same way as Java client does. It checks the PreconditionFailed condition. cc @gerhard * Change test * Change socket return type
1 parent 2e27e46 commit dd2f9a0

22 files changed

+317
-132
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ check: $(STATICCHECK)
2323
$(STATICCHECK) ./pkg/stream
2424

2525
test: vet fmt check
26-
go test -v ./pkg/stream -race -coverprofile=coverage.txt -covermode=atomic
26+
go test -v ./pkg/stream -race -coverprofile=coverage.txt -covermode=atomic -tags debug
2727

2828
integration-test: vet fmt check
2929
go test -v ./pkg/system_integration -race -coverprofile=coverage.txt -covermode=atomic -tags debug

examples/getting_started.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ func main() {
5353
)
5454

5555
CheckErr(err)
56-
5756
//optional publish confirmation channel
5857
chPublishConfirm := make(chan []int64, 1)
5958
go func(ch chan []int64) {
@@ -81,10 +80,10 @@ func main() {
8180
//}, nil)
8281
// if you need to track the offset you need a consumer name like:
8382
handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
84-
fmt.Printf("consumer id: %d, text: %s \n ", consumerContext.Consumer.ID, message.Data)
83+
fmt.Printf("consumer name: %s, text: %s \n ", consumerContext.Consumer.GetConsumerName(), message.Data)
8584
err := consumerContext.Consumer.Commit()
8685
if err != nil {
87-
fmt.Printf("Error during commit")
86+
fmt.Printf("Error during commit: %s", err)
8887
}
8988
}
9089

@@ -96,7 +95,8 @@ func main() {
9695
fmt.Printf("Consumer: %s closed on the stream: %s, reason: %s \n", event.Name, event.StreamName, event.Reason)
9796
}()
9897

99-
consumer, err := env.NewConsumer(context.TODO(), streamName,
98+
consumer, err := env.NewConsumer(context.TODO(),
99+
streamName,
100100
handleMessages,
101101
channelClose,
102102
stream.NewConsumerOptions().
@@ -106,7 +106,7 @@ func main() {
106106

107107
fmt.Println("Press any key to stop ")
108108
_, _ = reader.ReadString('\n')
109-
err = consumer.UnSubscribe()
109+
err = consumer.Close()
110110
time.Sleep(200 * time.Millisecond)
111111
CheckErr(err)
112112
err = env.Close()

examples/offset/offset.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package main
2+
3+
import (
4+
"bufio"
5+
"context"
6+
"fmt"
7+
"github.com/google/uuid"
8+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
9+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
10+
"os"
11+
"time"
12+
)
13+
14+
func CheckErr(err error) {
15+
if err != nil {
16+
fmt.Printf("%s ", err)
17+
os.Exit(1)
18+
}
19+
}
20+
21+
func CreateArrayMessagesForTesting(batchMessages int) []*amqp.Message {
22+
var arr []*amqp.Message
23+
for z := 0; z < batchMessages; z++ {
24+
arr = append(arr, amqp.NewMessage([]byte("1234567890")))
25+
}
26+
return arr
27+
}
28+
29+
func main() {
30+
reader := bufio.NewReader(os.Stdin)
31+
// Set log level, not mandatory by default is INFO
32+
//stream.SetLevelInfo(stream.DEBUG)
33+
34+
fmt.Println("Getting started with Streaming client for RabbitMQ")
35+
fmt.Println("Connecting to RabbitMQ streaming ...")
36+
37+
env, err := stream.NewEnvironment(
38+
stream.NewEnvironmentOptions().
39+
SetHost("localhost").
40+
SetPort(5552).
41+
SetUser("guest").
42+
SetPassword("guest").
43+
SetMaxConsumersPerClient(1))
44+
CheckErr(err)
45+
// Create a stream, you can create streams without any option like:
46+
// err = env.DeclareStream(streamName, nil)
47+
// it is a best practise to define a size, 1GB for example:
48+
streamName := uuid.New().String()
49+
err = env.DeclareStream(streamName,
50+
&stream.StreamOptions{
51+
MaxLengthBytes: stream.ByteCapacity{}.GB(2),
52+
},
53+
)
54+
55+
CheckErr(err)
56+
57+
producer, err := env.NewProducer(streamName, nil, nil)
58+
CheckErr(err)
59+
60+
go func() {
61+
for i := 0; i < 2; i++ {
62+
_, err = producer.BatchPublish(context.Background(), CreateArrayMessagesForTesting(100))
63+
time.Sleep(1 * time.Second)
64+
}
65+
}()
66+
67+
counter := 0
68+
handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
69+
counter = counter + 1
70+
fmt.Printf("messages consumed: %d \n ", counter)
71+
}
72+
73+
consumer, err := env.NewConsumer(context.TODO(), streamName,
74+
handleMessages,
75+
nil,
76+
stream.NewConsumerOptions().
77+
SetConsumerName("my_consumer"). // set a consumer name
78+
SetOffset(stream.OffsetSpecification{}.Offset(100))) // start specific offset, in this case we start from the 100 so it will consume 100 messages
79+
CheckErr(err)
80+
81+
fmt.Println("Press any key to stop ")
82+
_, _ = reader.ReadString('\n')
83+
err = producer.Close()
84+
CheckErr(err)
85+
err = consumer.Close()
86+
CheckErr(err)
87+
err = env.DeleteStream(streamName)
88+
CheckErr(err)
89+
90+
}

examples/publishersError/publisherError.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func main() {
3939

4040
for {
4141
pError := <-ch
42-
fmt.Printf("Error during publish message id:%d, error: %s \n", pError.PublishingId, pError.ErrorMessage)
42+
fmt.Printf("Error during publish message id:%d, error: %s \n", pError.PublishingId, pError.Err)
4343
}
4444

4545
}(chPublishError)

perfTest/cmd/commands.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ var (
2828
consumers int
2929
publishersPerClient int
3030
consumersPerClient int
31-
preDeclared bool
3231
streams []string
3332
maxLengthBytes string
3433
maxSegmentSizeBytes string
@@ -54,7 +53,6 @@ func setupCli(baseCmd *cobra.Command) {
5453
baseCmd.PersistentFlags().IntVarP(&rate, "rate", "", 0, "Limit publish rate")
5554
baseCmd.PersistentFlags().IntVarP(&variableRate, "variable-rate", "", 0, "Variable rate to value")
5655
baseCmd.PersistentFlags().IntVarP(&variableBody, "variable-body", "", 0, "Variable body size")
57-
baseCmd.PersistentFlags().BoolVarP(&preDeclared, "pre-declared", "", false, "Pre created stream")
5856
baseCmd.PersistentFlags().BoolVarP(&exitOnError, "exit-on-error", "", true, "Close the app in case of error")
5957
baseCmd.PersistentFlags().BoolVarP(&printStatsV, "print-stats", "", true, "Print stats")
6058
baseCmd.PersistentFlags().StringSliceVarP(&streams, "streams", "", []string{"perf-test-go"}, "Stream names")

perfTest/cmd/silent.go

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -138,33 +138,31 @@ func randomSleep() {
138138
}
139139

140140
func initStreams() error {
141-
if !preDeclared {
142-
logInfo("Declaring streams: %s", streams)
143-
env, err := stream.NewEnvironment(stream.NewEnvironmentOptions().SetUri(
144-
rabbitmqBrokerUrl))
145-
if err != nil {
146-
logError("Error init stream connection: %s", err)
147-
return err
148-
}
141+
logInfo("Declaring streams: %s", streams)
142+
env, err := stream.NewEnvironment(stream.NewEnvironmentOptions().SetUri(
143+
rabbitmqBrokerUrl))
144+
if err != nil {
145+
logError("Error init stream connection: %s", err)
146+
return err
147+
}
149148

150-
for _, streamName := range streams {
149+
for _, streamName := range streams {
151150

152-
err = env.DeclareStream(
153-
streamName,
154-
stream.NewStreamOptions().
155-
SetMaxLengthBytes(stream.ByteCapacity{}.From(maxLengthBytes)).
156-
SetMaxSegmentSizeBytes(stream.ByteCapacity{}.From(maxSegmentSizeBytes)))
157-
if err != nil {
158-
logError("Error declaring stream: %s", err)
151+
err = env.DeclareStream(
152+
streamName,
153+
stream.NewStreamOptions().
154+
SetMaxLengthBytes(stream.ByteCapacity{}.From(maxLengthBytes)).
155+
SetMaxSegmentSizeBytes(stream.ByteCapacity{}.From(maxSegmentSizeBytes)))
156+
if err != nil {
157+
if err == stream.PreconditionFailed {
158+
logError("The stream: %s already exist with different parameters", streamName)
159159
_ = env.Close()
160160
return err
161161
}
162162
}
163-
logInfo("End Init streams :%s\n", streams)
164-
return env.Close()
165163
}
166-
logInfo("Predeclared streams: %s\n", streams)
167-
return nil
164+
logInfo("End Init streams :%s\n", streams)
165+
return env.Close()
168166
}
169167
func startPublishers() error {
170168
env, err := stream.NewEnvironment(stream.NewEnvironmentOptions().SetUri(

pkg/stream/client.go

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func (c *Client) peerProperties() error {
139139
writeString(b, element)
140140
}
141141

142-
return c.handleWrite(b.Bytes(), resp)
142+
return c.handleWrite(b.Bytes(), resp).Err
143143
}
144144

145145
func (c *Client) authenticate(user string, password string) error {
@@ -193,14 +193,14 @@ func (c *Client) sendSaslAuthenticate(saslMechanism string, challengeResponse []
193193
writeInt(b, len(challengeResponse))
194194
b.Write(challengeResponse)
195195
err := c.handleWrite(b.Bytes(), resp)
196-
if err != nil {
197-
return err
196+
if err.Err != nil {
197+
return err.Err
198198
}
199199
// double read for TUNE
200200
tuneData := <-respTune.data
201-
err = c.coordinator.RemoveResponseByName("tune")
202-
if err != nil {
203-
return err
201+
errR := c.coordinator.RemoveResponseByName("tune")
202+
if errR != nil {
203+
return errR
204204
}
205205

206206
return c.socket.writeAndFlush(tuneData.([]byte))
@@ -214,7 +214,18 @@ func (c *Client) open(virtualHost string) error {
214214
writeProtocolHeader(b, length, commandOpen,
215215
correlationId)
216216
writeString(b, virtualHost)
217-
return c.handleWrite(b.Bytes(), resp)
217+
err := c.handleWriteWithResponse(b.Bytes(), resp, false)
218+
if err.Err != nil {
219+
return err.Err
220+
}
221+
222+
advHostPort := <-resp.data
223+
for k, v := range advHostPort.(ClientProperties).items {
224+
c.clientProperties.items[k] = v
225+
}
226+
_ = c.coordinator.RemoveResponseById(resp.correlationid)
227+
return nil
228+
218229
}
219230

220231
func (c *Client) DeleteStream(streamName string) error {
@@ -227,7 +238,7 @@ func (c *Client) DeleteStream(streamName string) error {
227238

228239
writeString(b, streamName)
229240

230-
return c.handleWrite(b.Bytes(), resp)
241+
return c.handleWrite(b.Bytes(), resp).Err
231242
}
232243

233244
func (c *Client) heartBeat() {
@@ -330,7 +341,7 @@ func (c *Client) DeclarePublisher(streamName string, channelConfirmListener Publ
330341
writeShort(b, int16(publisherReferenceSize))
331342
writeString(b, streamName)
332343
res := c.handleWrite(b.Bytes(), resp)
333-
return producer, res
344+
return producer, res.Err
334345
}
335346

336347
func (c *Client) metaData(streams ...string) *StreamsMetadata {
@@ -353,7 +364,7 @@ func (c *Client) metaData(streams ...string) *StreamsMetadata {
353364
}
354365

355366
err := c.handleWrite(b.Bytes(), resp)
356-
if err != nil {
367+
if err.Err != nil {
357368
return nil
358369
}
359370

@@ -369,7 +380,7 @@ func (c *Client) BrokerLeader(stream string) (*Broker, error) {
369380

370381
streamMetadata := streamsMetadata.Get(stream)
371382
if streamMetadata.responseCode != responseCodeOk {
372-
return nil, fmt.Errorf("leader error for stream: %s, error:%s", stream, lookErrorCode(streamMetadata.responseCode))
383+
return nil, lookErrorCode(streamMetadata.responseCode)
373384
}
374385
return streamMetadata.Leader, nil
375386
}
@@ -405,7 +416,7 @@ func (c *Client) DeclareStream(streamName string, options *StreamOptions) error
405416
writeString(b, element)
406417
}
407418

408-
return c.handleWrite(b.Bytes(), resp)
419+
return c.handleWrite(b.Bytes(), resp).Err
409420

410421
}
411422

@@ -438,6 +449,12 @@ func (c *Client) DeclareSubscriber(ctx context.Context, streamName string,
438449
// here we change the type since typeLastConsumed is not part of the protocol
439450
options.Offset.typeOfs = typeOffset
440451
}
452+
453+
// copy the option offset to the consumer offset
454+
// the option.offset won't change ( in case we need to retrive the original configuration)
455+
// consumer.current offset will be moved when reading
456+
consumer.setCurrentOffset(options.Offset.offset)
457+
441458
resp := c.coordinator.NewResponse(commandSubscribe, streamName)
442459
correlationId := resp.correlationid
443460
var b = bytes.NewBuffer(make([]byte, 0, length+4))
@@ -467,7 +484,7 @@ func (c *Client) DeclareSubscriber(ctx context.Context, streamName string,
467484
}
468485

469486
case data := <-consumer.response.data:
470-
consumer.setOffset(data.(int64))
487+
consumer.setCurrentOffset(data.(int64))
471488

472489
case messages := <-consumer.response.messages:
473490
for _, message := range messages {
@@ -476,5 +493,5 @@ func (c *Client) DeclareSubscriber(ctx context.Context, streamName string,
476493
}
477494
}
478495
}()
479-
return consumer, err
496+
return consumer, err.Err
480497
}

pkg/stream/client_test.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ var _ = AfterSuite(func() {
2323
err := testEnvironment.Close()
2424
Expect(err).NotTo(HaveOccurred())
2525
time.Sleep(500 * time.Millisecond)
26-
//Expect(testEnvironment.clientLocator.coordinator.ProducersCount()).To(Equal(0))
26+
//Expect(testEnvironment.Coordinators()[0].ProducersCount()).To(Equal(0))
2727
//Expect(testEnvironment.clientLocator.coordinator.ResponsesCount()).To(Equal(0))
2828
//Expect(testEnvironment.clientLocator.coordinator.ConsumersCount()).To(Equal(0))
2929
})
@@ -112,8 +112,7 @@ var _ = Describe("Streaming testEnvironment", func() {
112112
Expect(err).NotTo(HaveOccurred())
113113
err = testEnvironment.DeclareStream(testStreamName, nil)
114114
Expect(err).To(HaveOccurred())
115-
Expect(fmt.Sprintf("%s", err)).
116-
To(ContainSubstring("stream already exists"))
115+
Expect(err).To(Equal(StreamAlreadyExists))
117116
err = testEnvironment.DeleteStream(testStreamName)
118117
Expect(err).NotTo(HaveOccurred())
119118
})
@@ -127,8 +126,7 @@ var _ = Describe("Streaming testEnvironment", func() {
127126
MaxLengthBytes: ByteCapacity{}.MB(100),
128127
})
129128
Expect(err).To(HaveOccurred())
130-
Expect(fmt.Sprintf("%s", err)).
131-
To(ContainSubstring("precondition failed"))
129+
Expect(err).To(Equal(PreconditionFailed))
132130
err = testEnvironment.DeleteStream(testStreamName)
133131
Expect(err).NotTo(HaveOccurred())
134132
})

0 commit comments

Comments
 (0)