diff --git a/dot/state/transaction.go b/dot/state/transaction.go index 98a0265ab4..ba914a06e0 100644 --- a/dot/state/transaction.go +++ b/dot/state/transaction.go @@ -5,6 +5,7 @@ package state import ( "sync" + "time" "github.com/ChainSafe/gossamer/dot/telemetry" @@ -47,6 +48,12 @@ func (s *TransactionState) Pop() *transaction.ValidTransaction { return s.queue.Pop() } +// PopWithTimer returns the next valid transaction from the queue. +// When the timer expires, it returns `nil`. +func (s *TransactionState) PopWithTimer(timerCh <-chan time.Time) (transaction *transaction.ValidTransaction) { + return s.queue.PopWithTimer(timerCh) +} + // Peek returns the head of the queue without removing it func (s *TransactionState) Peek() *transaction.ValidTransaction { return s.queue.Peek() diff --git a/lib/babe/build.go b/lib/babe/build.go index c68bbaf546..024e210988 100644 --- a/lib/babe/build.go +++ b/lib/babe/build.go @@ -174,11 +174,15 @@ func (b *BlockBuilder) buildBlockSeal(header *types.Header) (*types.SealDigest, func (b *BlockBuilder) buildBlockExtrinsics(slot Slot, rt runtime.Instance) []*transaction.ValidTransaction { var included []*transaction.ValidTransaction - for !hasSlotEnded(slot) { - txn := b.transactionState.Pop() - // Transaction queue is empty. - if txn == nil { - continue + slotEnd := slot.start.Add(slot.duration * 2 / 3) // reserve last 1/3 of slot for block finalisation + timeout := time.Until(slotEnd) + slotTimer := time.NewTimer(timeout) + + for { + txn := b.transactionState.PopWithTimer(slotTimer.C) + slotTimerExpired := txn == nil + if slotTimerExpired { + break } extrinsic := txn.Extrinsic @@ -287,11 +291,6 @@ func (b *BlockBuilder) addToQueue(txs []*transaction.ValidTransaction) { } } -func hasSlotEnded(slot Slot) bool { - slotEnd := slot.start.Add(slot.duration * 2 / 3) // reserve last 1/3 of slot for block finalisation - return time.Since(slotEnd) >= 0 -} - func extrinsicsToBody(inherents [][]byte, txs []*transaction.ValidTransaction) (types.Body, error) { extrinsics := types.BytesArrayToExtrinsics(inherents) diff --git a/lib/babe/mock_state_test.go b/lib/babe/mock_state_test.go index 38815094a9..a0215f8d9a 100644 --- a/lib/babe/mock_state_test.go +++ b/lib/babe/mock_state_test.go @@ -490,6 +490,20 @@ func (mr *MockTransactionStateMockRecorder) Pop() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Pop", reflect.TypeOf((*MockTransactionState)(nil).Pop)) } +// PopWithTimer mocks base method. +func (m *MockTransactionState) PopWithTimer(arg0 <-chan time.Time) *transaction.ValidTransaction { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PopWithTimer", arg0) + ret0, _ := ret[0].(*transaction.ValidTransaction) + return ret0 +} + +// PopWithTimer indicates an expected call of PopWithTimer. +func (mr *MockTransactionStateMockRecorder) PopWithTimer(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PopWithTimer", reflect.TypeOf((*MockTransactionState)(nil).PopWithTimer), arg0) +} + // Push mocks base method. func (m *MockTransactionState) Push(arg0 *transaction.ValidTransaction) (common.Hash, error) { m.ctrl.T.Helper() diff --git a/lib/babe/state.go b/lib/babe/state.go index 897f06bd35..0205443fcb 100644 --- a/lib/babe/state.go +++ b/lib/babe/state.go @@ -56,6 +56,7 @@ type TransactionState interface { Push(vt *transaction.ValidTransaction) (common.Hash, error) Pop() *transaction.ValidTransaction Peek() *transaction.ValidTransaction + PopWithTimer(timerCh <-chan time.Time) (tx *transaction.ValidTransaction) } // EpochState is the interface for epoch methods diff --git a/lib/transaction/priority_queue.go b/lib/transaction/priority_queue.go index 0f502b2b5f..a4408a8886 100644 --- a/lib/transaction/priority_queue.go +++ b/lib/transaction/priority_queue.go @@ -7,6 +7,7 @@ import ( "container/heap" "errors" "sync" + "time" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" @@ -77,17 +78,18 @@ func (pq *priorityQueue) Pop() interface{} { // PriorityQueue is a thread safe wrapper over `priorityQueue` type PriorityQueue struct { - pq priorityQueue - currOrder uint64 - txs map[common.Hash]*Item + pq priorityQueue + currOrder uint64 + txs map[common.Hash]*Item + pollInterval time.Duration sync.Mutex } // NewPriorityQueue creates new instance of PriorityQueue func NewPriorityQueue() *PriorityQueue { spq := &PriorityQueue{ - pq: make(priorityQueue, 0), - txs: make(map[common.Hash]*Item), + txs: make(map[common.Hash]*Item), + pollInterval: 10 * time.Millisecond, } heap.Init(&spq.pq) @@ -139,6 +141,40 @@ func (spq *PriorityQueue) Push(txn *ValidTransaction) (common.Hash, error) { return hash, nil } +// PopWithTimer returns the next valid transaction from the queue. +// When the timer expires, it returns `nil`. +func (spq *PriorityQueue) PopWithTimer(timerCh <-chan time.Time) (transaction *ValidTransaction) { + transaction = spq.Pop() + if transaction != nil { + return transaction + } + + transactionChannel := make(chan *ValidTransaction) + go func() { + pollTicker := time.NewTicker(spq.pollInterval) + defer pollTicker.Stop() + + for { + select { + case <-timerCh: + transactionChannel <- nil + return + case <-pollTicker.C: + } + + transaction := spq.Pop() + if transaction == nil { + continue + } + + transactionChannel <- transaction + return + } + }() + + return <-transactionChannel +} + // Pop removes the transaction with has the highest priority value from the queue and returns it. // If there are multiple transaction with same priority value then it return them in FIFO order. func (spq *PriorityQueue) Pop() *ValidTransaction { diff --git a/lib/transaction/priority_queue_integration_test.go b/lib/transaction/priority_queue_integration_test.go new file mode 100644 index 0000000000..029b5cf256 --- /dev/null +++ b/lib/transaction/priority_queue_integration_test.go @@ -0,0 +1,57 @@ +// Copyright 2021 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +//go:build integration + +package transaction + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func Test_PopWithTimer(t *testing.T) { + pq := NewPriorityQueue() + slotTimer := time.NewTimer(time.Second) + + tests := []*ValidTransaction{ + { + Extrinsic: []byte("a"), + Validity: &Validity{Priority: 1}, + }, + { + Extrinsic: []byte("b"), + Validity: &Validity{Priority: 4}, + }, + { + Extrinsic: []byte("c"), + Validity: &Validity{Priority: 2}, + }, + { + Extrinsic: []byte("d"), + Validity: &Validity{Priority: 17}, + }, + { + Extrinsic: []byte("e"), + Validity: &Validity{Priority: 2}, + }, + } + + expected := []int{3, 1, 2, 4, 0} + + for _, test := range tests { + pq.Push(test) + } + + counter := 0 + for { + txn := pq.PopWithTimer(slotTimer.C) + if txn == nil { + break + } + assert.Equal(t, tests[expected[counter]], txn) + counter++ + } +} diff --git a/lib/transaction/priority_queue_test.go b/lib/transaction/priority_queue_test.go index 61caae82cf..e04c8c5634 100644 --- a/lib/transaction/priority_queue_test.go +++ b/lib/transaction/priority_queue_test.go @@ -8,6 +8,8 @@ import ( "sync" "testing" "time" + + "github.com/stretchr/testify/assert" ) func TestPriorityQueue(t *testing.T) { @@ -270,3 +272,76 @@ func TestRemoveExtrinsic(t *testing.T) { t.Fatalf("Fail: got %v expected %v", res, tests[1]) } } + +func Test_PriorityQueue_PopWithTimer(t *testing.T) { + t.Parallel() + + testCases := map[string]struct { + queueBuilder func() *PriorityQueue + queueModifier func(queue *PriorityQueue, done chan<- struct{}) + timer *time.Timer + transaction *ValidTransaction + }{ + "empty queue polled once": { + // test should last 1ns + queueBuilder: NewPriorityQueue, + timer: time.NewTimer(time.Nanosecond), + }, + "empty queue polled multiple times": { + // test should last 1ms + queueBuilder: func() *PriorityQueue { + queue := NewPriorityQueue() + queue.pollInterval = time.Nanosecond + return queue + }, + timer: time.NewTimer(time.Millisecond), + }, + "queue with one element polled once": { + // test should be instantaneous + queueBuilder: func() *PriorityQueue { + queue := NewPriorityQueue() + queue.Push(&ValidTransaction{Validity: &Validity{Priority: 1}}) + return queue + }, + timer: time.NewTimer(time.Nanosecond), + transaction: &ValidTransaction{Validity: &Validity{Priority: 1}}, + }, + "queue polled multiple times until new element": { + // test should last 1ms + queueBuilder: func() *PriorityQueue { + queue := NewPriorityQueue() + queue.pollInterval = time.Nanosecond + return queue + }, + queueModifier: func(queue *PriorityQueue, done chan<- struct{}) { + close(done) + time.Sleep(time.Millisecond) + queue.Push(&ValidTransaction{Validity: &Validity{Priority: 1}}) + }, + timer: time.NewTimer(time.Second), + transaction: &ValidTransaction{Validity: &Validity{Priority: 1}}, + }, + } + + for name, testCase := range testCases { + testCase := testCase + t.Run(name, func(t *testing.T) { + t.Parallel() + + queue := testCase.queueBuilder() + + modifyDone := make(chan struct{}) + if testCase.queueModifier != nil { + // modify queue asynchronously while popping + go testCase.queueModifier(queue, modifyDone) + } else { + close(modifyDone) + } + + transaction := queue.PopWithTimer(testCase.timer.C) + <-modifyDone + testCase.timer.Stop() + assert.Equal(t, testCase.transaction, transaction) + }) + } +}