Skip to content

Commit

Permalink
Simplify batches.
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <cody@eigenlabs.org>
  • Loading branch information
cody-littley committed Oct 18, 2024
1 parent 4ddfa37 commit 91e8ff8
Show file tree
Hide file tree
Showing 10 changed files with 30 additions and 45 deletions.
14 changes: 7 additions & 7 deletions common/kvstore/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import "time"
// Batch is a collection of key / value pairs that will be written atomically to a database.
// Although it is thread safe to modify different batches in parallel or to modify a batch while
// the store is being modified, it is not thread safe to concurrently modify the same batch.
type Batch[K any] interface {
type Batch interface {
// Put stores the given key / value pair in the batch, overwriting any existing value for that key.
// If nil is passed as the value, a byte slice of length 0 will be stored.
Put(key K, value []byte)
Put(key []byte, value []byte)
// Delete removes the key from the batch.
Delete(key K)
Delete(key []byte)
// Apply atomically writes all the key / value pairs in the batch to the database.
Apply() error
// Size returns the number of operations in the batch.
Expand All @@ -21,12 +21,12 @@ type Batch[K any] interface {
// time-to-live (TTL) or expiration times. Although it is thread safe to modify different batches in
// parallel or to modify a batch while the store is being modified, it is not thread safe to concurrently
// modify the same batch.
type TTLBatch[K any] interface {
Batch[K]
type TTLBatch interface {
Batch
// PutWithTTL stores the given key / value pair in the batch with a time-to-live (TTL) or expiration time.
// If nil is passed as the value, a byte slice of length 0 will be stored.
PutWithTTL(key K, value []byte, ttl time.Duration)
PutWithTTL(key []byte, value []byte, ttl time.Duration)
// PutWithExpiration stores the given key / value pair in the batch with an expiration time.
// If nil is passed as the value, a byte slice of length 0 will be stored.
PutWithExpiration(key K, value []byte, expiryTime time.Time)
PutWithExpiration(key []byte, value []byte, expiryTime time.Time)
}
2 changes: 1 addition & 1 deletion common/kvstore/leveldb/leveldb_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (store *levelDBStore) WriteBatch(keys [][]byte, values [][]byte) error {
}

// NewBatch creates a new batch for the store.
func (store *levelDBStore) NewBatch() kvstore.StoreBatch {
func (store *levelDBStore) NewBatch() kvstore.Batch {
return &levelDBBatch{
store: store,
batch: new(leveldb.Batch),
Expand Down
2 changes: 1 addition & 1 deletion common/kvstore/mapstore/map_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (store *mapStore) WriteBatch(keys, values [][]byte) error {
}

// NewBatch creates a new batch for the store.
func (store *mapStore) NewBatch() kvstore.StoreBatch {
func (store *mapStore) NewBatch() kvstore.Batch {
return &batch{
store: store,
keys: make([][]byte, 0),
Expand Down
5 changes: 1 addition & 4 deletions common/kvstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ import (
// ErrNotFound is returned when a key is not found in the database.
var ErrNotFound = errors.New("not found")

// StoreBatch is a collection of operations that can be applied atomically to a Store.
type StoreBatch Batch[[]byte]

// Store implements a key-value store. May be backed by a database like LevelDB.
//
// Implementations of this interface are expected to be thread-safe.
Expand All @@ -28,7 +25,7 @@ type Store interface {
Delete(key []byte) error

// NewBatch creates a new batch that can be used to perform multiple operations atomically.
NewBatch() StoreBatch
NewBatch() Batch

// NewIterator returns an iterator that can be used to iterate over a subset of the keys in the database.
// Only keys with the given prefix will be iterated. The iterator must be closed by calling Release() when done.
Expand Down
22 changes: 5 additions & 17 deletions common/kvstore/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,6 @@ import (
// ErrTableNotFound is returned when a table is not found.
var ErrTableNotFound = errors.New("table not found")

// TTLStoreBatch is a collection of key / value pairs that will be written atomically to a database with
// time-to-live (TTL) or expiration times. Although it is thread safe to modify different batches in
// parallel or to modify a batch while the store is being modified, it is not thread safe to concurrently
// modify the same batch.
type TTLStoreBatch TTLBatch[[]byte]

// Table can be used to operate on data in a specific table in a TableStore.
type Table interface {
Store
Expand All @@ -22,8 +16,9 @@ type Table interface {
Name() string

// TableKey creates a new key scoped to this table that can be used for TableStoreBatch
// operations that modify this table.
TableKey(key []byte) TableKey
// operations that modify this table. Using keys in TableStore batches that are not created using this method
// has undefined behavior. Use of this method in a TableStoreBatch is not optional.
TableKey(key []byte) []byte

// PutWithTTL adds a key-value pair to the store that expires after a specified duration.
// Key is eventually deleted after the TTL elapses.
Expand All @@ -41,16 +36,9 @@ type Table interface {

// NewTTLBatch creates a new TTLBatch that can be used to perform multiple operations atomically.
// Use this instead of NewBatch to create a batch that supports TTL/expiration.
NewTTLBatch() TTLStoreBatch
NewTTLBatch() TTLBatch
}

// TableKey is a key scoped to a particular table. It can be used to perform batch operations that modify multiple
// table keys atomically.
type TableKey []byte

// TableStoreBatch is a collection of operations that can be applied atomically to a TableStore.
type TableStoreBatch TTLBatch[TableKey]

// TableStore implements a key-value store, with the addition of the abstraction of tables.
// A "table" in this context is a disjoint keyspace. Keys in one table to not collide with keys in another table,
// and keys within a particular table can be iterated over efficiently.
Expand All @@ -69,7 +57,7 @@ type TableStore interface {
GetTables() []Table

// NewBatch creates a new batch that can be used to perform multiple operations across tables atomically.
NewBatch() TableStoreBatch
NewBatch() TTLBatch

// Shutdown shuts down the store, flushing any remaining data to disk.
Shutdown() error
Expand Down
2 changes: 1 addition & 1 deletion common/kvstore/tablestore/table_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (t *tableStore) GetTables() []kvstore.Table {
}

// NewBatch creates a new batch for writing to the store.
func (t *tableStore) NewBatch() kvstore.TableStoreBatch {
func (t *tableStore) NewBatch() kvstore.TTLBatch {
return &tableStoreBatch{
batch: t.base.NewBatch(),
expirationTable: t.expirationTable,
Expand Down
12 changes: 6 additions & 6 deletions common/kvstore/tablestore/table_store_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,38 @@ import (
"time"
)

var _ kvstore.TableStoreBatch = &tableStoreBatch{}
var _ kvstore.TTLBatch = &tableStoreBatch{}

// tableStoreBatch is a batch for writing to a table store.
type tableStoreBatch struct {
batch kvstore.StoreBatch
batch kvstore.Batch
expirationTable kvstore.Table
}

// PutWithTTL adds a key-value pair to the batch that expires after a specified duration.
func (t *tableStoreBatch) PutWithTTL(key kvstore.TableKey, value []byte, ttl time.Duration) {
func (t *tableStoreBatch) PutWithTTL(key []byte, value []byte, ttl time.Duration) {
expirationTime := time.Now().Add(ttl)
t.PutWithExpiration(key, value, expirationTime)
}

// PutWithExpiration adds a key-value pair to the batch that expires at a specified time.
func (t *tableStoreBatch) PutWithExpiration(key kvstore.TableKey, value []byte, expiryTime time.Time) {
func (t *tableStoreBatch) PutWithExpiration(key []byte, value []byte, expiryTime time.Time) {
expirationKey := t.expirationTable.TableKey(prependTimestamp(expiryTime, key))

t.Put(key, value)
t.Put(expirationKey, make([]byte, 0))
}

// Put adds a key-value pair to the batch.
func (t *tableStoreBatch) Put(key kvstore.TableKey, value []byte) {
func (t *tableStoreBatch) Put(key []byte, value []byte) {
if value == nil {
value = []byte{}
}
t.batch.Put(key, value)
}

// Delete removes a key-value pair from the batch.
func (t *tableStoreBatch) Delete(key kvstore.TableKey) {
func (t *tableStoreBatch) Delete(key []byte) {
t.batch.Delete(key)
}

Expand Down
2 changes: 1 addition & 1 deletion common/kvstore/tablestore/table_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1087,7 +1087,7 @@ func (e *explodingStore) Delete(key []byte) error {
return e.base.Delete(key)
}

func (e *explodingStore) NewBatch() kvstore.StoreBatch {
func (e *explodingStore) NewBatch() kvstore.Batch {
panic("not used")
}

Expand Down
10 changes: 5 additions & 5 deletions common/kvstore/tablestore/table_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type tableView struct {
// destroy is a function that destroys the table store.
destroy func() error
// newBatch builds batches for the table store.
newBatch func() kvstore.TableStoreBatch
newBatch func() kvstore.TTLBatch
}

// NewTableView creates a new view into a table in a New.
Expand All @@ -33,7 +33,7 @@ func newTableView(
prefix uint32,
shutdown func() error,
destroy func() error,
newBatch func() kvstore.TableStoreBatch) kvstore.Table {
newBatch func() kvstore.TTLBatch) kvstore.Table {

return &tableView{
base: base,
Expand Down Expand Up @@ -163,22 +163,22 @@ func (t *tableView) Destroy() error {
}

// NewTTLBatch creates a new batch for the table with time-to-live (TTL) or expiration times.
func (t *tableView) NewTTLBatch() kvstore.TTLStoreBatch {
func (t *tableView) NewTTLBatch() kvstore.TTLBatch {
return &tableViewBatch{
table: t,
batch: t.newBatch(),
}
}

// NewBatch creates a new batch for the table.
func (t *tableView) NewBatch() kvstore.StoreBatch {
func (t *tableView) NewBatch() kvstore.Batch {
// This method is a simple alias for NewTTLBatch. We inherit the need to implement this function from the base
// interface, but we don't need to do anything special here.
return t.NewTTLBatch()
}

// TableKey creates a key scoped to this table.
func (t *tableView) TableKey(key []byte) kvstore.TableKey {
func (t *tableView) TableKey(key []byte) []byte {
result := make([]byte, 4+len(key))
binary.BigEndian.PutUint32(result, t.prefix)
copy(result[4:], key)
Expand Down
4 changes: 2 additions & 2 deletions common/kvstore/tablestore/table_view_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import (
"time"
)

var _ kvstore.TTLStoreBatch = &tableViewBatch{}
var _ kvstore.TTLBatch = &tableViewBatch{}

// tableViewBatch is a batch for a table in a TableStore.
type tableViewBatch struct {
table kvstore.Table
batch kvstore.TableStoreBatch
batch kvstore.TTLBatch
}

// PutWithTTL schedules a key-value pair to be added to the table with a time-to-live (TTL).
Expand Down

0 comments on commit 91e8ff8

Please sign in to comment.