diff --git a/internal/node/advancer/advancer.go b/internal/node/advancer/advancer.go index 5e13cf756..9b9c20d80 100644 --- a/internal/node/advancer/advancer.go +++ b/internal/node/advancer/advancer.go @@ -10,6 +10,7 @@ import ( "log/slog" "time" + "github.com/cartesi/rollups-node/internal/node/advancer/machines" "github.com/cartesi/rollups-node/internal/node/advancer/poller" . "github.com/cartesi/rollups-node/internal/node/model" "github.com/cartesi/rollups-node/internal/nodemachine" @@ -24,19 +25,19 @@ var ( ) type Advancer struct { - machines Machines - repository Repository + machines Machines + repo Repository } // New instantiates a new Advancer. -func New(machines Machines, repository Repository) (*Advancer, error) { +func New(machines Machines, repo Repository) (*Advancer, error) { if machines == nil { return nil, ErrInvalidMachines } - if repository == nil { + if repo == nil { return nil, ErrInvalidRepository } - return &Advancer{machines: machines, repository: repository}, nil + return &Advancer{machines: machines, repo: repo}, nil } // Poller instantiates a new poller.Poller using the Advancer. @@ -49,11 +50,11 @@ func (advancer *Advancer) Poller(pollingInterval time.Duration) (*poller.Poller, // runs them through the cartesi machine, // and updates the repository with the ouputs. func (advancer *Advancer) Step(ctx context.Context) error { - apps := keysFrom(advancer.machines) + apps := advancer.machines.Keys() // Gets the unprocessed inputs (of all apps) from the repository. slog.Info("advancer: getting unprocessed inputs") - inputs, err := advancer.repository.GetUnprocessedInputs(ctx, apps) + inputs, err := advancer.repo.GetUnprocessedInputs(ctx, apps) if err != nil { return err } @@ -69,7 +70,7 @@ func (advancer *Advancer) Step(ctx context.Context) error { // Updates the status of the epochs. for _, app := range apps { - err := advancer.repository.UpdateEpochs(ctx, app) + err := advancer.repo.UpdateEpochs(ctx, app) if err != nil { return err } @@ -81,8 +82,8 @@ func (advancer *Advancer) Step(ctx context.Context) error { // process sequentially processes inputs from the the application. func (advancer *Advancer) process(ctx context.Context, app Address, inputs []*Input) error { // Asserts that the app has an associated machine. - machine, ok := advancer.machines[app] - if !ok { + machine := advancer.machines.GetAdvanceMachine(app) + if machine == nil { panic(fmt.Errorf("%w %s", ErrNoApp, app.String())) } @@ -101,7 +102,7 @@ func (advancer *Advancer) process(ctx context.Context, app Address, inputs []*In } // Stores the result in the database. - err = advancer.repository.StoreAdvanceResult(ctx, input, res) + err = advancer.repo.StoreAdvanceResult(ctx, input, res) if err != nil { return err } @@ -121,22 +122,7 @@ type Repository interface { UpdateEpochs(_ context.Context, app Address) error } -// A map of application addresses to machines. -type Machines = map[Address]Machine - -type Machine interface { - Advance(_ context.Context, input []byte, index uint64) (*nodemachine.AdvanceResult, error) -} - -// ------------------------------------------------------------------------------------------------ - -// keysFrom returns a slice with the keysFrom of a map. -func keysFrom[K comparable, V any](m map[K]V) []K { - keys := make([]K, len(m)) - i := 0 - for k := range m { - keys[i] = k - i++ - } - return keys +type Machines interface { + GetAdvanceMachine(Address) machines.AdvanceMachine + Keys() []Address } diff --git a/internal/node/advancer/machines/machines.go b/internal/node/advancer/machines/machines.go new file mode 100644 index 000000000..a60bd740c --- /dev/null +++ b/internal/node/advancer/machines/machines.go @@ -0,0 +1,167 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package machines + +import ( + "context" + "errors" + "fmt" + "log/slog" + "os" + "sync" + + "github.com/cartesi/rollups-node/internal/node/config" + "github.com/cartesi/rollups-node/internal/node/model" + "github.com/cartesi/rollups-node/internal/nodemachine" + "github.com/cartesi/rollups-node/internal/repository" + "github.com/cartesi/rollups-node/pkg/emulator" + "github.com/cartesi/rollups-node/pkg/rollupsmachine" + "github.com/cartesi/rollups-node/pkg/rollupsmachine/cartesimachine" +) + +type Repository interface { + GetAppData(context.Context) ([]*repository.AppData, error) +} + +type AdvanceMachine interface { + Advance(_ context.Context, input []byte, index uint64) (*nodemachine.AdvanceResult, error) +} + +type InspectMachine interface { + Inspect(_ context.Context, query []byte) (*nodemachine.InspectResult, error) +} + +type Machines struct { + mutex sync.RWMutex + machines map[model.Address]*nodemachine.NodeMachine +} + +func Load(ctx context.Context, config config.NodeConfig, repo Repository) (*Machines, error) { + appData, err := repo.GetAppData(ctx) + if err != nil { + return nil, err + } + + machines := map[model.Address]*nodemachine.NodeMachine{} + + maxConcurrentInspects := config.MaxConcurrentInspects + + serverVerbosity := config.MachineServerVerbosity + machineInc := config.MachineIncCycles + machineMax := config.MachineMaxCycles + machineAdvanceTimeout := config.MachineAdvanceTimeout + machineInspectTimeout := config.MachineInspectTimeout + + for _, appData := range appData { + appAddress := appData.AppAddress + snapshotPath := appData.SnapshotPath + snapshotInputIndex := appData.InputIndex + + address, err := cartesimachine.StartServer(serverVerbosity, 0, os.Stdout, os.Stderr) + if err != nil { + return nil, close(machines) + } + + config := &emulator.MachineRuntimeConfig{} + cartesiMachine, err := cartesimachine.Load(ctx, snapshotPath, address, config) + if err != nil { + err = errors.Join(err, cartesimachine.StopServer(address), close(machines)) + return nil, err + } + + rollupsMachine, err := rollupsmachine.New(ctx, cartesiMachine, machineInc, machineMax) + if err != nil { + err = errors.Join(err, cartesiMachine.Close(ctx), close(machines)) + return nil, err + } + + nodeMachine, err := nodemachine.New(rollupsMachine, + snapshotInputIndex, + machineAdvanceTimeout, + machineInspectTimeout, + maxConcurrentInspects) + if err != nil { + err = errors.Join(err, rollupsMachine.Close(ctx), close(machines)) + return nil, err + } + + machines[appAddress] = nodeMachine + } + + return &Machines{machines: machines}, nil +} + +func (m *Machines) GetAdvanceMachine(app model.Address) AdvanceMachine { + return m.get(app) +} + +func (m *Machines) GetInspectMachine(app model.Address) InspectMachine { + return m.get(app) +} + +func (m *Machines) Set(app model.Address, machine *nodemachine.NodeMachine) bool { + m.mutex.Lock() + defer m.mutex.Unlock() + + if _, ok := m.machines[app]; ok { + return false + } else { + m.machines[app] = machine + return true + } +} + +func (m *Machines) Remove(app model.Address) *nodemachine.NodeMachine { + m.mutex.Lock() + defer m.mutex.Unlock() + + if machine, ok := m.machines[app]; ok { + return nil + } else { + delete(m.machines, app) + return machine + } +} + +func (m *Machines) Keys() []model.Address { + m.mutex.RLock() + defer m.mutex.Unlock() + + keys := make([]model.Address, len(m.machines)) + i := 0 + for k := range m.machines { + keys[i] = k + i++ + } + return keys +} + +func (m *Machines) Close() error { + m.mutex.Lock() + defer m.mutex.Unlock() + + err := close(m.machines) + if err != nil { + slog.Error(fmt.Sprintf("failed to close some machines: %v", err)) + } + return err +} + +// ------------------------------------------------------------------------------------------------ + +func (m *Machines) get(app model.Address) *nodemachine.NodeMachine { + m.mutex.RLock() + defer m.mutex.Unlock() + return m.machines[app] +} + +func close(machines map[model.Address]*nodemachine.NodeMachine) (err error) { + for _, machine := range machines { + err = errors.Join(err, machine.Close()) + } + for app := range machines { + delete(machines, app) + } + return +} diff --git a/internal/node/model/models.go b/internal/node/model/models.go index ab11d1d99..b0bd46c3d 100644 --- a/internal/node/model/models.go +++ b/internal/node/model/models.go @@ -65,6 +65,7 @@ type Application struct { Id uint64 ContractAddress Address TemplateHash Hash + TemplateUri string LastProcessedBlock uint64 Status ApplicationStatus IConsensusAddress Address diff --git a/internal/nodemachine/machine.go b/internal/nodemachine/machine.go index 4abf71da6..396d1a0b0 100644 --- a/internal/nodemachine/machine.go +++ b/internal/nodemachine/machine.go @@ -34,7 +34,7 @@ type AdvanceResult struct { } type InspectResult struct { - InputIndex uint64 + InputIndex *uint64 Accepted bool Reports [][]byte Error error @@ -44,7 +44,8 @@ type NodeMachine struct { inner rollupsmachine.RollupsMachine // Index of the last Input that was processed. - lastInputIndex uint64 + // Can be nil if no inputs were processed. + lastInputIndex *uint64 // How long a call to inner.Advance or inner.Inspect can take. advanceTimeout, inspectTimeout time.Duration @@ -68,7 +69,7 @@ type NodeMachine struct { func New( inner rollupsmachine.RollupsMachine, - inputIndex uint64, + inputIndex *uint64, advanceTimeout time.Duration, inspectTimeout time.Duration, maxConcurrentInspects uint8, @@ -148,14 +149,14 @@ func (machine *NodeMachine) Advance(ctx context.Context, // Replaces the current machine with the fork and updates lastInputIndex. machine.mutex.HLock() machine.inner = fork - machine.lastInputIndex = index + machine.lastInputIndex = &index machine.mutex.Unlock() } else { // Closes the forked machine. err = fork.Close(ctx) // Updates lastInputIndex. machine.mutex.HLock() - machine.lastInputIndex = index + machine.lastInputIndex = &index machine.mutex.Unlock() }