Skip to content

Commit

Permalink
revert LRUEjector and remove random ejection fallback
Browse files Browse the repository at this point in the history
  • Loading branch information
Tarak Ben Youssef committed Jul 12, 2023
1 parent 67b4390 commit 1f1a341
Show file tree
Hide file tree
Showing 2 changed files with 298 additions and 0 deletions.
68 changes: 68 additions & 0 deletions module/mempool/stdmap/eject.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ package stdmap

import (
"fmt"
"math"
"sort"
"sync"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/utils/rand"
Expand Down Expand Up @@ -96,3 +98,69 @@ func EjectRandomFast(b *Backend) (bool, error) {
func EjectPanic(b *Backend) (flow.Identifier, flow.Entity, bool) {
panic("unexpected: mempool size over the limit")
}

// LRUEjector provides a swift FIFO ejection functionality
type LRUEjector struct {
sync.Mutex
table map[flow.Identifier]uint64 // keeps sequence number of entities it tracks
seqNum uint64 // keeps the most recent sequence number
}

func NewLRUEjector() *LRUEjector {
return &LRUEjector{
table: make(map[flow.Identifier]uint64),
seqNum: 0,
}
}

// Track should be called every time a new entity is added to the mempool.
// It tracks the entity for later ejection.
func (q *LRUEjector) Track(entityID flow.Identifier) {
q.Lock()
defer q.Unlock()

if _, ok := q.table[entityID]; ok {
// skips adding duplicate item
return
}

// TODO current table structure provides O(1) track and untrack features
// however, the Eject functionality is asymptotically O(n).
// With proper resource cleanups by the mempools, the Eject is supposed
// as a very infrequent operation. However, further optimizations on
// Eject efficiency is needed.
q.table[entityID] = q.seqNum
q.seqNum++
}

// Untrack simply removes the tracker of the ejector off the entityID
func (q *LRUEjector) Untrack(entityID flow.Identifier) {
q.Lock()
defer q.Unlock()

delete(q.table, entityID)
}

// Eject implements EjectFunc for LRUEjector. It finds the entity with the lowest sequence number (i.e.,
// the oldest entity). It also untracks. This is using a linear search
func (q *LRUEjector) Eject(b *Backend) flow.Identifier {
q.Lock()
defer q.Unlock()

// finds the oldest entity
oldestSQ := uint64(math.MaxUint64)
var oldestID flow.Identifier
for _, id := range b.backData.Identifiers() {
if sq, ok := q.table[id]; ok {
if sq < oldestSQ {
oldestID = id
oldestSQ = sq
}
}
}

// untracks the oldest id as it is supposed to be ejected
delete(q.table, oldestID)

return oldestID
}
230 changes: 230 additions & 0 deletions module/mempool/stdmap/eject_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
package stdmap

import (
crand "crypto/rand"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/utils/unittest"
)

// TestLRUEjector_Track evaluates that tracking a new item adds the item to the ejector table.
func TestLRUEjector_Track(t *testing.T) {
ejector := NewLRUEjector()
// ejector's table should be empty
assert.Len(t, ejector.table, 0)

// sequence number of ejector should initially be zero
assert.Equal(t, ejector.seqNum, uint64(0))

// creates adds an item to the ejector
item := flow.Identifier{0x00}
ejector.Track(item)

// size of ejector's table should be one
// which indicates that ejector is tracking the item
assert.Len(t, ejector.table, 1)

// item should reside in the ejector's table
_, ok := ejector.table[item]
assert.True(t, ok)

// sequence number of ejector should be increased by one
assert.Equal(t, ejector.seqNum, uint64(1))
}

// TestLRUEjector_Track_Duplicate evaluates that tracking a duplicate item
// does not change the internal state of the ejector.
func TestLRUEjector_Track_Duplicate(t *testing.T) {
ejector := NewLRUEjector()

// creates adds an item to the ejector
item := flow.Identifier{0x00}
ejector.Track(item)

// size of ejector's table should be one
// which indicates that ejector is tracking the item
assert.Len(t, ejector.table, 1)

// item should reside in the ejector's table
_, ok := ejector.table[item]
assert.True(t, ok)

// sequence number of ejector should be increased by one
assert.Equal(t, ejector.seqNum, uint64(1))

// adds the duplicate item
ejector.Track(item)

// internal state of the ejector should be unchaged
assert.Len(t, ejector.table, 1)
assert.Equal(t, ejector.seqNum, uint64(1))
_, ok = ejector.table[item]
assert.True(t, ok)
}

// TestLRUEjector_Track_Many evaluates that tracking many items
// changes the state of ejector properly, i.e., items reside on the
// memory, and sequence number changed accordingly.
func TestLRUEjector_Track_Many(t *testing.T) {
ejector := NewLRUEjector()

// creates and tracks 100 items
size := 100
items := flow.IdentifierList{}
for i := 0; i < size; i++ {
var id flow.Identifier
_, _ = crand.Read(id[:])
ejector.Track(id)
items = append(items, id)
}

// size of ejector's table should be 100
assert.Len(t, ejector.table, size)

// all items should reside in the ejector's table
for _, id := range items {
_, ok := ejector.table[id]
require.True(t, ok)
}

// sequence number of ejector should be increased by size
assert.Equal(t, ejector.seqNum, uint64(size))
}

// TestLRUEjector_Untrack_One evaluates that untracking an existing item
// removes it from the ejector state and changes the state accordingly.
func TestLRUEjector_Untrack_One(t *testing.T) {
ejector := NewLRUEjector()

// creates adds an item to the ejector
item := flow.Identifier{0x00}
ejector.Track(item)

// size of ejector's table should be one
// which indicates that ejector is tracking the item
assert.Len(t, ejector.table, 1)

// item should reside in the ejector's table
_, ok := ejector.table[item]
assert.True(t, ok)

// sequence number of ejector should be increased by one
assert.Equal(t, ejector.seqNum, uint64(1))

// untracks the item
ejector.Untrack(item)

// internal state of the ejector should be changed
assert.Len(t, ejector.table, 0)

// sequence number should not be changed
assert.Equal(t, ejector.seqNum, uint64(1))

// item should no longer reside on internal state of ejector
_, ok = ejector.table[item]
assert.False(t, ok)
}

// TestLRUEjector_Untrack_Duplicate evaluates that untracking an item twice
// removes it from the ejector state only once and changes the state safely.
func TestLRUEjector_Untrack_Duplicate(t *testing.T) {
ejector := NewLRUEjector()

// creates and adds two items to the ejector
item1 := flow.Identifier{0x00}
item2 := flow.Identifier{0x01}
ejector.Track(item1)
ejector.Track(item2)

// size of ejector's table should be two
// which indicates that ejector is tracking the items
assert.Len(t, ejector.table, 2)

// items should reside in the ejector's table
_, ok := ejector.table[item1]
assert.True(t, ok)
_, ok = ejector.table[item2]
assert.True(t, ok)

// sequence number of ejector should be increased by two
assert.Equal(t, ejector.seqNum, uint64(2))

// untracks the item twice
ejector.Untrack(item1)
ejector.Untrack(item1)

// internal state of the ejector should be changed
assert.Len(t, ejector.table, 1)

// sequence number should not be changed
assert.Equal(t, ejector.seqNum, uint64(2))

// double untracking should only affect the untracked item1
_, ok = ejector.table[item1]
assert.False(t, ok)

// item 2 should still reside in the memory
_, ok = ejector.table[item2]
assert.True(t, ok)
}

// TestLRUEjector_UntrackEject evaluates that untracking the next ejectable item
// properly changes the next ejectable item in the ejector.
func TestLRUEjector_UntrackEject(t *testing.T) {
ejector := NewLRUEjector()

// creates and tracks 100 items
size := 100
backEnd := NewBackend()

items := make([]flow.Identifier, size)

for i := 0; i < size; i++ {
mockEntity := unittest.MockEntityFixture()
require.True(t, backEnd.Add(mockEntity))

id := mockEntity.ID()
ejector.Track(id)
items[i] = id
}

// untracks the oldest item
ejector.Untrack(items[0])

// next ejectable item should be the second oldest item
id := ejector.Eject(backEnd)
assert.Equal(t, id, items[1])
}

// TestLRUEjector_EjectAll adds many item to the ejector and then ejects them
// all one by one and evaluates an LRU ejection behavior.
func TestLRUEjector_EjectAll(t *testing.T) {
ejector := NewLRUEjector()

// creates and tracks 100 items
size := 100
backEnd := NewBackend()

items := make([]flow.Identifier, size)

for i := 0; i < size; i++ {
mockEntity := unittest.MockEntityFixture()
require.True(t, backEnd.Add(mockEntity))

id := mockEntity.ID()
ejector.Track(id)
items[i] = id
}

require.Equal(t, uint(size), backEnd.Size())

// ejects one by one
for i := 0; i < size; i++ {
id := ejector.Eject(backEnd)
require.Equal(t, id, items[i])
}
}

0 comments on commit 1f1a341

Please sign in to comment.