Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add the machines package #584

Merged
merged 1 commit into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,13 @@ When enabled, will connect to postgres database via SSL.
* **Type:** `bool`
* **Default:** `"true"`

## `CARTESI_ADVANCER_POLLING_INTERVAL`

How many seconds the node will wait before querying the database for new inputs.

* **Type:** `Duration`
* **Default:** `"30"`

## `CARTESI_EPOCH_LENGTH`

Length of a rollups epoch in blocks.
Expand All @@ -234,7 +241,7 @@ At the end of each epoch, the node will send claims to the blockchain.

## `CARTESI_EVM_READER_RETRY_POLICY_MAX_DELAY`

How seconds the retry policy will wait between retries.
How many seconds the retry policy will wait between retries.

* **Type:** `Duration`
* **Default:** `"3"`
Expand All @@ -258,3 +265,10 @@ How many seconds the node will wait before trying to finish epochs for all appli
Path to the directory with the cartesi-machine snapshot that will be loaded by the node.

* **Type:** `string`

## `CARTESI_MACHINE_SERVER_VERBOSITY`

TODO.
vfusco marked this conversation as resolved.
Show resolved Hide resolved

* **Type:** `string`
* **Default:** `"info"`
245 changes: 245 additions & 0 deletions internal/node/advancer/machines/machines.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

package machines

import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"sync"
"time"

. "github.com/cartesi/rollups-node/internal/node/model"

nm "github.com/cartesi/rollups-node/internal/nodemachine"
"github.com/cartesi/rollups-node/pkg/emulator"
rm "github.com/cartesi/rollups-node/pkg/rollupsmachine"
cm "github.com/cartesi/rollups-node/pkg/rollupsmachine/cartesimachine"
)

type MachineConfig struct {
AppAddress Address
SnapshotPath string
SnapshotInputIndex *uint64
IncCycles uint64
MaxCycles uint64
AdvanceTimeout time.Duration
InspectTimeout time.Duration
MaxConcurrentInspects uint8
}

type Repository interface {
// GetMachineConfigurations retrieves a machine configuration for each application.
GetMachineConfigurations(context.Context) ([]*MachineConfig, error)

// GetProcessedInputs retrieves the processed inputs of an application with indexes greater or
// equal to the given input index.
GetProcessedInputs(_ context.Context, app Address, index uint64) ([]*Input, error)
}

// AdvanceMachine masks nodemachine.NodeMachine to only expose methods required by the Advancer.
type AdvanceMachine interface {
Advance(_ context.Context, input []byte, index uint64) (*nm.AdvanceResult, error)
}

// InspectMachine masks nodemachine.NodeMachine to only expose methods required by the Inspector.
type InspectMachine interface {
Inspect(_ context.Context, query []byte) (*nm.InspectResult, error)
}

// Machines is a thread-safe type that manages the pool of cartesi machines being used by the node.
// It contains a map of applications to machines.
type Machines struct {
mutex sync.RWMutex
machines map[Address]*nm.NodeMachine
}

// Load initializes the cartesi machines.
// Load advances a machine to the last processed input stored in the database.
//
// Load does not fail when one of those machines fail to initialize.
// It stores the error to be returned later and continues to initialize the other machines.
func Load(ctx context.Context, repo Repository, verbosity cm.ServerVerbosity) (*Machines, error) {
configs, err := repo.GetMachineConfigurations(ctx)
if err != nil {
return nil, err
}

machines := map[Address]*nm.NodeMachine{}
var errs error

for _, config := range configs {
// Creates the machine.
machine, err := createMachine(ctx, verbosity, config)
if err != nil {
err = fmt.Errorf("failed to create machine from snapshot (%v): %w", config, err)
errs = errors.Join(errs, err)
continue
}

// Advances the machine until it catches up with the state of the database (if necessary).
err = catchUp(ctx, repo, config.AppAddress, machine, config.SnapshotInputIndex)
if err != nil {
err = fmt.Errorf("failed to advance cartesi machine (%v): %w", config, err)
errs = errors.Join(errs, err, machine.Close())
continue
}

machines[config.AppAddress] = machine
}

return &Machines{machines: machines}, errs
}

// GetAdvanceMachine gets the machine associated with the application from the map.
func (m *Machines) GetAdvanceMachine(app Address) AdvanceMachine {
return m.getMachine(app)
}

// GetInspectMachine gets the machine associated with the application from the map.
func (m *Machines) GetInspectMachine(app Address) InspectMachine {
return m.getMachine(app)
}

// Add maps a new application to a machine.
// It does nothing if the application is already mapped to some machine.
// It returns true if it was able to add the machine and false otherwise.
func (m *Machines) Add(app Address, machine *nm.NodeMachine) bool {
m.mutex.Lock()
defer m.mutex.Unlock()

if _, ok := m.machines[app]; ok {
return false
} else {
m.machines[app] = machine
return true
}
}

// Delete deletes an application from the map.
// It returns the associated machine, if any.
func (m *Machines) Delete(app Address) *nm.NodeMachine {
m.mutex.Lock()
defer m.mutex.Unlock()

if machine, ok := m.machines[app]; ok {
return nil
} else {
delete(m.machines, app)
return machine
}
}

// Apps returns the addresses of the applications for which there are machines.
func (m *Machines) Apps() []Address {
m.mutex.RLock()
defer m.mutex.Unlock()

keys := make([]Address, len(m.machines))
i := 0
for k := range m.machines {
keys[i] = k
i++
}
return keys
}

// Close closes all the machines and erases them from the map.
func (m *Machines) Close() error {
m.mutex.Lock()
defer m.mutex.Unlock()

err := closeMachines(m.machines)
if err != nil {
slog.Error(fmt.Sprintf("failed to close some machines: %v", err))
}
return err
}

// ------------------------------------------------------------------------------------------------

func (m *Machines) getMachine(app Address) *nm.NodeMachine {
m.mutex.RLock()
defer m.mutex.Unlock()
return m.machines[app]
}

func closeMachines(machines map[Address]*nm.NodeMachine) (err error) {
for _, machine := range machines {
err = errors.Join(err, machine.Close())
}
for app := range machines {
delete(machines, app)
}
return
}

func createMachine(ctx context.Context,
verbosity cm.ServerVerbosity,
config *MachineConfig,
) (*nm.NodeMachine, error) {
// Starts the server.
address, err := cm.StartServer(verbosity, 0, os.Stdout, os.Stderr)
if err != nil {
return nil, err
}

// Creates a CartesiMachine from the snapshot.
runtimeConfig := &emulator.MachineRuntimeConfig{}
cartesiMachine, err := cm.Load(ctx, config.SnapshotPath, address, runtimeConfig)
if err != nil {
return nil, errors.Join(err, cm.StopServer(address))
}

// Creates a RollupsMachine from the CartesiMachine.
rollupsMachine, err := rm.New(ctx,
cartesiMachine,
config.IncCycles,
config.MaxCycles)
if err != nil {
return nil, errors.Join(err, cartesiMachine.Close(ctx))
}

// Creates a NodeMachine from the RollupsMachine.
nodeMachine, err := nm.New(rollupsMachine,
config.SnapshotInputIndex,
config.AdvanceTimeout,
config.InspectTimeout,
config.MaxConcurrentInspects)
if err != nil {
return nil, errors.Join(err, rollupsMachine.Close(ctx))
}

return nodeMachine, err
}

func catchUp(ctx context.Context,
repo Repository,
app Address,
machine *nm.NodeMachine,
snapshotInputIndex *uint64,
) error {
// A nil index indicates we should start to process inputs from the beginning (index zero).
// A non-nil index indicates we should start to process inputs from the next available index.
firstInputIndexToProcess := uint64(0)
if snapshotInputIndex != nil {
firstInputIndexToProcess = *snapshotInputIndex + 1
}

inputs, err := repo.GetProcessedInputs(ctx, app, firstInputIndexToProcess)
if err != nil {
return err
}

for _, input := range inputs {
_, err := machine.Advance(ctx, input.RawData, input.Index)
if err != nil {
return err
}
}

return nil
}
8 changes: 8 additions & 0 deletions internal/node/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ package config
import (
"fmt"
"os"

"github.com/cartesi/rollups-node/pkg/rollupsmachine/cartesimachine"
)

// NodeConfig contains all the Node variables.
Expand Down Expand Up @@ -39,7 +41,10 @@ type NodeConfig struct {
ExperimentalSunodoValidatorEnabled bool
ExperimentalSunodoValidatorRedisEndpoint string
Auth Auth
AdvancerPollingInterval Duration
ValidatorPollingInterval Duration
// Temporary
MachineServerVerbosity cartesimachine.ServerVerbosity
}

// Auth is used to sign transactions.
Expand Down Expand Up @@ -106,7 +111,10 @@ func FromEnv() NodeConfig {
if getFeatureClaimerEnabled() && !getExperimentalSunodoValidatorEnabled() {
config.Auth = authFromEnv()
}
config.AdvancerPollingInterval = getAdvancerPollingInterval()
config.ValidatorPollingInterval = getValidatorPollingInterval()
// Temporary.
config.MachineServerVerbosity = cartesimachine.ServerVerbosity(getMachineServerVerbosity())
return config
}

Expand Down
18 changes: 17 additions & 1 deletion internal/node/config/generate/Config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,13 @@ How many times some functions should be retried after an error."""
default = "3"
go-type = "Duration"
description = """
How seconds the retry policy will wait between retries."""
How many seconds the retry policy will wait between retries."""

[rollups.CARTESI_ADVANCER_POLLING_INTERVAL]
default = "30"
go-type = "Duration"
description = """
How many seconds the node will wait before querying the database for new inputs."""

[rollups.CARTESI_VALIDATOR_POLLING_INTERVAL]
default = "30"
Expand Down Expand Up @@ -250,3 +256,13 @@ go-type = "bool"
description = """
When enabled, prints server-manager output to stdout and stderr directly.
All other log configurations are ignored."""

#
# Temporary
#

[temp.CARTESI_MACHINE_SERVER_VERBOSITY]
default = "info"
go-type = "string"
description = """
TODO."""
vfusco marked this conversation as resolved.
Show resolved Hide resolved
24 changes: 24 additions & 0 deletions internal/node/config/generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions internal/node/model/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Application struct {
Id uint64
ContractAddress Address
TemplateHash Hash
TemplateUri string
LastProcessedBlock uint64
Status ApplicationStatus
IConsensusAddress Address
Expand Down
Loading
Loading