Skip to content

Commit

Permalink
feat: add the machines package
Browse files Browse the repository at this point in the history
  • Loading branch information
renan061 committed Aug 30, 2024
1 parent ce710f2 commit 3e3854e
Show file tree
Hide file tree
Showing 9 changed files with 353 additions and 28 deletions.
16 changes: 15 additions & 1 deletion docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,13 @@ When enabled, will connect to postgres database via SSL.
* **Type:** `bool`
* **Default:** `"true"`

## `CARTESI_ADVANCER_POLLING_INTERVAL`

How many seconds the node will wait before querying the database for new inputs.

* **Type:** `Duration`
* **Default:** `"30"`

## `CARTESI_EPOCH_LENGTH`

Length of a rollups epoch in blocks.
Expand All @@ -234,7 +241,7 @@ At the end of each epoch, the node will send claims to the blockchain.

## `CARTESI_EVM_READER_RETRY_POLICY_MAX_DELAY`

How seconds the retry policy will wait between retries.
How many seconds the retry policy will wait between retries.

* **Type:** `Duration`
* **Default:** `"3"`
Expand All @@ -258,3 +265,10 @@ How many seconds the node will wait before trying to finish epochs for all appli
Path to the directory with the cartesi-machine snapshot that will be loaded by the node.

* **Type:** `string`

## `CARTESI_MACHINE_SERVER_VERBOSITY`

TODO.

* **Type:** `string`
* **Default:** `"info"`
231 changes: 231 additions & 0 deletions internal/node/advancer/machines/machines.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

package machines

import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"sync"
"time"

. "github.com/cartesi/rollups-node/internal/node/model"

nm "github.com/cartesi/rollups-node/internal/nodemachine"
"github.com/cartesi/rollups-node/pkg/emulator"
rm "github.com/cartesi/rollups-node/pkg/rollupsmachine"
cm "github.com/cartesi/rollups-node/pkg/rollupsmachine/cartesimachine"
)

type MachineConfig struct {
AppAddress Address
SnapshotPath string
SnapshotInputIndex *uint64
IncCycles uint64
MaxCycles uint64
AdvanceTimeout time.Duration
InspectTimeout time.Duration
MaxConcurrentInspects uint8
}

type Repository interface {
// GetMachineConfigurations retrieves a machine configuration for each application.
GetMachineConfigurations(context.Context) ([]*MachineConfig, error)

// GetProcessedInputs retrieves the processed inputs of an application with indexes greater or
// equal to the given input index.
GetProcessedInputs(_ context.Context, app Address, index uint64) ([]*Input, error)
}

// AdvanceMachine masks nodemachine.NodeMachine to only expose methods required by the Advancer.
type AdvanceMachine interface {
Advance(_ context.Context, input []byte, index uint64) (*nm.AdvanceResult, error)
}

// InspectMachine masks nodemachine.NodeMachine to only expose methods required by the Inspector.
type InspectMachine interface {
Inspect(_ context.Context, query []byte) (*nm.InspectResult, error)
}

// Machines is a thread-safe type that manages the pool of cartesi machines being used by the node.
// It contains a map of applications to machines.
type Machines struct {
mutex sync.RWMutex
machines map[Address]*nm.NodeMachine
}

func Load(ctx context.Context, repo Repository, verbosity cm.ServerVerbosity) (*Machines, error) {
configs, err := repo.GetMachineConfigurations(ctx)
if err != nil {
return nil, err
}

machines := map[Address]*nm.NodeMachine{}

for _, config := range configs {
// Starts the server.
address, err := cm.StartServer(verbosity, 0, os.Stdout, os.Stderr)
if err != nil {
return nil, closeMachines(machines)
}

// Creates a CartesiMachine from the snapshot.
runtimeConfig := &emulator.MachineRuntimeConfig{}
cartesiMachine, err := cm.Load(ctx, config.SnapshotPath, address, runtimeConfig)
if err != nil {
err = errors.Join(err, cm.StopServer(address), closeMachines(machines))
return nil, err
}

// Creates a RollupsMachine from the CartesiMachine.
rollupsMachine, err := rm.New(ctx,
cartesiMachine,
config.IncCycles,
config.MaxCycles)
if err != nil {
err = errors.Join(err, cartesiMachine.Close(ctx), closeMachines(machines))
return nil, err
}

// Advances the machine until it catches up with the state of the database.
lastProcessedInputIndex, err := catchUp(ctx, repo,
config.AppAddress,
rollupsMachine,
config.SnapshotInputIndex)
if err != nil {
err = errors.Join(err, rollupsMachine.Close(ctx), closeMachines(machines))
return nil, err
}

// Creates a NodeMachine from the RollupsMachine.
nodeMachine, err := nm.New(rollupsMachine,
lastProcessedInputIndex,
config.AdvanceTimeout,
config.InspectTimeout,
config.MaxConcurrentInspects)
if err != nil {
err = errors.Join(err, rollupsMachine.Close(ctx), closeMachines(machines))
return nil, err
}

machines[config.AppAddress] = nodeMachine
}

return &Machines{machines: machines}, nil
}

// GetAdvanceMachine gets the machine associated with the application from the map.
func (m *Machines) GetAdvanceMachine(app Address) AdvanceMachine {
return m.getMachine(app)
}

// GetInspectMachine gets the machine associated with the application from the map.
func (m *Machines) GetInspectMachine(app Address) InspectMachine {
return m.getMachine(app)
}

// Add maps a new application to a machine.
// It does nothing if the application is already mapped to some machine.
// It returns true if it was able to add the machine and false otherwise.
func (m *Machines) Add(app Address, machine *nm.NodeMachine) bool {
m.mutex.Lock()
defer m.mutex.Unlock()

if _, ok := m.machines[app]; ok {
return false
} else {
m.machines[app] = machine
return true
}
}

// Delete deletes an application from the map.
// It returns the associated machine, if any.
func (m *Machines) Delete(app Address) *nm.NodeMachine {
m.mutex.Lock()
defer m.mutex.Unlock()

if machine, ok := m.machines[app]; ok {
return nil
} else {
delete(m.machines, app)
return machine
}
}

// Apps returns the addresses of the applications for which there are machines.
func (m *Machines) Apps() []Address {
m.mutex.RLock()
defer m.mutex.Unlock()

keys := make([]Address, len(m.machines))
i := 0
for k := range m.machines {
keys[i] = k
i++
}
return keys
}

// Close closes all the machines and erases them from the map.
func (m *Machines) Close() error {
m.mutex.Lock()
defer m.mutex.Unlock()

err := closeMachines(m.machines)
if err != nil {
slog.Error(fmt.Sprintf("failed to close some machines: %v", err))
}
return err
}

// ------------------------------------------------------------------------------------------------

func (m *Machines) getMachine(app Address) *nm.NodeMachine {
m.mutex.RLock()
defer m.mutex.Unlock()
return m.machines[app]
}

func closeMachines(machines map[Address]*nm.NodeMachine) (err error) {
for _, machine := range machines {
err = errors.Join(err, machine.Close())
}
for app := range machines {
delete(machines, app)
}
return
}

func catchUp(ctx context.Context,
repo Repository,
app Address,
machine rm.RollupsMachine,
snapshotInputIndex *uint64,
) (lastProcessedInputIndex *uint64, _ error) {
// A nil index indicates we should start to process from the beginning (index zero).
// A non-nil index indicates we should start to process from the next available index.
index := uint64(0)
if snapshotInputIndex != nil {
index = *snapshotInputIndex + 1
}

inputs, err := repo.GetProcessedInputs(ctx, app, index)
if err != nil {
return nil, err
}

for _, input := range inputs {
accepted, _, _, _, err := machine.Advance(ctx, input.RawData)
_, err = nm.ToInputStatus(accepted, err)
if err != nil {
return nil, err
}
*lastProcessedInputIndex = input.Index
}

return lastProcessedInputIndex, nil
}
8 changes: 8 additions & 0 deletions internal/node/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ package config
import (
"fmt"
"os"

"github.com/cartesi/rollups-node/pkg/rollupsmachine/cartesimachine"
)

// NodeConfig contains all the Node variables.
Expand Down Expand Up @@ -39,7 +41,10 @@ type NodeConfig struct {
ExperimentalSunodoValidatorEnabled bool
ExperimentalSunodoValidatorRedisEndpoint string
Auth Auth
AdvancerPollingInterval Duration
ValidatorPollingInterval Duration
// Temporary
MachineServerVerbosity cartesimachine.ServerVerbosity
}

// Auth is used to sign transactions.
Expand Down Expand Up @@ -106,7 +111,10 @@ func FromEnv() NodeConfig {
if getFeatureClaimerEnabled() && !getExperimentalSunodoValidatorEnabled() {
config.Auth = authFromEnv()
}
config.AdvancerPollingInterval = getAdvancerPollingInterval()
config.ValidatorPollingInterval = getValidatorPollingInterval()
// Temporary.
config.MachineServerVerbosity = cartesimachine.ServerVerbosity(getMachineServerVerbosity())
return config
}

Expand Down
18 changes: 17 additions & 1 deletion internal/node/config/generate/Config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,13 @@ How many times some functions should be retried after an error."""
default = "3"
go-type = "Duration"
description = """
How seconds the retry policy will wait between retries."""
How many seconds the retry policy will wait between retries."""

[rollups.CARTESI_ADVANCER_POLLING_INTERVAL]
default = "30"
go-type = "Duration"
description = """
How many seconds the node will wait before querying the database for new inputs."""

[rollups.CARTESI_VALIDATOR_POLLING_INTERVAL]
default = "30"
Expand Down Expand Up @@ -250,3 +256,13 @@ go-type = "bool"
description = """
When enabled, prints server-manager output to stdout and stderr directly.
All other log configurations are ignored."""

#
# Temporary
#

[temp.CARTESI_MACHINE_SERVER_VERBOSITY]
default = "info"
go-type = "string"
description = """
TODO."""
12 changes: 12 additions & 0 deletions internal/node/config/generate/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ func ToInt64FromString(s string) (int64, error) {
return strconv.ParseInt(s, 10, 64)
}
func ToUint8FromString(s string) (uint8, error) {
value, err := strconv.ParseUint(s, 10, 8)
return uint8(value), err
}
func ToUint32FromString(s string) (uint32, error) {
value, err := strconv.ParseUint(s, 10, 32)
return uint32(value), err
}
func ToUint64FromString(s string) (uint64, error) {
value, err := strconv.ParseUint(s, 10, 64)
return value, err
Expand Down Expand Up @@ -164,6 +174,8 @@ var (
toBool = strconv.ParseBool
toInt = strconv.Atoi
toInt64 = ToInt64FromString
toUint8 = ToUint8FromString
toUint32 = ToUint32FromString
toUint64 = ToUint64FromString
toString = ToStringFromString
toDuration = ToDurationFromSeconds
Expand Down
Loading

0 comments on commit 3e3854e

Please sign in to comment.