Skip to content

Commit

Permalink
Added significant logging to drive out shared writer race condition.
Browse files Browse the repository at this point in the history
  • Loading branch information
Oliver, Jonathan committed May 23, 2014
1 parent 109cfdf commit cd8fbe6
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 35 deletions.
36 changes: 19 additions & 17 deletions example/example_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"fmt"
"time"

"github.com/smartystreets/go-disruptor"
)
Expand All @@ -13,31 +12,26 @@ func consume0(reader *disruptor.SimpleReader) {
}
}
func consume1(reader *disruptor.Reader) {
started := time.Now()

debug := make([]int64, 5)
debug = debug[0:]
// started := time.Now()

fmt.Printf("\t\t\t\t\t[CONSUMER] Starting consumer...\n")
for {
sequence, remaining := reader.Receive()
if remaining >= 0 {
for remaining >= 0 {
fmt.Printf("\t\t\t\t\t[CONSUMER] Received messages starting at sequence %d, with %d messages remaining\n", sequence, remaining)

if sequence%ReportingFrequency == 0 {
finished := time.Now()
fmt.Println(sequence, finished.Sub(started))
started = time.Now()
}
for remaining >= 0 {
// if sequence%ReportingFrequency == 0 {
// finished := time.Now()
// fmt.Println(sequence, finished.Sub(started))
// started = time.Now()
// }

message := ringBuffer[sequence&RingMask]
fmt.Printf("\t\t\t\t\t[CONSUMER] Consuming sequence %d. Message Payload: %d\n", sequence, message)
if sequence != message {
for i := sequence - 5; i <= sequence; i++ {
debug = append(debug, ringBuffer[i&RingMask])
}

alert := fmt.Sprintf("Race Condition::Sequence: %d, Message %d\n", sequence, message)
alert := fmt.Sprintf("--------------\n\t\t\t\t\t[CONSUMER] ***Race Condition***::Sequence: %d, Message %d\n", sequence, message)
fmt.Println(alert)
fmt.Println("Partial Ring Buffer Snapshot:", debug)
panic(alert)
}

Expand All @@ -46,8 +40,16 @@ func consume1(reader *disruptor.Reader) {
remaining--
sequence++
}

fmt.Println("\t\t\t\t\t[CONSUMER] All messages consumed, committing up to sequence ", sequence-1)
reader.Commit(sequence - 1)
} else {
if remaining == disruptor.Gating {
fmt.Println("\t\t\t\t\t[CONSUMER] Consumer gating at sequence", sequence)
} else if remaining == disruptor.Idling {
fmt.Println("\t\t\t\t\t[CONSUMER] Consumer idling at sequence", sequence)
}
//time.Sleep(time.Millisecond * 10)
}
}
}
Expand Down
24 changes: 15 additions & 9 deletions example/example_producer.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
package main

import "github.com/smartystreets/go-disruptor"
import (
"fmt"

func publish(writer *disruptor.SharedWriter) {
for {
sequence := writer.Reserve(ItemsToPublish)
"github.com/smartystreets/go-disruptor"
)

func publish(id int, writer *disruptor.SharedWriter) {

if sequence != disruptor.Gating {
for lower := sequence - ItemsToPublish; lower < sequence; {
lower++
ringBuffer[(lower)&RingMask] = lower
}
fmt.Printf("[PRODUCER %d] Starting producer...\n", id)

for {
if sequence := writer.Reserve(id, ItemsToPublish); sequence != disruptor.Gating {
fmt.Printf("[PRODUCER %d] Writing %d to slot %d\n", id, sequence, sequence)
ringBuffer[sequence&RingMask] = sequence
fmt.Printf("[PRODUCER %d] Committing sequence %d\n", id, sequence)
writer.Commit(sequence)
} else {
// fmt.Println("[PRODUCER] Gating")
//time.Sleep(time.Millisecond)
}
}
}
10 changes: 5 additions & 5 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ const (
MaxConsumersPerGroup = 1
MaxConsumerGroups = 1
MaxProducers = 2
ItemsToPublish = 1
ReportingFrequency = 1000000 * 10 // 1 million * N
RingSize = 1024 * 16
ItemsToPublish = 2
ReportingFrequency = 10000 //1000000 * 10 // 1 million * N
RingSize = 2
RingMask = RingSize - 1
)

Expand All @@ -30,10 +30,10 @@ func main() {
}
func startProducers(writer *disruptor.SharedWriter) {
for i := 0; i < MaxProducers-1; i++ {
go publish(writer)
go publish(i, writer)
}

publish(writer)
publish(MaxProducers-1, writer)
}

func startConsumerGroups(upstream disruptor.Barrier, writer *disruptor.Cursor) disruptor.Barrier {
Expand Down
8 changes: 7 additions & 1 deletion reader.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package disruptor

import "fmt"

const (
Gating = -2
Idling = -3
Expand All @@ -24,12 +26,16 @@ func NewReader(upstreamBarrier Barrier, writerCursor, readerCursor *Cursor) *Rea
func (this *Reader) Receive() (int64, int64) {
next := this.readerCursor.Load() + 1
ready := this.upstreamBarrier.Load()
fmt.Printf("\t\t\t\t\t[READER] Next: %d, Ready: %d\n", next, ready)

if next <= ready {
fmt.Printf("\t\t\t\t\t[READER] Next Sequence: %d, Remaining: %d\n", next, ready-next)
return next, ready - next
} else if next <= this.writerCursor.Load() {
} else if gate := this.writerCursor.Load(); next <= gate {
fmt.Println("\t\t\t\t\t[READER] Gating at sequence:", gate)
return next, Gating
} else {
fmt.Println("\t\t\t\t\t[READER] Gating")
return next, Idling
}
}
16 changes: 14 additions & 2 deletions shared_writer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package disruptor

import "sync/atomic"
import (
"fmt"
"sync/atomic"
)

type SharedWriter struct {
capacity int64
Expand All @@ -24,23 +27,32 @@ func NewSharedWriter(shared *SharedWriterBarrier, upstream Barrier) *SharedWrite
}
}

func (this *SharedWriter) Reserve(count int64) int64 {
func (this *SharedWriter) Reserve(id int, count int64) int64 {
for {
previous := this.reservation.Load()
next := previous + count
wrap := next - this.capacity

fmt.Printf("[WRITER %d] Previous: %d, Next: %d, Wrap: %d\n", id, previous, next, wrap)

if wrap > this.gate {
fmt.Printf("[WRITER %d] Previous gate: %d\n", id, this.gate)
min := this.upstream.Load()
if wrap > min {
fmt.Printf("[WRITER %d] New gate (need to wait more): %d\n", id, min)
return Gating
}

fmt.Printf("[WRITER %d] New gate (accepted): %d\n", id, min)
this.gate = min // doesn't matter which write wins, BUT will most likely need to be a Cursor
}

fmt.Printf("[WRITER %d] Updating reservation. Previous: %d, Next: %d\n", id, previous, next)
if atomic.CompareAndSwapInt64(&this.reservation.value, previous, next) {
fmt.Printf("[WRITER %d] CAS accepted\n", id)
return next
} else {
fmt.Printf("[WRITER] CAS failed, retrying\n", id)
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion shared_writer_barrier.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package disruptor

import "math"
import (
"fmt"
"math"
)

type SharedWriterBarrier struct {
committed []int32
Expand Down Expand Up @@ -30,11 +33,14 @@ func prepareCommitBuffer(capacity int64) []int32 {
}

func (this *SharedWriterBarrier) Load() int64 {

for sequence := this.reservation.Load(); sequence >= 0; sequence-- {
if this.committed[sequence&this.mask] == int32(sequence>>this.shift) {
fmt.Printf("[SHARED-WRITER-BARRIER] Barrier Sequence: %d\n", sequence)
return sequence
}
}

fmt.Printf("[SHARED-WRITER-BARRIER] Barrier Sequence: -1\n")
return InitialSequenceValue
}

0 comments on commit cd8fbe6

Please sign in to comment.