Skip to content

Commit

Permalink
Updating reader to use new Consumer interface--it's still slower.
Browse files Browse the repository at this point in the history
  • Loading branch information
Oliver, Jonathan committed May 29, 2014
1 parent d94cb2e commit bfb8c88
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 61 deletions.
5 changes: 5 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package disruptor

type Consumer interface {
Consume(int64, int64) int64
}
98 changes: 53 additions & 45 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,21 @@ func main() {
runtime.GOMAXPROCS(2)

written, read := disruptor.NewCursor(), disruptor.NewCursor()

consumer := SampleConsumer{}
reader := disruptor.NewReader(read, written, written, consumer)

started := time.Now()

go publish(written, read)
consume(written, read)
go reader.Start()
publish(written, read, reader)
// consume(written, read)

finished := time.Now()
fmt.Println(Iterations, finished.Sub(started))
}

func publish(written, read *disruptor.Cursor) {
func publish(written, read *disruptor.Cursor, reader *disruptor.Reader) {
previous := disruptor.InitialSequenceValue
gate := disruptor.InitialSequenceValue

Expand All @@ -49,66 +54,69 @@ func publish(written, read *disruptor.Cursor) {
written.Sequence = next
previous = next
}

reader.Stop()
}
func consume(written, read *disruptor.Cursor) {
sleeps := 0

consumer := SampleConsumer{}
// func consume(reader *disruptor.Reader) {
// sleeps := 0

previous := int64(-1)
gate := int64(-1)
for previous < Iterations {
current := previous + 1
gate = written.Sequence
// // consumer := SampleConsumer{}

if current <= gate {
// // previous := int64(-1)
// // gate := int64(-1)
// // for previous < Iterations {
// // current := previous + 1
// // gate = written.Sequence

for current < gate {
current += consumer.Consume(current, gate)
}
// for current <= gate {
// // if current <= gate {

// if ringBuffer[current&BufferMask] > 0 {
// }
// // for current < gate {
// // current += consumer.Consume(current, gate)
// // }
// // // for current <= gate {

// current++
// }
// // // if ringBuffer[current&BufferMask] > 0 {
// // // }

previous = gate
read.Sequence = gate
} else {
sleeps++
time.Sleep(time.Microsecond)
}
}
// // // current++
// // // }

fmt.Println("Consumer sleeps:", sleeps)
// // previous = gate
// // read.Sequence = gate
// // } else {
// // sleeps++
// // time.Sleep(time.Microsecond)
// // }
// // }

// for sequence, gate := int64(0), int64(0); sequence < Iterations; sequence++ {
// fmt.Println("Consumer sleeps:", sleeps)

// // for gate <= sequence {
// // gate = written.Sequence
// // if gate <= sequence {
// // time.Sleep(time.Microsecond)
// // }
// // }
// // for sequence, gate := int64(0), int64(0); sequence < Iterations; sequence++ {

// // if ringBuffer[sequence&BufferMask] > 0 {
// // }
// // // for gate <= sequence {
// // // gate = written.Sequence
// // // if gate <= sequence {
// // // time.Sleep(time.Microsecond)
// // // }
// // // }

// // read.Sequence = sequence
// }
}
// // // if ringBuffer[sequence&BufferMask] > 0 {
// // // }

type Consumer interface {
Consume(lower, upper int64)
}
// // // read.Sequence = sequence
// // }
// }

// // type Consumer interface {
// // Consume(lower, upper int64)
// // }

type SampleConsumer struct{}

func (this SampleConsumer) Consume(current, gate int64) int64 {
if ringBuffer[current&BufferMask] > 0 {
}
// if ringBuffer[current&BufferMask] > 0 {
// }

return 1
}
47 changes: 31 additions & 16 deletions reader.go
Original file line number Diff line number Diff line change
@@ -1,41 +1,56 @@
package disruptor

import "time"

type Reader struct {
read *Cursor
written *Cursor
upstream Barrier
consumer Consumer
ready bool
} // TODO: padding???

func NewReader(read, written *Cursor, upstream Barrier) *Reader {
func NewReader(read, written *Cursor, upstream Barrier, consumer Consumer) *Reader {
return &Reader{
read: read,
written: written,
upstream: upstream,
ready: true,
consumer: consumer,
ready: false,
}
}

func (this *Reader) Start() {
this.ready = true
go this.receive()
}
func (this *Reader) Stop() {
this.ready = false
}
func (this *Reader) Receive(next int64) int64 {
maximum := this.upstream.Read(next)

if next <= maximum {
return maximum
} else if maximum = this.written.Load(); next <= maximum {
return Gating
} else if this.ready {
return Idling
} else {
return Stopped
}
}
func (this *Reader) receive() {
current := this.read.Sequence + 1
for {
gate := this.upstream.Read(current)

func (this *Reader) Commit(sequence int64) {
this.read.Store(sequence)
if current <= gate {
for current < gate {
current += this.consumer.Consume(current, gate)
}
this.read.Store(current)
current++
} else if gate = this.written.Load(); current <= gate {
// Gating--TODO: wait strategy (provide gating count to wait strategy for phased backoff)
// gating++
// idling = 0
time.Sleep(time.Microsecond)
} else if this.ready {
// Idling--TODO: wait strategy (provide idling count to wait strategy for phased backoff)
// idling++
// gating = 0
time.Sleep(time.Microsecond)
} else {
break
}
}
}

0 comments on commit bfb8c88

Please sign in to comment.