Skip to content

Commit

Permalink
Up to 500M messages per second.
Browse files Browse the repository at this point in the history
  • Loading branch information
Oliver, Jonathan committed May 19, 2014
1 parent 279cc74 commit 873ee95
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 69 deletions.
15 changes: 0 additions & 15 deletions example/consumer.go

This file was deleted.

21 changes: 0 additions & 21 deletions example/consumer_handler.go

This file was deleted.

40 changes: 40 additions & 0 deletions example/example_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package main

import (
"fmt"
"time"

"github.com/smartystreets/go-disruptor"
)

const Mod = 1000000 * 10 // 1 million * N

func consume(writerBarrier disruptor.Barrier, writerCursor, readerCursor *disruptor.Cursor) {
reader := disruptor.NewReader(writerBarrier, writerCursor, readerCursor)
started := time.Now()

for {
sequence, remaining := reader.Receive()
if remaining == disruptor.Idle {
} else if remaining == disruptor.Gating {
} else {
for ; remaining >= 0; remaining-- {
sequence++

if sequence%Mod == 0 {
finished := time.Now()
fmt.Println(sequence, finished.Sub(started))
started = time.Now()
}

if sequence != ringBuffer[sequence&RingMask] {
message := ringBuffer[sequence&RingMask]
panic(fmt.Sprintf("Sequence: %d, Message %d\n", sequence, message))
}

}

reader.Commit(sequence)
}
}
}
18 changes: 18 additions & 0 deletions example/example_producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package main

import "github.com/smartystreets/go-disruptor"

func publish(writer *disruptor.Writer) {
for {
_, upper := writer.Reserve(1)
if upper == disruptor.Gating {
continue
}

// ringBuffer[(upper-2)&RingMask] = upper - 2
// ringBuffer[(upper-1)&RingMask] = upper - 1
ringBuffer[upper&RingMask] = upper

writer.Commit(upper)
}
}
13 changes: 0 additions & 13 deletions example/publisher.go

This file was deleted.

23 changes: 7 additions & 16 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,37 +7,28 @@ const (

type Reader struct {
upstreamBarrier Barrier
callback Consumer
writerCursor *Cursor
readerCursor *Cursor
}

func NewReader(upstreamBarrier Barrier, callback Consumer, writerCursor, readerCursor *Cursor) *Reader {
func NewReader(upstreamBarrier Barrier, writerCursor, readerCursor *Cursor) *Reader {
return &Reader{
upstreamBarrier: upstreamBarrier,
callback: callback,
writerCursor: writerCursor,
readerCursor: readerCursor,
}
}

// IDEA: Read returns remaining and consumer calls Commit(seq) once they're done reading
func (this *Reader) Process() int64 {
next := this.readerCursor.Load() + 1
func (this *Reader) Receive() (int64, int64) {
current := this.readerCursor.Load()
next := current + 1
ready := this.upstreamBarrier()

if next <= ready {
for next <= ready {
this.callback.Consume(next, ready-next)
next++
}

next--
this.readerCursor.Store(next)
return next
return current, ready - next
} else if next <= this.writerCursor.Load() {
return Gating
return current, Gating
} else {
return Idle
return current, Idle
}
}
5 changes: 5 additions & 0 deletions reader_amd64.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package disruptor

func (this *Reader) Commit(sequence int64) {
this.readerCursor.value = sequence
}
3 changes: 2 additions & 1 deletion reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ func BenchmarkReader(b *testing.B) {

for i := int64(0); i < iterations; i++ {
readerCursor.Store(0)
reader.Process()
sequence := reader.Receive()
reader.Commit(sequence)
}
}

Expand Down
7 changes: 4 additions & 3 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,20 @@ func isPowerOfTwo(value int32) bool {
return value > 0 && (value&(value-1)) == 0
}

func (this *Writer) Reserve(items int64) int64 {
func (this *Writer) Reserve(items int64) (int64, int64) {
current := this.previous + 1
next := this.previous + items
wrap := next - this.ringSize

if wrap > this.gate {
min := this.readerBarrier()
if wrap > min {
return Gating
return current, Gating
}

this.gate = min
}

this.previous = next
return next
return current, next
}

0 comments on commit 873ee95

Please sign in to comment.