Skip to content

feat: cluster mode #895

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Ignore macOS system files
.DS_Store

# Ignore VS Code workspace files
*.code-workspace

# Ignore environment variable files
.env
.env.*
Expand Down
1 change: 1 addition & 0 deletions .trunk/configs/cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"dynamicmap",
"dynaport",
"envfiles",
"Errf",
"estree",
"euclidian",
"expfmt",
Expand Down
29 changes: 29 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,29 @@
"${workspaceFolder}/sdk/${input:appLanguage}/examples/${input:exampleApp}/build"
]
},
{
"name": "Debug Modus Runtime (cluster mode)",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}/runtime",
"env": {
"FORCE_COLOR": "1",
"MODUS_ENV": "dev",
"MODUS_CLUSTER_MODE": "NATS",
"MODUS_CLUSTER_NATS_URL": "nats://localhost:4222",
"MODUS_DEBUG_ACTORS": "true",
"MODUS_DEBUG": "true",
"MODUS_USE_MODUSDB": "false",
"MODUS_DB": "postgresql://postgres:postgres@localhost:5432/modus?sslmode=disable" // checkov:skip=CKV_SECRET_4
},
"args": [
"--port=${input:httpServerPort}",
"--refresh=1s",
"--appPath",
"${workspaceFolder}/sdk/${input:appLanguage}/examples/${input:exampleApp}/build"
]
},
{
"name": "Debug Modus Runtime (input path)",
"type": "go",
Expand Down Expand Up @@ -225,6 +248,12 @@
"type": "promptString",
"description": "Enter the S3 storage folder name",
"default": "shared"
},
{
"id": "httpServerPort",
"type": "promptString",
"description": "Enter the HTTP server port",
"default": "8686"
}
]
}
2 changes: 1 addition & 1 deletion go.work
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
go 1.24.3
go 1.24.4

use (
./lib/manifest
Expand Down
91 changes: 37 additions & 54 deletions runtime/actors/actorsystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,96 +11,61 @@ package actors

import (
"context"
"fmt"
"os"
"strconv"
"time"

"github.com/hypermodeinc/modus/runtime/db"
"github.com/hypermodeinc/modus/runtime/logger"
"github.com/hypermodeinc/modus/runtime/messages"
"github.com/hypermodeinc/modus/runtime/pluginmanager"
"github.com/hypermodeinc/modus/runtime/plugins"
"github.com/hypermodeinc/modus/runtime/wasmhost"

goakt "github.com/tochemey/goakt/v3/actor"
goakt_static "github.com/tochemey/goakt/v3/discovery/static"
goakt_remote "github.com/tochemey/goakt/v3/remote"
"github.com/travisjeffery/go-dynaport"
)

const actorSystemName = "modus"
const defaultAskTimeout = 10 * time.Second

var _actorSystem goakt.ActorSystem
var _remoting *goakt.Remoting

// Initialize assembles the goakt options, creates and starts the actor system,
// stores it in the package-level _actorSystem, and registers loadAgentActors
// as a plugin-loaded callback.
//
// NOTE(review): this span is a rendered diff without +/- markers, so lines from
// the previous and the revised implementation appear side by side (for example
// the two goakt.NewActorSystem conditions below). Confirm which lines are live
// before treating any one of them as current behavior.
func Initialize(ctx context.Context) {

// Base options that are passed to the actor system regardless of cluster settings.
opts := []goakt.Option{
goakt.WithLogger(newActorLogger(logger.Get(ctx))),
goakt.WithCoordinatedShutdown(beforeShutdown),
goakt.WithPubSub(),
goakt.WithActorInitTimeout(10 * time.Second), // TODO: adjust this value, or make it configurable
goakt.WithActorInitMaxRetries(1), // TODO: adjust this value, or make it configurable

// for now, keep passivation disabled so that agents can perform long-running tasks without the actor stopping.
// TODO: figure out how to deal with this better
goakt.WithPassivationDisabled(),
}

// NOTE: we're not relying on cluster mode yet. The below code block is for future use and testing purposes only.
// A MODUS_USE_CLUSTER_MODE value that fails to parse as a bool is treated as false (the parse error is discarded).
if clusterMode, _ := strconv.ParseBool(os.Getenv("MODUS_USE_CLUSTER_MODE")); clusterMode {
// TODO: static discovery should really only be used for local development and testing.
// In production, we should use a more robust discovery mechanism, such as Kubernetes or NATS.
// See https://tochemey.gitbook.io/goakt/features/service-discovery

// We just get three random ports for now.
// In prod, these will need to be configured so they are consistent across all nodes.
ports := dynaport.Get(3)
var gossip_port = ports[0]
var peers_port = ports[1]
var remoting_port = ports[2]

// The only discovery host listed is this process's own localhost gossip port.
disco := goakt_static.NewDiscovery(&goakt_static.Config{
Hosts: []string{
fmt.Sprintf("localhost:%d", gossip_port),
},
})

opts = append(opts,
goakt.WithRemote(goakt_remote.NewConfig("localhost", remoting_port)),
goakt.WithCluster(goakt.NewClusterConfig().
WithDiscovery(disco).
WithDiscoveryPort(gossip_port).
WithPeersPort(peers_port).
WithKinds(&wasmAgentActor{}, &subscriptionActor{}),
),
)
}
// clusterOptions is defined outside this view; whatever options it returns are appended to the base set.
opts = append(opts, clusterOptions(ctx)...)

// NOTE(review): the next two lines are two versions of the same condition; the second
// passes the actorSystemName constant ("modus") instead of the string literal.
// _actorSystem is assigned only when both creation and Start succeed; either failure
// is reported through logger.Fatal.
if actorSystem, err := goakt.NewActorSystem("modus", opts...); err != nil {
if actorSystem, err := goakt.NewActorSystem(actorSystemName, opts...); err != nil {
logger.Fatal(ctx).Err(err).Msg("Failed to create actor system.")
} else if err := actorSystem.Start(ctx); err != nil {
logger.Fatal(ctx).Err(err).Msg("Failed to start actor system.")
} else {
_actorSystem = actorSystem
}

// Kept at package level; Shutdown closes it and resets it to nil.
_remoting = goakt.NewRemoting()

logger.Info(ctx).Msg("Actor system started.")

// loadAgentActors will be invoked by the plugin manager with each loaded plugin.
pluginmanager.RegisterPluginLoadedCallback(loadAgentActors)
}

func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error {
// restart actors that are already running, giving them the new plugin instance
// restart local actors that are already running, giving them the new plugin instance
actors := _actorSystem.Actors()
runningAgents := make(map[string]bool, len(actors))
for _, pid := range actors {
go func(f_ctx context.Context, f_pid *goakt.PID) {
if actor, ok := f_pid.Actor().(*wasmAgentActor); ok {
runningAgents[actor.agentId] = true
actor.plugin = plugin
if err := f_pid.Restart(f_ctx); err != nil {
logger.Err(f_ctx, err).Msgf("Failed to restart actor for agent %s.", actor.agentId)
}
if a, ok := pid.Actor().(*wasmAgentActor); ok {
runningAgents[a.agentId] = true
a.plugin = plugin
if err := goakt.Tell(ctx, pid, &messages.RestartAgent{}); err != nil {
logger.Err(ctx, err).Str("agent_id", a.agentId).Msg("Failed to send restart agent message to actor.")
}
}(ctx, pid)
}
}

// spawn actors for agents with state in the database, that are not already running
Expand All @@ -115,7 +80,7 @@ func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error {
for _, agent := range agents {
if !runningAgents[agent.Id] {
go func(f_ctx context.Context, agentId string, agentName string) {
if _, err := spawnActorForAgent(host, plugin, agentId, agentName, false); err != nil {
if err := spawnActorForAgent(host, plugin, agentId, agentName, false); err != nil {
logger.Err(f_ctx, err).Msgf("Failed to spawn actor for agent %s.", agentId)
}
}(ctx, agent.Id, agent.Name)
Expand All @@ -125,16 +90,34 @@ func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error {
return nil
}

// beforeShutdown logs that the actor system is shutting down and then shuts down
// every actor from _actorSystem.Actors() whose underlying type is *wasmAgentActor.
//
// NOTE(review): this span is a rendered diff without +/- markers. Both signatures
// appear below (one returning error, one returning nothing), and the `return nil`
// presumably belongs to the error-returning version — confirm against the real file.
func beforeShutdown(ctx context.Context) error {
func beforeShutdown(ctx context.Context) {
logger.Info(ctx).Msg("Actor system shutting down...")
return nil

// stop all agent actors before shutdown so they can suspend properly
for _, pid := range _actorSystem.Actors() {
// Actors of any other type are left untouched by this loop.
if _, ok := pid.Actor().(*wasmAgentActor); ok {

// pass the pid so it can be used during shutdown as an event sender
// This ctx shadows the outer one only inside this if-block, so each actor gets its own derived context.
ctx := context.WithValue(ctx, pidContextKey{}, pid)
// A failed shutdown is logged and the loop moves on to the next actor.
if err := pid.Shutdown(ctx); err != nil {
logger.Err(ctx, err).Msgf("Failed to shutdown actor %s.", pid.Name())
}
}
}
}

func Shutdown(ctx context.Context) {
if _actorSystem == nil {
return
}

beforeShutdown(ctx)

if _remoting != nil {
_remoting.Close()
_remoting = nil
}

if err := _actorSystem.Stop(ctx); err != nil {
logger.Err(ctx, err).Msg("Failed to shutdown actor system.")
}
Expand Down
Loading
Loading