Skip to content

Commit

Permalink
add pop check, update pop with timer tests
Browse files Browse the repository at this point in the history
  • Loading branch information
edwardmack committed Oct 6, 2022
1 parent e67c8cd commit 163aa71
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 48 deletions.
2 changes: 1 addition & 1 deletion lib/babe/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ func (b *BlockBuilder) buildBlockExtrinsics(slot Slot, rt runtime.Instance) []*t
break
}

// handle txn
extrinsic := txn.Extrinsic
logger.Tracef("build block, applying extrinsic %s", extrinsic)

Expand Down Expand Up @@ -223,6 +222,7 @@ func (b *BlockBuilder) buildBlockExtrinsics(slot Slot, rt runtime.Instance) []*t
logger.Debugf("build block applied extrinsic %s", extrinsic)
included = append(included, txn)
}

return included
}

Expand Down
8 changes: 6 additions & 2 deletions lib/transaction/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,8 @@ type PriorityQueue struct {
// NewPriorityQueue creates new instance of PriorityQueue
func NewPriorityQueue() *PriorityQueue {
spq := &PriorityQueue{
pq: make(priorityQueue, 0),
txs: make(map[common.Hash]*Item),
pollInterval: time.Millisecond * 10,
pollInterval: 10 * time.Millisecond,
}

heap.Init(&spq.pq)
Expand Down Expand Up @@ -145,6 +144,11 @@ func (spq *PriorityQueue) Push(txn *ValidTransaction) (common.Hash, error) {
// PopWithTimer returns the next valid transaction from the queue.
// When the timer expires, it returns `nil`.
func (spq *PriorityQueue) PopWithTimer(timer *time.Timer) (transaction *ValidTransaction) {
transaction = spq.Pop()
if transaction != nil {
return transaction
}

transactionChannel := make(chan *ValidTransaction)
go func() {
pollTicker := time.NewTicker(spq.pollInterval)
Expand Down
110 changes: 65 additions & 45 deletions lib/transaction/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,55 +317,75 @@ func Test_PopWithTimer(t *testing.T) {
}
}

func Test_PopWithTimer_Ends(t *testing.T) {
pq := NewPriorityQueue()
// increase sleep time greater than timer
pq.pollInterval = 2 * time.Second
slotTimer := time.NewTimer(time.Second)

start := time.Now()

tests := []*ValidTransaction{
{
Extrinsic: []byte("a"),
Validity: &Validity{Priority: 1},
},
{
Extrinsic: []byte("b"),
Validity: &Validity{Priority: 4},
},
{
Extrinsic: []byte("c"),
Validity: &Validity{Priority: 2},
},
{
Extrinsic: []byte("d"),
Validity: &Validity{Priority: 17},
},
{
Extrinsic: []byte("e"),
Validity: &Validity{Priority: 2},
func Test_PriorityQueue_PopWithTimer(t *testing.T) {
t.Parallel()

testCases := map[string]struct {
queueBuilder func() *PriorityQueue
queueModifier func(queue *PriorityQueue, done chan<- struct{})
timer *time.Timer
transaction *ValidTransaction
}{
"empty queue polled once": {
// test should last 1ns
queueBuilder: NewPriorityQueue,
timer: time.NewTimer(time.Nanosecond),
},
"empty queue polled multiple times": {
// test should last 1ms
queueBuilder: func() *PriorityQueue {
queue := NewPriorityQueue()
queue.pollInterval = time.Nanosecond
return queue
},
timer: time.NewTimer(time.Millisecond),
},
"queue with one element polled once": {
// test should be instantaneous
queueBuilder: func() *PriorityQueue {
queue := NewPriorityQueue()
queue.Push(&ValidTransaction{Validity: &Validity{Priority: 1}})
return queue
},
timer: time.NewTimer(time.Nanosecond),
transaction: &ValidTransaction{Validity: &Validity{Priority: 1}},
},
"queue polled multiple times until new element": {
// test should last 1ms
queueBuilder: func() *PriorityQueue {
queue := NewPriorityQueue()
queue.pollInterval = time.Nanosecond
return queue
},
queueModifier: func(queue *PriorityQueue, done chan<- struct{}) {
close(done)
time.Sleep(time.Millisecond)
queue.Push(&ValidTransaction{Validity: &Validity{Priority: 1}})
},
timer: time.NewTimer(time.Second),
transaction: &ValidTransaction{Validity: &Validity{Priority: 1}},
},
}

expected := []int{3, 1, 2, 4, 0}
for name, testCase := range testCases {
testCase := testCase
t.Run(name, func(t *testing.T) {
t.Parallel()

for _, test := range tests {
pq.Push(test)
}
queue := testCase.queueBuilder()

counter := 0
for {
txn := pq.PopWithTimer(slotTimer)
if txn == nil {
break
}
assert.Equal(t, tests[expected[counter]], txn)
counter++
}
modifyDone := make(chan struct{})
if testCase.queueModifier != nil {
// modify queue asynchronously while popping
go testCase.queueModifier(queue, modifyDone)
} else {
close(modifyDone)
}

d := time.Since(start)
// assert between 1s and 1.1s
assert.GreaterOrEqual(t, d, time.Second)
assert.LessOrEqual(t, d, time.Second+(time.Millisecond*100))
transaction := queue.PopWithTimer(testCase.timer)
<-modifyDone
testCase.timer.Stop()
assert.Equal(t, testCase.transaction, transaction)
})
}
}

0 comments on commit 163aa71

Please sign in to comment.