Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve TState #1834

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions api/jsonrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,12 @@ func (j *JSONRPCServer) ExecuteActions(
storage[string(storageKeysToRead[i])] = value
}

tsv := ts.NewView(stateKeysWithPermissions, storage)
tsv := ts.NewView(
state.NewDefaultScope(
stateKeysWithPermissions,
storage,
),
)

output, err := action.Execute(
ctx,
Expand Down Expand Up @@ -292,10 +297,23 @@ func (j *JSONRPCServer) SimulateActions(
return err
}

ts := tstate.New(0)
scope := state.NewSimulatedScope(
state.Keys{},
currentState,
)
tsv := ts.NewView(scope)

currentTime := time.Now().UnixMilli()
for _, action := range actions {
recorder := tstate.NewRecorder(currentState)
actionOutput, err := action.Execute(ctx, j.vm.Rules(currentTime), recorder, currentTime, args.Actor, ids.Empty)
actionOutput, err := action.Execute(
ctx,
j.vm.Rules(currentTime),
tsv,
currentTime,
args.Actor,
ids.Empty,
)

var actionResult SimulateActionResult
if actionOutput == nil {
Expand All @@ -309,9 +327,9 @@ func (j *JSONRPCServer) SimulateActions(
if err != nil {
return err
}
actionResult.StateKeys = recorder.GetStateKeys()
actionResult.StateKeys = scope.StateKeys()
reply.ActionResults = append(reply.ActionResults, actionResult)
currentState = recorder
scope.Flush()
}
return nil
}
Expand Down
22 changes: 16 additions & 6 deletions chain/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,12 @@ func (c *Builder) BuildBlock(ctx context.Context, parentView state.View, parent
}

// Execute block
tsv := ts.NewView(stateKeys, storage)
tsv := ts.NewView(
state.NewDefaultScope(
stateKeys,
storage,
),
)
if err := tx.PreExecute(ctx, feeManager, c.balanceHandler, r, tsv, nextTime); err != nil {
// We don't need to rollback [tsv] here because it will never
// be committed.
Expand Down Expand Up @@ -408,11 +413,16 @@ func (c *Builder) BuildBlock(ctx context.Context, parentView state.View, parent
keys.Add(heightKeyStr, state.Write)
keys.Add(timestampKeyStr, state.Write)
keys.Add(feeKeyStr, state.Write)
tsv := ts.NewView(keys, map[string][]byte{
heightKeyStr: binary.BigEndian.AppendUint64(nil, parent.Hght),
timestampKeyStr: binary.BigEndian.AppendUint64(nil, uint64(parent.Tmstmp)),
feeKeyStr: parentFeeManager.Bytes(),
})
tsv := ts.NewView(
state.NewDefaultScope(
keys,
map[string][]byte{
heightKeyStr: binary.BigEndian.AppendUint64(nil, parent.Hght),
timestampKeyStr: binary.BigEndian.AppendUint64(nil, uint64(parent.Tmstmp)),
feeKeyStr: parentFeeManager.Bytes(),
},
),
)
if err := tsv.Insert(ctx, heightKey, binary.BigEndian.AppendUint64(nil, height)); err != nil {
return nil, nil, nil, fmt.Errorf("%w: unable to insert height", err)
}
Expand Down
25 changes: 17 additions & 8 deletions chain/chaintest/balance_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ava-labs/hypersdk/chain"
"github.com/ava-labs/hypersdk/codec/codectest"
"github.com/ava-labs/hypersdk/consts"
"github.com/ava-labs/hypersdk/state"
"github.com/ava-labs/hypersdk/state/tstate"
)

Expand Down Expand Up @@ -74,8 +75,10 @@ func TestBalanceHandler(t *testing.T, ctx context.Context, bf func() chain.Balan

ts := tstate.New(1)
tsv := ts.NewView(
bh.SponsorStateKeys(addrOne),
ms.Storage,
state.NewDefaultScope(
bh.SponsorStateKeys(addrOne),
ms.Storage,
),
)

r.NoError(bh.Deduct(ctx, addrOne, tsv, 1))
Expand All @@ -94,8 +97,10 @@ func TestBalanceHandler(t *testing.T, ctx context.Context, bf func() chain.Balan

ts := tstate.New(1)
tsv := ts.NewView(
bh.SponsorStateKeys(addrOne),
ms.Storage,
state.NewDefaultScope(
bh.SponsorStateKeys(addrOne),
ms.Storage,
),
)

r.Error(bh.Deduct(ctx, addrOne, tsv, 2))
Expand All @@ -114,8 +119,10 @@ func TestBalanceHandler(t *testing.T, ctx context.Context, bf func() chain.Balan

ts := tstate.New(1)
tsv := ts.NewView(
bh.SponsorStateKeys(addrOne),
ms.Storage,
state.NewDefaultScope(
bh.SponsorStateKeys(addrOne),
ms.Storage,
),
)

r.NoError(bh.CanDeduct(ctx, addrOne, tsv, 1))
Expand All @@ -134,8 +141,10 @@ func TestBalanceHandler(t *testing.T, ctx context.Context, bf func() chain.Balan

ts := tstate.New(1)
tsv := ts.NewView(
bh.SponsorStateKeys(addrOne),
ms.Storage,
state.NewDefaultScope(
bh.SponsorStateKeys(addrOne),
ms.Storage,
),
)

r.Error(bh.CanDeduct(ctx, addrOne, tsv, 2))
Expand Down
24 changes: 17 additions & 7 deletions chain/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,16 @@ func (p *Processor) Execute(
keys.Add(heightKeyStr, state.Write)
keys.Add(timestampKeyStr, state.Write)
keys.Add(feeKeyStr, state.Write)
tsv := ts.NewView(keys, map[string][]byte{
heightKeyStr: parentHeightRaw,
timestampKeyStr: parentTimestampRaw,
feeKeyStr: parentFeeManager.Bytes(),
})
tsv := ts.NewView(
state.NewDefaultScope(
keys,
map[string][]byte{
heightKeyStr: parentHeightRaw,
timestampKeyStr: parentTimestampRaw,
feeKeyStr: parentFeeManager.Bytes(),
},
),
)
if err := tsv.Insert(ctx, heightKey, binary.BigEndian.AppendUint64(nil, b.Hght)); err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -337,7 +342,7 @@ func (p *Processor) executeTxs(

// Prefetch state keys from disk
txID := tx.GetID()
if err := f.Fetch(ctx, txID, stateKeys); err != nil {
if err := f.Fetch(ctx, txID, stateKeys.Strip()); err != nil {
return nil, nil, err
}
e.Run(stateKeys, func() error {
Expand All @@ -351,7 +356,12 @@ func (p *Processor) executeTxs(
//
// It is critical we explicitly set the scope before each transaction is
// processed
tsv := ts.NewView(stateKeys, storage)
tsv := ts.NewView(
state.NewDefaultScope(
stateKeys,
storage,
),
)

// Ensure we have enough funds to pay fees
if err := tx.PreExecute(ctx, feeManager, p.balanceHandler, r, tsv, t); err != nil {
Expand Down
16 changes: 8 additions & 8 deletions internal/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Fetcher struct {
type tx struct {
blockers int
waiter chan struct{}
keys state.Keys
keys []string
}

type task struct {
Expand Down Expand Up @@ -140,18 +140,18 @@ func (f *Fetcher) handleErr(err error) {
// Fetch can be called concurrently.
//
// Invariant: Don't call [Fetch] afer calling [Stop] or [Wait]
func (f *Fetcher) Fetch(ctx context.Context, txID ids.ID, stateKeys state.Keys) error {
func (f *Fetcher) Fetch(ctx context.Context, txID ids.ID, keys []string) error {
f.l.Lock()
if f.err != nil {
f.l.Unlock()
return f.err
}
var (
tx = &tx{keys: stateKeys}
tasks = make([]*task, 0, len(stateKeys))
tx = &tx{keys: keys}
tasks = make([]*task, 0, len(keys))
blockers = 0
)
for k := range stateKeys {
for _, k := range keys {
d, ok := f.keys[k]
if !ok {
f.keys[k] = &key{blocked: []ids.ID{txID}}
Expand Down Expand Up @@ -216,10 +216,10 @@ func (f *Fetcher) Get(txID ids.ID) (map[string][]byte, error) {
f.l.RLock()
defer f.l.RUnlock()
var (
stateKeys = tx.keys
storage = make(map[string][]byte, len(stateKeys))
keys = tx.keys
storage = make(map[string][]byte, len(keys))
)
for k := range stateKeys {
for _, k := range keys {
if v := f.keys[k].cache; v != nil {
if v.exists {
storage[k] = v.v
Expand Down
28 changes: 12 additions & 16 deletions internal/fetcher/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,14 @@ func TestFetchDifferentKeys(t *testing.T) {
wg.Add(numTxs)

for i := 0; i < numTxs; i++ {
stateKeys := make(state.Keys, (i + 1))
keys := make([]string, (i + 1))
for k := 0; k < i+1; k++ {
// Generate different read keys
stateKeys.Add(ids.GenerateTestID().String(), state.Read)
keys = append(keys, ids.GenerateTestID().String())
}
txID := ids.GenerateTestID()
// Since these are all different keys, we will
// fetch each key from disk
require.NoError(f.Fetch(ctx, txID, stateKeys))
require.NoError(f.Fetch(ctx, txID, keys))
go func() {
defer wg.Done()
// Get keys from cache
Expand Down Expand Up @@ -98,15 +97,14 @@ func TestFetchSameKeys(t *testing.T) {
wg.Add(numTxs)

for i := 0; i < numTxs; i++ {
stateKeys := make(state.Keys, (i + 1))
keys := make([]string, (i + 1))
for k := 0; k < i+1; k++ {
// Generate the same keys
stateKeys.Add(keyBase+strconv.Itoa(k), state.Read)
keys = append(keys, keyBase+strconv.Itoa(k))
}
txID := ids.GenerateTestID()
// We are fetching the same keys, so we should
// be getting subsequent requests from cache
require.NoError(f.Fetch(ctx, txID, stateKeys))
require.NoError(f.Fetch(ctx, txID, keys))
go func() {
defer wg.Done()
storage, err := f.Get(txID)
Expand Down Expand Up @@ -136,18 +134,17 @@ func TestFetchSameKeysSlow(t *testing.T) {
)
wg.Add(numTxs)
for i := 0; i < numTxs; i++ {
stateKeys := make(state.Keys, (i + 1))
keys := make([]string, (i + 1))
for k := 0; k < i+1; k++ {
// Generate the same keys
stateKeys.Add(keyBase+strconv.Itoa(k), state.Read)
keys = append(keys, keyBase+strconv.Itoa(k))
}
txID := ids.GenerateTestID()

// Empty chan to mimic timing out
delay := make(chan struct{})

// Fetch the key
require.NoError(f.Fetch(ctx, txID, stateKeys))
require.NoError(f.Fetch(ctx, txID, keys))
go func() {
defer wg.Done()
// Get the keys from cache
Expand Down Expand Up @@ -183,13 +180,12 @@ func TestFetcherStop(t *testing.T) {
)
wg.Add(numTxs)
for i := 0; i < numTxs; i++ {
stateKeys := make(state.Keys, (i + 1))
keys := make([]string, (i + 1))
for k := 0; k < i+1; k++ {
// Generate the same keys
stateKeys.Add(keyBase+strconv.Itoa(k), state.Read)
keys = append(keys, keyBase+strconv.Itoa(k))
}
txID := ids.GenerateTestID()
err := f.Fetch(ctx, txID, stateKeys)
err := f.Fetch(ctx, txID, keys)
if err != nil {
// Some [Fetch] may return an error.
// This happens after we called [Stop]
Expand Down
38 changes: 38 additions & 0 deletions state/dbtest/dbtest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package dbtest

import (
"context"

"github.com/ava-labs/avalanchego/database"
)

type TestDB struct {
storage map[string][]byte
}

func NewTestDB() *TestDB {
return &TestDB{
storage: make(map[string][]byte),
}
}

func (db *TestDB) GetValue(_ context.Context, key []byte) (value []byte, err error) {
val, ok := db.storage[string(key)]
if !ok {
return nil, database.ErrNotFound
}
return val, nil
}

func (db *TestDB) Insert(_ context.Context, key []byte, value []byte) error {
db.storage[string(key)] = value
return nil
}

func (db *TestDB) Remove(_ context.Context, key []byte) error {
delete(db.storage, string(key))
return nil
}
9 changes: 9 additions & 0 deletions state/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ func (k Keys) ChunkSizes() ([]uint16, bool) {
return chunks, true
}

// Strips returns the database keys of k
func (k Keys) Strip() []string {
ks := make([]string, len(k))
for key := range k {
ks = append(ks, key)
}
return ks
}

type keysJSON map[string]Permissions

// MarshalJSON marshals Keys as readable JSON.
Expand Down
Loading
Loading