Skip to content

Commit cb8a3bb

Browse files
authored
Use store/tracking lingo instead of commit (#48)
1 parent ead0716 commit cb8a3bb

File tree

5 files changed

+9
-9
lines changed

5 files changed

+9
-9
lines changed

examples/offsetTracking/offsetTracking.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ func main() {
6969
handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
7070
if atomic.AddInt32(&count, 1)%1000 == 0 {
7171
fmt.Printf("cousumed %d messages \n", atomic.LoadInt32(&count))
72-
// AVOID to commit for each single message, it will reduce the performances
73-
err := consumerContext.Consumer.Commit()
72+
// AVOID to store for each single message, it will reduce the performances
73+
err := consumerContext.Consumer.StoreOffset()
7474
if err != nil {
7575
CheckErr(err)
7676
}

perfTest/cmd/silent.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -310,11 +310,11 @@ func handleConsumerClose(channelClose stream.ChannelClose) {
310310
func startConsumer(consumerName string, streamName string) error {
311311

312312
handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
313-
//logError("consumerMessageCount Commit: %s", consumerMessageCount)
313+
//logError("consumerMessageCount StoreOffset: %s", consumerMessageCount)
314314
if atomic.AddInt32(&consumerMessageCount, 1)%500 == 0 {
315-
err := consumerContext.Consumer.Commit()
315+
err := consumerContext.Consumer.StoreOffset()
316316
if err != nil {
317-
logError("Error Commit: %s", err)
317+
logError("Error StoreOffset: %s", err)
318318
}
319319
}
320320
}

pkg/stream/constants.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ const (
1717
commandSubscribe = 7
1818
commandDeliver = 8
1919
commandCredit = 9
20-
commandCommitOffset = 10
20+
commandStoreOffset = 10
2121
CommandQueryOffset = 11
2222
CommandUnsubscribe = 12
2323
commandCreateStream = 13

pkg/stream/consumer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,14 +150,14 @@ func (consumer *Consumer) Close() error {
150150
return err.Err
151151
}
152152

153-
func (consumer *Consumer) Commit() error {
153+
func (consumer *Consumer) StoreOffset() error {
154154
if consumer.options.streamName == "" {
155155
return fmt.Errorf("stream Name can't be empty")
156156
}
157157
length := 2 + 2 + 2 + len(consumer.options.ConsumerName) + 2 +
158158
len(consumer.options.streamName) + 8
159159
var b = bytes.NewBuffer(make([]byte, 0, length+4))
160-
writeProtocolHeader(b, length, commandCommitOffset)
160+
writeProtocolHeader(b, length, commandStoreOffset)
161161

162162
writeString(b, consumer.options.ConsumerName)
163163
writeString(b, consumer.options.streamName)

pkg/stream/consumer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ var _ = Describe("Streaming Consumers", func() {
218218
consumer, err := env.NewConsumer(streamName,
219219
func(consumerContext ConsumerContext, message *amqp.Message) {
220220
atomic.AddInt32(&messagesCount, 1)
221-
_ = consumerContext.Consumer.Commit()
221+
_ = consumerContext.Consumer.StoreOffset()
222222
}, NewConsumerOptions().SetOffset(OffsetSpecification{}.First()).SetConsumerName("consumer_test"))
223223
Expect(err).NotTo(HaveOccurred())
224224
time.Sleep(500 * time.Millisecond)

0 commit comments

Comments
 (0)