Skip to content

Commit

Permalink
feat: add the standalone advancer command
Browse files Browse the repository at this point in the history
  • Loading branch information
renan061 committed Sep 2, 2024
1 parent 026621d commit 7b4c731
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 65 deletions.
93 changes: 93 additions & 0 deletions cmd/cartesi-rollups-machines/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

package main

import (
"context"
"fmt"
"log/slog"
"os"
"os/signal"
"syscall"
"time"

"github.com/cartesi/rollups-node/internal/node/advancer"
"github.com/cartesi/rollups-node/internal/node/advancer/machines"
"github.com/cartesi/rollups-node/internal/node/config"
"github.com/cartesi/rollups-node/internal/node/startup"
"github.com/cartesi/rollups-node/internal/repository"

"github.com/spf13/cobra"
)

const CMD_NAME = "advancer"

var (
buildVersion = "devel"
Cmd = &cobra.Command{
Use: CMD_NAME,
Short: "Runs the Advancer",
Long: "Runs the Advancer in standalone mode",
Run: run,
}
)

func main() {
err := Cmd.Execute()
if err != nil {
os.Exit(1)
}
}

func getDatabase(ctx context.Context, c config.NodeConfig) (*repository.Database, error) {
err := startup.ValidateSchema(c)
if err != nil {
return nil, fmt.Errorf("invalid database schema: %w", err)
}

database, err := repository.Connect(ctx, c.PostgresEndpoint.Value)
if err != nil {
return nil, fmt.Errorf("failed to connect to the database: %w", err)
}

return database, nil
}

func run(cmd *cobra.Command, args []string) {
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()

c := config.FromEnv()
startup.ConfigLogs(c)

slog.Info("Starting the Cartesi Rollups Node Advancer", "version", buildVersion, "config", c)

database, err := getDatabase(ctx, c)
check(err, "failed to connect to the database")
defer database.Close()

repo := &repository.MachineRepository{Database: database}

machines, err := machines.Load(ctx, repo, c.MachineServerVerbosity)
check(err, "failed to load the machines")
defer machines.Close()

advancer, err := advancer.New(machines, repo)
check(err, "failed to create the advancer")

poller, err := advancer.Poller(5 * time.Second) //nolint: mnd
check(err, "failed to create the advancer service")

ready := make(chan struct{}, 1)

err = poller.Start(ctx, ready)
check(err, "failed to start the advancer service")
}

func check(err error, s string) {
if err != nil {
slog.Error(s, "error", err)
os.Exit(1)
}
}
42 changes: 18 additions & 24 deletions internal/node/advancer/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"log/slog"
"time"

"github.com/cartesi/rollups-node/internal/node/advancer/machines"
"github.com/cartesi/rollups-node/internal/node/advancer/poller"
. "github.com/cartesi/rollups-node/internal/node/model"
"github.com/cartesi/rollups-node/internal/nodemachine"
Expand Down Expand Up @@ -49,7 +50,7 @@ func (advancer *Advancer) Poller(pollingInterval time.Duration) (*poller.Poller,
// runs them through the cartesi machine,
// and updates the repository with the ouputs.
func (advancer *Advancer) Step(ctx context.Context) error {
apps := keysFrom(advancer.machines)
apps := advancer.machines.Apps()

// Gets the unprocessed inputs (of all apps) from the repository.
slog.Info("advancer: getting unprocessed inputs")
Expand All @@ -67,14 +68,22 @@ func (advancer *Advancer) Step(ctx context.Context) error {
}
}

// Updates the status of the epochs.
for _, app := range apps {
err := advancer.repository.UpdateEpochs(ctx, app)
if err != nil {
return err
}
}

return nil
}

// process sequentially processes inputs from the the application.
func (advancer *Advancer) process(ctx context.Context, app Address, inputs []*Input) error {
// Asserts that the app has an associated machine.
machine, ok := advancer.machines[app]
if !ok {
machine := advancer.machines.GetAdvanceMachine(app)
if machine == nil {
panic(fmt.Errorf("%w %s", ErrNoApp, app.String()))
}

Expand All @@ -99,11 +108,7 @@ func (advancer *Advancer) process(ctx context.Context, app Address, inputs []*In
}
}

// Updates the status of the epochs based on the last processed input.
lastInput := inputs[len(inputs)-1]
err := advancer.repository.UpdateEpochs(ctx, app, lastInput)

return err
return nil
}

// ------------------------------------------------------------------------------------------------
Expand All @@ -114,25 +119,14 @@ type Repository interface {

StoreAdvanceResult(context.Context, *Input, *nodemachine.AdvanceResult) error

UpdateEpochs(_ context.Context, app Address, lastInput *Input) error
UpdateEpochs(_ context.Context, app Address) error
}

// A map of application addresses to machines.
type Machines = map[Address]Machine
type Machines interface {
GetAdvanceMachine(app Address) machines.AdvanceMachine
Apps() []Address
}

type Machine interface {
Advance(_ context.Context, input []byte, index uint64) (*nodemachine.AdvanceResult, error)
}

// ------------------------------------------------------------------------------------------------

// keysFrom returns a slice with the keysFrom of a map.
func keysFrom[K comparable, V any](m map[K]V) []K {
keys := make([]K, len(m))
i := 0
for k := range m {
keys[i] = k
i++
}
return keys
}
74 changes: 36 additions & 38 deletions internal/node/advancer/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
mrand "math/rand"
"testing"

"github.com/cartesi/rollups-node/internal/node/advancer/machines"
. "github.com/cartesi/rollups-node/internal/node/model"
"github.com/cartesi/rollups-node/internal/nodemachine"

Expand All @@ -27,7 +28,8 @@ type AdvancerSuite struct{ suite.Suite }
func (s *AdvancerSuite) TestNew() {
s.Run("Ok", func() {
require := s.Require()
var machines map[Address]Machine = Machines{randomAddress(): &MockMachine{}}
machines := newMockMachines()
machines.Map[randomAddress()] = &MockMachine{}
var repository Repository = &MockRepository{}
advancer, err := New(machines, repository)
require.NotNil(advancer)
Expand All @@ -36,7 +38,7 @@ func (s *AdvancerSuite) TestNew() {

s.Run("InvalidMachines", func() {
require := s.Require()
var machines map[Address]Machine = nil
var machines Machines = nil
var repository Repository = &MockRepository{}
advancer, err := New(machines, repository)
require.Nil(advancer)
Expand All @@ -46,7 +48,8 @@ func (s *AdvancerSuite) TestNew() {

s.Run("InvalidRepository", func() {
require := s.Require()
var machines map[Address]Machine = Machines{randomAddress(): &MockMachine{}}
machines := newMockMachines()
machines.Map[randomAddress()] = &MockMachine{}
var repository Repository = nil
advancer, err := New(machines, repository)
require.Nil(advancer)
Expand All @@ -63,11 +66,11 @@ func (s *AdvancerSuite) TestRun() {
s.Run("Ok", func() {
require := s.Require()

machines := Machines{}
machines := newMockMachines()
app1 := randomAddress()
machines[app1] = &MockMachine{}
machines.Map[app1] = &MockMachine{}
app2 := randomAddress()
machines[app2] = &MockMachine{}
machines.Map[app2] = &MockMachine{}
res1 := randomAdvanceResult()
res2 := randomAdvanceResult()
res3 := randomAdvanceResult()
Expand All @@ -94,14 +97,18 @@ func (s *AdvancerSuite) TestRun() {
require.Len(repository.StoredResults, 3)
})

s.Run("Error/UpdateEpochs", func() {
s.T().Skip("TODO")
})

// NOTE: missing more test cases
}

func (s *AdvancerSuite) TestProcess() {
setup := func() (Machines, *MockRepository, *Advancer, Address) {
app := randomAddress()
machines := Machines{}
machines[app] = &MockMachine{}
machines := newMockMachines()
machines.Map[app] = &MockMachine{}
repository := &MockRepository{}
advancer := &Advancer{machines, repository}
return machines, repository, advancer, app
Expand All @@ -124,7 +131,6 @@ func (s *AdvancerSuite) TestProcess() {
err := advancer.process(context.Background(), app, inputs)
require.Nil(err)
require.Len(repository.StoredResults, 7)
require.Equal(*inputs[6], repository.LastInput)
})

s.Run("Panic", func() {
Expand Down Expand Up @@ -183,29 +189,7 @@ func (s *AdvancerSuite) TestProcess() {
require.Errorf(err, "store-advance error")
require.Len(repository.StoredResults, 1)
})

s.Run("UpdateEpochs", func() {
require := s.Require()

_, repository, advancer, app := setup()
inputs := []*Input{
{Id: 1, RawData: marshal(randomAdvanceResult())},
{Id: 2, RawData: marshal(randomAdvanceResult())},
{Id: 3, RawData: marshal(randomAdvanceResult())},
{Id: 4, RawData: marshal(randomAdvanceResult())},
}
repository.UpdateEpochsError = errors.New("update-epochs error")

err := advancer.process(context.Background(), app, inputs)
require.Errorf(err, "update-epochs error")
require.Len(repository.StoredResults, 4)
})
})

}

func (s *AdvancerSuite) TestKeysFrom() {
s.T().Skip("TODO")
}

// ------------------------------------------------------------------------------------------------
Expand All @@ -227,14 +211,33 @@ func (mock *MockMachine) Advance(

// ------------------------------------------------------------------------------------------------

type MachinesMock struct {
Map map[Address]machines.AdvanceMachine
}

func newMockMachines() *MachinesMock {
return &MachinesMock{
Map: map[Address]machines.AdvanceMachine{},
}
}

func (mock *MachinesMock) GetAdvanceMachine(app Address) machines.AdvanceMachine {
return mock.Map[app]
}

func (mock *MachinesMock) Apps() []Address {
return []Address{}
}

// ------------------------------------------------------------------------------------------------

type MockRepository struct {
GetInputsReturn map[Address][]*Input
GetInputsError error
StoreAdvanceError error
UpdateEpochsError error

StoredResults []*nodemachine.AdvanceResult
LastInput Input
}

func (mock *MockRepository) GetUnprocessedInputs(
Expand All @@ -253,12 +256,7 @@ func (mock *MockRepository) StoreAdvanceResult(
return mock.StoreAdvanceError
}

func (mock *MockRepository) UpdateEpochs(
_ context.Context,
_ Address,
lastInput *Input,
) error {
mock.LastInput = *lastInput
func (mock *MockRepository) UpdateEpochs(_ context.Context, _ Address) error {
return mock.UpdateEpochsError
}

Expand Down
5 changes: 2 additions & 3 deletions internal/repository/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func (repo *MachineRepository) GetMachineConfigurations(
return nil, fmt.Errorf("%w (failed querying applications): %w", ErrAdvancerRepository, err)
}

// TODO: missing machine config fields
res := []*machines.MachineConfig{}
var row machines.MachineConfig

Expand Down Expand Up @@ -76,14 +75,14 @@ func (repo *MachineRepository) GetProcessedInputs(
app Address,
index uint64,
) ([]*Input, error) {
query := fmt.Sprintf(`
query := `
SELECT id, index, status, raw_data
FROM input
WHERE application_address = @applicationAddress
AND index >= @index
AND status != 'NONE'
ORDER BY index ASC
`)
`
args := pgx.NamedArgs{
"applicationAddress": app,
"index": index,
Expand Down

0 comments on commit 7b4c731

Please sign in to comment.