Skip to content

Commit

Permalink
introduce pop func to use pollTimer
Browse files Browse the repository at this point in the history
  • Loading branch information
edwardmack committed Sep 14, 2022
1 parent 558c0c6 commit b020e56
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 14 deletions.
33 changes: 19 additions & 14 deletions lib/transaction/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,19 @@ func (pq *priorityQueue) Pop() interface{} {

// PriorityQueue is a thread safe wrapper over `priorityQueue`
type PriorityQueue struct {
pq priorityQueue
currOrder uint64
txs map[common.Hash]*Item
sleepTime time.Duration
pq priorityQueue
currOrder uint64
txs map[common.Hash]*Item
pollInterval time.Duration
sync.Mutex
}

// NewPriorityQueue creates new instance of PriorityQueue
func NewPriorityQueue() *PriorityQueue {
spq := &PriorityQueue{
pq: make(priorityQueue, 0),
txs: make(map[common.Hash]*Item),
sleepTime: time.Millisecond * 10,
pq: make(priorityQueue, 0),
txs: make(map[common.Hash]*Item),
pollInterval: time.Millisecond * 10,
}

heap.Init(&spq.pq)
Expand Down Expand Up @@ -147,18 +147,23 @@ func (spq *PriorityQueue) Push(txn *ValidTransaction) (common.Hash, error) {
func (spq *PriorityQueue) PopChannel(timer *time.Timer) (tx chan *ValidTransaction) {
popChannel := make(chan *ValidTransaction)
go func() {
var pollTimer <-chan time.Time = nil
var pop = func() {
txn := spq.Pop()
if txn != nil {
popChannel <- txn
} else {
pollTimer = time.NewTimer(spq.pollInterval).C
}
}
for {
select {
case <-timer.C:
close(popChannel)
case <-pollTimer:
pop()
default:
txn := spq.Pop()

if txn != nil {
popChannel <- txn
} else {
time.Sleep(spq.sleepTime)
}
pop()
}
}
}()
Expand Down
50 changes: 50 additions & 0 deletions lib/transaction/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,3 +313,53 @@ func TestPopChannel(t *testing.T) {
counter++
}
}

func TestPopChannelEnds(t *testing.T) {
pq := NewPriorityQueue()
// increase sleep time greater than timer
pq.pollInterval = 2 * time.Second
slotTimer := time.NewTimer(time.Second)

start := time.Now()

popChan := pq.PopChannel(slotTimer)
tests := []*ValidTransaction{
{
Extrinsic: []byte("a"),
Validity: &Validity{Priority: 1},
},
{
Extrinsic: []byte("b"),
Validity: &Validity{Priority: 4},
},
{
Extrinsic: []byte("c"),
Validity: &Validity{Priority: 2},
},
{
Extrinsic: []byte("d"),
Validity: &Validity{Priority: 17},
},
{
Extrinsic: []byte("e"),
Validity: &Validity{Priority: 2},
},
}

expected := []int{3, 1, 2, 4, 0}

for _, test := range tests {
pq.Push(test)
}

counter := 0
for txn := range popChan {
assert.Equal(t, tests[expected[counter]], txn)
counter++
}

d := time.Since(start)
// assert between 1s and 1.1s
assert.GreaterOrEqual(t, d, time.Second)
assert.LessOrEqual(t, d, time.Second+(time.Millisecond*100))
}

0 comments on commit b020e56

Please sign in to comment.