Skip to content

Commit

Permalink
statedb2: StateDB v2.0 with per-table locks and deletion tracking
Browse files Browse the repository at this point in the history
This introduces an evolution of pkg/statedb that solves the following
issues:
  * Per-table locking instead of global writer lock for increased resiliency.
  * No more use of reflection in indexing. Indexes are defined in a type-safe
    way.
  * Revisions are built into the database so it does not need to be reimplemented
    by each table.
  * Deletions on a table can now be tracked and a consistent sequence of inserts
    and deletes can be derived from it to e.g. implement retryable reconciliation
    loops.

It is introduced as pkg/statedb2 for easier code reviewing (no need to rewrite
uses of pkg/statedb in this PR) and to allow some time for it to mature before
replacing pkg/statedb.

Signed-off-by: Jussi Maki <jussi@isovalent.com>
  • Loading branch information
joamaki authored and aanm committed Aug 22, 2023
1 parent e7b569b commit 23b0492
Show file tree
Hide file tree
Showing 16 changed files with 2,175 additions and 0 deletions.
57 changes: 57 additions & 0 deletions pkg/statedb2/cell.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package statedb2

import (
"github.com/cilium/cilium/pkg/hive"
"github.com/cilium/cilium/pkg/hive/cell"
)

// This module provides an in-memory database built on top of immutable radix trees
// As the database is based on an immutable data structure, the objects inserted into
// the database MUST NOT be mutated, but rather copied first!
//
// For example use see pkg/statedb/example.
var Cell = cell.Module(
"statedb",
"In-memory transactional database",

cell.Provide(newHiveDB),
)

type tablesIn struct {
cell.In

TableMetas []TableMeta `group:"statedb-tables"`
}

func newHiveDB(lc hive.Lifecycle, tablesIn tablesIn) (*DB, error) {
db, err := NewDB(tablesIn.TableMetas)
if err != nil {
return nil, err
}
lc.Append(db)
return db, nil
}

type tableOut[Obj any] struct {
cell.Out
Table Table[Obj]
Meta TableMeta `group:"statedb-tables"`
}

// NewTableCell creates a cell for creating and registering a statedb Table[Obj].
func NewTableCell[Obj any](
tableName TableName,
primaryIndexer Indexer[Obj],
secondaryIndexers ...Indexer[Obj],
) cell.Cell {
return cell.Provide(func() (tableOut[Obj], error) {
if table, err := NewTable(tableName, primaryIndexer, secondaryIndexers...); err != nil {
return tableOut[Obj]{}, err
} else {
return tableOut[Obj]{Table: table, Meta: table}, nil
}
})
}
178 changes: 178 additions & 0 deletions pkg/statedb2/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package statedb2

import (
"errors"
"fmt"
"sync/atomic"

iradix "github.com/hashicorp/go-immutable-radix/v2"

"github.com/cilium/cilium/pkg/hive"
"github.com/cilium/cilium/pkg/lock"
)

// DB provides an in-memory transaction database built on top of immutable radix
// trees. The database supports multiple tables, each with one or more user-defined
// indexes. Readers can access the data locklessly with a simple atomic pointer read
// to obtain a snapshot. On writes to the database table-level locks are acquired
// on target tables and on write transaction commit a root lock is taken to swap
// in the new root with the modified tables.
//
// As data is stored in immutable data structures any objects inserted into
// it MUST NOT be mutated afterwards.
//
// DB holds the "root" tree of tables with each table holding a tree of indexes:
//
// root
// / \
// ba T(foo)
// / \
// T(bar) T(baz)
//
// T(bar).indexes
// / \
// i I(byRevision)
// / \
// I(id) I(ip)
//
// I(ip)
// / \
// 192 172
// / ...
// bar(192.168.1.1)
//
// T = tableEntry
// I = indexTree
//
// To lookup:
// 1. Create a read (or write) transaction
// 2. Find the table from the root tree
// 3. Find the index from the table's index tree
// 4. Find the object from the index
//
// To insert:
// 1. Create write transaction against the target table
// 2. Find the table from the root tree
// 3. Create/reuse write transaction on primary index
// 4. Insert/replace the object into primary index
// 5. Create/reuse write transaction on revision index
// 6. If old object existed, remove from revision index
// 7. If old object existed, remove from graveyard
// 8. Update each secondary index
// 9. Commit transaction by committing each index to
// the table and then committing table to the root.
// Swap the root atomic pointer to new root and
// notify by closing channels of all modified nodes.
//
// To observe deletions:
// 1. Create write transaction against the target table
// 2. Create new delete tracker and add it to the table
// 3. Commit the write transaction to update the table
// with the new delete tracker
// 4. Query the graveyard by revision, starting from the
// revision of the write transaction at which it was
// created.
// 5. For each successfully processed deletion, mark the
// revision to set low watermark for garbage collection.
// 6. Periodically garbage collect the graveyard by finding
// the lowest revision of all delete trackers.
type DB struct {
tables map[TableName]TableMeta
mu lock.Mutex // sequences modifications to the root tree
root atomic.Pointer[iradix.Tree[tableEntry]]
gcTrigger chan struct{} // trigger for graveyard garbage collection
gcExited chan struct{}
}

func NewDB(tables []TableMeta) (*DB, error) {
txn := iradix.New[tableEntry]().Txn()
db := &DB{
tables: make(map[TableName]TableMeta),
gcTrigger: make(chan struct{}, 1),
gcExited: make(chan struct{}),
}
for _, t := range tables {
name := t.Name()
if _, ok := db.tables[name]; ok {
return nil, fmt.Errorf("table %q already exists", name)
}
db.tables[name] = t
var table tableEntry
table.meta = t
table.deleteTrackers = iradix.New[deleteTracker]()
indexTxn := iradix.New[indexTree]().Txn()
indexTxn.Insert([]byte(t.primaryIndexer().name), iradix.New[object]())
indexTxn.Insert([]byte(RevisionIndex), iradix.New[object]())
indexTxn.Insert([]byte(GraveyardIndex), iradix.New[object]())
indexTxn.Insert([]byte(GraveyardRevisionIndex), iradix.New[object]())
for index := range t.secondaryIndexers() {
indexTxn.Insert([]byte(index), iradix.New[object]())
}
table.indexes = indexTxn.CommitOnly()
txn.Insert(t.tableKey(), table)
}
db.root.Store(txn.CommitOnly())

return db, nil
}

// ReadTxn constructs a new read transaction for performing reads against
// a snapshot of the database.
//
// ReadTxn is not thread-safe!
func (db *DB) ReadTxn() ReadTxn {
return &txn{
db: db,
rootReadTxn: db.root.Load().Txn(),
}
}

// WriteTxn constructs a new write transaction against the given set of tables.
// Each table is locked, which may block until the table locks are acquired.
// The modifications performed in the write transaction are not visible outside
// it until Commit() is called. To discard the changes call Abort().
//
// WriteTxn is not thread-safe!
func (db *DB) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn {
allTables := append(tables, table)
smus := lock.SortableMutexes{}
for _, table := range allTables {
smus = append(smus, table.sortableMutex())
}
smus.Lock()

rootReadTxn := db.root.Load().Txn()
tableEntries := make(map[TableName]*tableEntry, len(tables))
for _, table := range allTables {
tableEntry, ok := rootReadTxn.Get(table.tableKey())
if !ok {
panic("BUG: Table '" + table.Name() + "' not found")
}
tableEntries[table.Name()] = &tableEntry
}
return &txn{
db: db,
rootReadTxn: rootReadTxn,
tables: tableEntries,
writeTxns: make(map[tableIndex]*iradix.Txn[object]),
smus: smus,
}
}

func (db *DB) Start(hive.HookContext) error {
go graveyardWorker(db)
return nil
}

func (db *DB) Stop(ctx hive.HookContext) error {
close(db.gcTrigger)
select {
case <-ctx.Done():
return errors.New("timed out waiting for graveyard worker to exit")
case <-db.gcExited:
}
return nil
}
Loading

0 comments on commit 23b0492

Please sign in to comment.