Skip to content

Commit

Permalink
op-supervisor: fix db locking, fix crossdb usage
Browse files Browse the repository at this point in the history
  • Loading branch information
protolambda committed Oct 31, 2024
1 parent a8b2276 commit f6c00d4
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 161 deletions.
48 changes: 48 additions & 0 deletions op-service/locks/rwmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package locks

import "sync"

// RWMap is a simple wrapper around a map, with global Read-Write protection.
// For many concurrent reads/writes a sync.Map may be more performant,
// although it does not utilize Go generics.
// The RWMap does not have to be initialized,
// it is immediately ready for reads/writes.
type RWMap[K comparable, V any] struct {
inner map[K]V
mu sync.RWMutex
}

func (m *RWMap[K, V]) Has(key K) (ok bool) {
m.mu.RLock()
defer m.mu.RUnlock()
_, ok = m.inner[key]
return
}

func (m *RWMap[K, V]) Get(key K) (value V, ok bool) {
m.mu.RLock()
defer m.mu.RUnlock()
value, ok = m.inner[key]
return
}

func (m *RWMap[K, V]) Set(key K, value V) {
m.mu.Lock()
defer m.mu.Unlock()
if m.inner == nil {
m.inner = make(map[K]V)
}
m.inner[key] = value
}

// Range calls f sequentially for each key and value present in the map.
// If f returns false, range stops the iteration.
func (m *RWMap[K, V]) Range(f func(key K, value V) bool) {
m.mu.RLock()
defer m.mu.RUnlock()
for k, v := range m.inner {
if !f(k, v) {
break
}
}
}
52 changes: 52 additions & 0 deletions op-service/locks/rwmap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package locks

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestRWMap(t *testing.T) {
m := &RWMap[uint64, int64]{}

// get on new map
v, ok := m.Get(123)
require.False(t, ok)
require.Equal(t, int64(0), v)

// set a value
m.Set(123, 42)
v, ok = m.Get(123)
require.True(t, ok)
require.Equal(t, int64(42), v)

// overwrite a value
m.Set(123, -42)
v, ok = m.Get(123)
require.True(t, ok)
require.Equal(t, int64(-42), v)

// add a value
m.Set(10, 100)

// range over values
got := make(map[uint64]int64)
m.Range(func(key uint64, value int64) bool {
if _, ok := got[key]; ok {
panic("duplicate")
}
got[key] = value
return true
})
require.Len(t, got, 2)
require.Equal(t, int64(100), got[uint64(10)])
require.Equal(t, int64(-42), got[uint64(123)])

// range and stop early
clear(got)
m.Range(func(key uint64, value int64) bool {
got[key] = value
return false
})
require.Len(t, got, 1, "stop early")
}
24 changes: 24 additions & 0 deletions op-service/locks/rwvalue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package locks

import "sync"

// RWValue is a simple container struct, to deconflict reads/writes of the value,
// without locking up a bigger structure in the caller.
// It exposes the underlying RWLock and Value for direct access where needed.
type RWValue[E any] struct {
sync.RWMutex
Value E
}

func (c *RWValue[E]) Get() (out E) {
c.RLock()
defer c.RUnlock()
out = c.Value
return
}

func (c *RWValue[E]) Set(v E) {
c.Lock()
defer c.Unlock()
c.Value = v
}
16 changes: 16 additions & 0 deletions op-service/locks/rwvalue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package locks

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestRWValue(t *testing.T) {
v := &RWValue[uint64]{}
require.Equal(t, uint64(0), v.Get())
v.Set(123)
require.Equal(t, uint64(123), v.Get())
v.Set(42)
require.Equal(t, uint64(42), v.Get())
}
77 changes: 27 additions & 50 deletions op-supervisor/supervisor/backend/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"errors"
"fmt"
"io"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"

"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/locks"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/fromda"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
Expand Down Expand Up @@ -73,28 +73,23 @@ var _ LogStorage = (*logs.DB)(nil)
// ChainsDB is a database that stores logs and derived-from data for multiple chains.
// it implements the LogStorage interface, as well as several DB interfaces needed by the cross package.
type ChainsDB struct {
// RW mutex:
// Read = chains can be read / mutated.
// Write = set of chains is changing.
mu sync.RWMutex

// unsafe info: the sequence of block seals and events
logDBs map[types.ChainID]LogStorage
logDBs locks.RWMap[types.ChainID, LogStorage]

// cross-unsafe: how far we have processed the unsafe data.
// If present but set to a zeroed value the cross-unsafe will fallback to cross-safe.
crossUnsafe map[types.ChainID]types.BlockSeal
crossUnsafe locks.RWMap[types.ChainID, *locks.RWValue[types.BlockSeal]]

// local-safe: index of what we optimistically know about L2 blocks being derived from L1
localDBs map[types.ChainID]LocalDerivedFromStorage
localDBs locks.RWMap[types.ChainID, LocalDerivedFromStorage]

// cross-safe: index of L2 blocks we know to only have cross-L2 valid dependencies
crossDBs map[types.ChainID]CrossDerivedFromStorage
crossDBs locks.RWMap[types.ChainID, CrossDerivedFromStorage]

// finalized: the L1 finality progress. This can be translated into what may be considered as finalized in L2.
// It is initially zeroed, and the L2 finality query will return
// an error until it has this L1 finality to work with.
finalizedL1 eth.L1BlockRef
finalizedL1 locks.RWValue[eth.L1BlockRef]

// depSet is the dependency set, used to determine what may be tracked,
// what is missing, and to provide it to DB users.
Expand All @@ -105,93 +100,75 @@ type ChainsDB struct {

func NewChainsDB(l log.Logger, depSet depset.DependencySet) *ChainsDB {
return &ChainsDB{
logDBs: make(map[types.ChainID]LogStorage),
logger: l,
localDBs: make(map[types.ChainID]LocalDerivedFromStorage),
crossDBs: make(map[types.ChainID]CrossDerivedFromStorage),
crossUnsafe: make(map[types.ChainID]types.BlockSeal),
depSet: depSet,
logger: l,
depSet: depSet,
}
}

func (db *ChainsDB) AddLogDB(chainID types.ChainID, logDB LogStorage) {
db.mu.Lock()
defer db.mu.Unlock()

if _, ok := db.logDBs[chainID]; ok {
if db.logDBs.Has(chainID) {
db.logger.Warn("overwriting existing log DB for chain", "chain", chainID)
}

db.logDBs[chainID] = logDB
db.logDBs.Set(chainID, logDB)
}

func (db *ChainsDB) AddLocalDerivedFromDB(chainID types.ChainID, dfDB LocalDerivedFromStorage) {
db.mu.Lock()
defer db.mu.Unlock()

if _, ok := db.localDBs[chainID]; ok {
if db.localDBs.Has(chainID) {
db.logger.Warn("overwriting existing local derived-from DB for chain", "chain", chainID)
}

db.localDBs[chainID] = dfDB
db.localDBs.Set(chainID, dfDB)
}

func (db *ChainsDB) AddCrossDerivedFromDB(chainID types.ChainID, dfDB CrossDerivedFromStorage) {
db.mu.Lock()
defer db.mu.Unlock()

if _, ok := db.crossDBs[chainID]; ok {
if db.crossDBs.Has(chainID) {
db.logger.Warn("overwriting existing cross derived-from DB for chain", "chain", chainID)
}

db.crossDBs[chainID] = dfDB
db.crossDBs.Set(chainID, dfDB)
}

func (db *ChainsDB) AddCrossUnsafeTracker(chainID types.ChainID) {
db.mu.Lock()
defer db.mu.Unlock()

if _, ok := db.crossUnsafe[chainID]; ok {
if db.crossUnsafe.Has(chainID) {
db.logger.Warn("overwriting existing cross-unsafe tracker for chain", "chain", chainID)
}
db.crossUnsafe[chainID] = types.BlockSeal{}
db.crossUnsafe.Set(chainID, &locks.RWValue[types.BlockSeal]{})
}

// ResumeFromLastSealedBlock prepares the chains db to resume recording events after a restart.
// It rewinds the database to the last block that is guaranteed to have been fully recorded to the database,
// to ensure it can resume recording from the first log of the next block.
func (db *ChainsDB) ResumeFromLastSealedBlock() error {
db.mu.RLock()
defer db.mu.RUnlock()

for chain, logStore := range db.logDBs {
var result error
db.logDBs.Range(func(chain types.ChainID, logStore LogStorage) bool {
headNum, ok := logStore.LatestSealedBlockNum()
if !ok {
// db must be empty, nothing to rewind to
db.logger.Info("Resuming, but found no DB contents", "chain", chain)
continue
return true
}
db.logger.Info("Resuming, starting from last sealed block", "head", headNum)
if err := logStore.Rewind(headNum); err != nil {
return fmt.Errorf("failed to rewind chain %s to sealed block %d", chain, headNum)
result = fmt.Errorf("failed to rewind chain %s to sealed block %d", chain, headNum)
return false
}
}
return nil
return true
})
return result
}

func (db *ChainsDB) DependencySet() depset.DependencySet {
return db.depSet
}

func (db *ChainsDB) Close() error {
db.mu.Lock()
defer db.mu.Unlock()

var combined error
for id, logDB := range db.logDBs {
db.logDBs.Range(func(id types.ChainID, logDB LogStorage) bool {
if err := logDB.Close(); err != nil {
combined = errors.Join(combined, fmt.Errorf("failed to close log db for chain %v: %w", id, err))
}
}
return true
})
return combined
}
Loading

0 comments on commit f6c00d4

Please sign in to comment.