Skip to content

Commit

Permalink
whisk away
Browse files Browse the repository at this point in the history
  • Loading branch information
renan061 committed Aug 30, 2024
1 parent d4b9e3b commit cbee884
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 34 deletions.
44 changes: 15 additions & 29 deletions internal/node/advancer/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"log/slog"
"time"

"github.com/cartesi/rollups-node/internal/node/advancer/machines"
"github.com/cartesi/rollups-node/internal/node/advancer/poller"
. "github.com/cartesi/rollups-node/internal/node/model"
"github.com/cartesi/rollups-node/internal/nodemachine"
Expand All @@ -24,19 +25,19 @@ var (
)

type Advancer struct {
machines Machines
repository Repository
machines Machines
repo Repository
}

// New instantiates a new Advancer.
func New(machines Machines, repository Repository) (*Advancer, error) {
func New(machines Machines, repo Repository) (*Advancer, error) {
if machines == nil {
return nil, ErrInvalidMachines
}
if repository == nil {
if repo == nil {
return nil, ErrInvalidRepository
}
return &Advancer{machines: machines, repository: repository}, nil
return &Advancer{machines: machines, repo: repo}, nil
}

// Poller instantiates a new poller.Poller using the Advancer.
Expand All @@ -49,11 +50,11 @@ func (advancer *Advancer) Poller(pollingInterval time.Duration) (*poller.Poller,
// runs them through the cartesi machine,
// and updates the repository with the ouputs.
func (advancer *Advancer) Step(ctx context.Context) error {
apps := keysFrom(advancer.machines)
apps := advancer.machines.Keys()

// Gets the unprocessed inputs (of all apps) from the repository.
slog.Info("advancer: getting unprocessed inputs")
inputs, err := advancer.repository.GetUnprocessedInputs(ctx, apps)
inputs, err := advancer.repo.GetUnprocessedInputs(ctx, apps)
if err != nil {
return err
}
Expand All @@ -69,7 +70,7 @@ func (advancer *Advancer) Step(ctx context.Context) error {

// Updates the status of the epochs.
for _, app := range apps {
err := advancer.repository.UpdateEpochs(ctx, app)
err := advancer.repo.UpdateEpochs(ctx, app)
if err != nil {
return err
}
Expand All @@ -81,8 +82,8 @@ func (advancer *Advancer) Step(ctx context.Context) error {
// process sequentially processes inputs from the the application.
func (advancer *Advancer) process(ctx context.Context, app Address, inputs []*Input) error {
// Asserts that the app has an associated machine.
machine, ok := advancer.machines[app]
if !ok {
machine := advancer.machines.GetAdvanceMachine(app)
if machine == nil {
panic(fmt.Errorf("%w %s", ErrNoApp, app.String()))
}

Expand All @@ -101,7 +102,7 @@ func (advancer *Advancer) process(ctx context.Context, app Address, inputs []*In
}

// Stores the result in the database.
err = advancer.repository.StoreAdvanceResult(ctx, input, res)
err = advancer.repo.StoreAdvanceResult(ctx, input, res)
if err != nil {
return err
}
Expand All @@ -121,22 +122,7 @@ type Repository interface {
UpdateEpochs(_ context.Context, app Address) error
}

// A map of application addresses to machines.
type Machines = map[Address]Machine

type Machine interface {
Advance(_ context.Context, input []byte, index uint64) (*nodemachine.AdvanceResult, error)
}

// ------------------------------------------------------------------------------------------------

// keysFrom returns a slice with the keysFrom of a map.
func keysFrom[K comparable, V any](m map[K]V) []K {
keys := make([]K, len(m))
i := 0
for k := range m {
keys[i] = k
i++
}
return keys
type Machines interface {
GetAdvanceMachine(Address) machines.AdvanceMachine
Keys() []Address
}
167 changes: 167 additions & 0 deletions internal/node/advancer/machines/machines.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

package machines

import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"sync"

"github.com/cartesi/rollups-node/internal/node/config"
"github.com/cartesi/rollups-node/internal/node/model"
"github.com/cartesi/rollups-node/internal/nodemachine"
"github.com/cartesi/rollups-node/internal/repository"
"github.com/cartesi/rollups-node/pkg/emulator"
"github.com/cartesi/rollups-node/pkg/rollupsmachine"
"github.com/cartesi/rollups-node/pkg/rollupsmachine/cartesimachine"
)

type Repository interface {
GetAppData(context.Context) ([]*repository.AppData, error)
}

type AdvanceMachine interface {
Advance(_ context.Context, input []byte, index uint64) (*nodemachine.AdvanceResult, error)
}

type InspectMachine interface {
Inspect(_ context.Context, query []byte) (*nodemachine.InspectResult, error)
}

type Machines struct {
mutex sync.RWMutex
machines map[model.Address]*nodemachine.NodeMachine
}

func Load(ctx context.Context, config config.NodeConfig, repo Repository) (*Machines, error) {
appData, err := repo.GetAppData(ctx)
if err != nil {
return nil, err
}

machines := map[model.Address]*nodemachine.NodeMachine{}

maxConcurrentInspects := config.MaxConcurrentInspects

serverVerbosity := config.MachineServerVerbosity
machineInc := config.MachineIncCycles
machineMax := config.MachineMaxCycles
machineAdvanceTimeout := config.MachineAdvanceTimeout
machineInspectTimeout := config.MachineInspectTimeout

for _, appData := range appData {
appAddress := appData.AppAddress
snapshotPath := appData.SnapshotPath
snapshotInputIndex := appData.InputIndex

address, err := cartesimachine.StartServer(serverVerbosity, 0, os.Stdout, os.Stderr)
if err != nil {
return nil, close(machines)
}

config := &emulator.MachineRuntimeConfig{}
cartesiMachine, err := cartesimachine.Load(ctx, snapshotPath, address, config)
if err != nil {
err = errors.Join(err, cartesimachine.StopServer(address), close(machines))
return nil, err
}

rollupsMachine, err := rollupsmachine.New(ctx, cartesiMachine, machineInc, machineMax)
if err != nil {
err = errors.Join(err, cartesiMachine.Close(ctx), close(machines))
return nil, err
}

nodeMachine, err := nodemachine.New(rollupsMachine,
snapshotInputIndex,
machineAdvanceTimeout,
machineInspectTimeout,
maxConcurrentInspects)
if err != nil {
err = errors.Join(err, rollupsMachine.Close(ctx), close(machines))
return nil, err
}

machines[appAddress] = nodeMachine
}

return &Machines{machines: machines}, nil
}

func (m *Machines) GetAdvanceMachine(app model.Address) AdvanceMachine {
return m.get(app)
}

func (m *Machines) GetInspectMachine(app model.Address) InspectMachine {
return m.get(app)
}

func (m *Machines) Set(app model.Address, machine *nodemachine.NodeMachine) bool {
m.mutex.Lock()
defer m.mutex.Unlock()

if _, ok := m.machines[app]; ok {
return false
} else {
m.machines[app] = machine
return true
}
}

func (m *Machines) Remove(app model.Address) *nodemachine.NodeMachine {
m.mutex.Lock()
defer m.mutex.Unlock()

if machine, ok := m.machines[app]; ok {
return nil
} else {
delete(m.machines, app)
return machine
}
}

func (m *Machines) Keys() []model.Address {
m.mutex.RLock()
defer m.mutex.Unlock()

keys := make([]model.Address, len(m.machines))
i := 0
for k := range m.machines {
keys[i] = k
i++
}
return keys
}

func (m *Machines) Close() error {
m.mutex.Lock()
defer m.mutex.Unlock()

err := close(m.machines)
if err != nil {
slog.Error(fmt.Sprintf("failed to close some machines: %v", err))
}
return err
}

// ------------------------------------------------------------------------------------------------

func (m *Machines) get(app model.Address) *nodemachine.NodeMachine {
m.mutex.RLock()
defer m.mutex.Unlock()
return m.machines[app]
}

func close(machines map[model.Address]*nodemachine.NodeMachine) (err error) {
for _, machine := range machines {
err = errors.Join(err, machine.Close())
}
for app := range machines {
delete(machines, app)
}
return
}
1 change: 1 addition & 0 deletions internal/node/model/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Application struct {
Id uint64
ContractAddress Address
TemplateHash Hash
TemplateUri string
LastProcessedBlock uint64
Status ApplicationStatus
IConsensusAddress Address
Expand Down
11 changes: 6 additions & 5 deletions internal/nodemachine/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type AdvanceResult struct {
}

type InspectResult struct {
InputIndex uint64
InputIndex *uint64
Accepted bool
Reports [][]byte
Error error
Expand All @@ -44,7 +44,8 @@ type NodeMachine struct {
inner rollupsmachine.RollupsMachine

// Index of the last Input that was processed.
lastInputIndex uint64
// Can be nil if no inputs were processed.
lastInputIndex *uint64

// How long a call to inner.Advance or inner.Inspect can take.
advanceTimeout, inspectTimeout time.Duration
Expand All @@ -68,7 +69,7 @@ type NodeMachine struct {

func New(
inner rollupsmachine.RollupsMachine,
inputIndex uint64,
inputIndex *uint64,
advanceTimeout time.Duration,
inspectTimeout time.Duration,
maxConcurrentInspects uint8,
Expand Down Expand Up @@ -148,14 +149,14 @@ func (machine *NodeMachine) Advance(ctx context.Context,
// Replaces the current machine with the fork and updates lastInputIndex.
machine.mutex.HLock()
machine.inner = fork
machine.lastInputIndex = index
machine.lastInputIndex = &index
machine.mutex.Unlock()
} else {
// Closes the forked machine.
err = fork.Close(ctx)
// Updates lastInputIndex.
machine.mutex.HLock()
machine.lastInputIndex = index
machine.lastInputIndex = &index
machine.mutex.Unlock()
}

Expand Down

0 comments on commit cbee884

Please sign in to comment.