From b020e56acac6c0ed4525a0d04e9a74073b1df557 Mon Sep 17 00:00:00 2001 From: edwardmack Date: Wed, 14 Sep 2022 14:23:09 -0400 Subject: [PATCH] introduce pop func to use pollTimer --- lib/transaction/priority_queue.go | 33 +++++++++-------- lib/transaction/priority_queue_test.go | 50 ++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 14 deletions(-) diff --git a/lib/transaction/priority_queue.go b/lib/transaction/priority_queue.go index 04a595ede17..543e1aeb94c 100644 --- a/lib/transaction/priority_queue.go +++ b/lib/transaction/priority_queue.go @@ -78,19 +78,19 @@ func (pq *priorityQueue) Pop() interface{} { // PriorityQueue is a thread safe wrapper over `priorityQueue` type PriorityQueue struct { - pq priorityQueue - currOrder uint64 - txs map[common.Hash]*Item - sleepTime time.Duration + pq priorityQueue + currOrder uint64 + txs map[common.Hash]*Item + pollInterval time.Duration sync.Mutex } // NewPriorityQueue creates new instance of PriorityQueue func NewPriorityQueue() *PriorityQueue { spq := &PriorityQueue{ - pq: make(priorityQueue, 0), - txs: make(map[common.Hash]*Item), - sleepTime: time.Millisecond * 10, + pq: make(priorityQueue, 0), + txs: make(map[common.Hash]*Item), + pollInterval: time.Millisecond * 10, } heap.Init(&spq.pq) @@ -147,18 +147,23 @@ func (spq *PriorityQueue) Push(txn *ValidTransaction) (common.Hash, error) { func (spq *PriorityQueue) PopChannel(timer *time.Timer) (tx chan *ValidTransaction) { popChannel := make(chan *ValidTransaction) go func() { + var pollTimer <-chan time.Time = nil + var pop = func() { + txn := spq.Pop() + if txn != nil { + popChannel <- txn + } else { + pollTimer = time.NewTimer(spq.pollInterval).C + } + } for { select { case <-timer.C: close(popChannel) + case <-pollTimer: + pop() default: - txn := spq.Pop() - - if txn != nil { - popChannel <- txn - } else { - time.Sleep(spq.sleepTime) - } + pop() } } }() diff --git a/lib/transaction/priority_queue_test.go b/lib/transaction/priority_queue_test.go index f61093993e4..db443b7a200 100644 --- a/lib/transaction/priority_queue_test.go +++ b/lib/transaction/priority_queue_test.go @@ -313,3 +313,53 @@ func TestPopChannel(t *testing.T) { counter++ } } + +func TestPopChannelEnds(t *testing.T) { + pq := NewPriorityQueue() + // increase sleep time greater than timer + pq.pollInterval = 2 * time.Second + slotTimer := time.NewTimer(time.Second) + + start := time.Now() + + popChan := pq.PopChannel(slotTimer) + tests := []*ValidTransaction{ + { + Extrinsic: []byte("a"), + Validity: &Validity{Priority: 1}, + }, + { + Extrinsic: []byte("b"), + Validity: &Validity{Priority: 4}, + }, + { + Extrinsic: []byte("c"), + Validity: &Validity{Priority: 2}, + }, + { + Extrinsic: []byte("d"), + Validity: &Validity{Priority: 17}, + }, + { + Extrinsic: []byte("e"), + Validity: &Validity{Priority: 2}, + }, + } + + expected := []int{3, 1, 2, 4, 0} + + for _, test := range tests { + pq.Push(test) + } + + counter := 0 + for txn := range popChan { + assert.Equal(t, tests[expected[counter]], txn) + counter++ + } + + d := time.Since(start) + // assert between 1s and 1.1s + assert.GreaterOrEqual(t, d, time.Second) + assert.LessOrEqual(t, d, time.Second+(time.Millisecond*100)) +}