Skip to content

remove parent context from composite runner #46

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion examples/composite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ func main() {
// Create composite runner
runner, err := composite.NewRunner(
configCallback,
composite.WithContext[*Worker](ctx),
)
if err != nil {
logger.Error("Failed to create composite runner", "error", err)
Expand Down
1 change: 0 additions & 1 deletion examples/composite/reload_membership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ func TestMembershipChangesBasic(t *testing.T) {

runner, err := composite.NewRunner[*TestWorker](
configCallback,
composite.WithContext[*TestWorker](ctx),
composite.WithLogHandler[*TestWorker](logger.Handler()),
)
require.NoError(t, err)
Expand Down
11 changes: 0 additions & 11 deletions runnables/composite/options.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package composite

import (
"context"
"log/slog"
)

Expand All @@ -16,13 +15,3 @@ func WithLogHandler[T runnable](handler slog.Handler) Option[T] {
}
}
}

// WithContext sets a custom context for the CompositeRunner instance.
// This allows for more granular control over cancellation and timeouts.
func WithContext[T runnable](ctx context.Context) Option[T] {
return func(c *Runner[T]) {
if ctx != nil {
c.parentCtx, c.parentCancel = context.WithCancel(ctx)
}
}
}
56 changes: 0 additions & 56 deletions runnables/composite/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// Mock runnable implementation for testing
Expand All @@ -17,10 +16,6 @@ func (m *mockRunnable) Run(ctx context.Context) error { return nil }
func (m *mockRunnable) Stop() {}
func (m *mockRunnable) String() string { return "mockRunnable" }

type contextKeyType string

const testContextKey contextKeyType = "testKey"

func TestWithLogHandler(t *testing.T) {
t.Parallel()

Expand All @@ -38,54 +33,3 @@ func TestWithLogHandler(t *testing.T) {
WithLogHandler[*mockRunnable](nil)(runner)
assert.Equal(t, slog.Default(), runner.logger, "Logger should not change with nil handler")
}

func TestWithContext(t *testing.T) {
t.Parallel()

originalCtx := context.Background()
runner := &Runner[*mockRunnable]{
parentCtx: nil,
parentCancel: nil,
}

WithContext[*mockRunnable](originalCtx)(runner)

// Verify the context and cancel function are set
require.NotNil(t, runner.parentCtx, "Context should be set")
require.NotNil(t, runner.parentCancel, "Cancel function should be set")

// Test that the contexts are related (child can access parent values)
originalCtx = context.WithValue(context.Background(), testContextKey, "value")
runner = &Runner[*mockRunnable]{}
WithContext[*mockRunnable](originalCtx)(runner)

assert.Equal(t,
"value", runner.parentCtx.Value(testContextKey),
"Child context should inherit values from parent",
)

// Test with empty context - we should get a new cancellable context
// but still be able to verify it's connected to the background context
runner = &Runner[*mockRunnable]{
parentCtx: nil,
parentCancel: nil,
}
// Use context.Background() instead of nil
WithContext[*mockRunnable](context.Background())(runner)

// Instead of checking for equality, verify:
// 1. The context is not nil
// 2. The cancel function is not nil
// 3. The context is derived from Background() (it will be a cancel context)
require.NotNil(t, runner.parentCtx, "Context should be set with Background()")
require.NotNil(t, runner.parentCancel, "Cancel function should be set with Background()")

// Verify it's a cancel context by calling the cancel function and checking if Done channel closes
runner.parentCancel()
select {
case <-runner.parentCtx.Done():
// This is what we want - the context was canceled
default:
t.Error("Context should be cancellable when created with Background()")
}
}
40 changes: 27 additions & 13 deletions runnables/composite/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package composite

import (
"fmt"
"log/slog"

"github.com/robbyt/go-supervisor/internal/finitestate"
"github.com/robbyt/go-supervisor/supervisor"
Expand Down Expand Up @@ -34,11 +33,13 @@ func (r *Runner[T]) Reload() {
newConfig, err := r.configCallback()
if err != nil {
logger.Error("Failed to get updated config", "error", err)
// TODO: consider removing the setStateError() call here
r.setStateError()
return
}
if newConfig == nil {
logger.Error("Config callback returned nil during reload")
// TODO: consider removing the setStateError() call here
r.setStateError()
return
}
Expand All @@ -55,14 +56,14 @@ func (r *Runner[T]) Reload() {
logger.Debug(
"Membership change detected, stopping all existing runnables before updating membership and config",
)
if err := r.reloadMembershipChanged(newConfig); err != nil {
if err := r.reloadWithRestart(newConfig); err != nil {
logger.Error("Failed to reload runnables due to membership change", "error", err)
r.setStateError()
return
}
logger.Debug("Reloaded runnables due to membership change")
} else {
r.reloadConfig(logger, newConfig)
r.reloadSkipRestart(newConfig)
logger.Debug("Reloaded runnables without membership change")
}

Expand All @@ -74,47 +75,60 @@ func (r *Runner[T]) Reload() {
}
}

// reloadMembershipChanged handles the case where the membership of runnables has changed.
func (r *Runner[T]) reloadMembershipChanged(newConfig *Config[T]) error {
// reloadWithRestart handles the case where the membership of runnables has changed.
func (r *Runner[T]) reloadWithRestart(newConfig *Config[T]) error {
logger := r.logger.WithGroup("reloadWithRestart")
logger.Debug("Reloading runnables due to membership change")
defer logger.Debug("Completed.")

// Stop all existing runnables while we still have the old config
// This acquires the runnables mutex
if err := r.stopRunnables(); err != nil {
if err := r.stopAllRunnables(); err != nil {
return fmt.Errorf("%w: failed to stop existing runnables during membership change", err)
}
// Now update the stored config after stopping old runnables
// Lock the config mutex for writing
logger.Debug("Updating config after stopping existing runnables")
r.configMu.Lock()
r.setConfig(newConfig)
r.configMu.Unlock()

// Start all runnables from the new config
// This acquires the runnables mutex
if err := r.boot(r.runCtx); err != nil {
if err := r.boot(r.ctx); err != nil {
return fmt.Errorf("%w: failed to start new runnables during membership change", err)
}
return nil
}

// reloadConfig handles the case where the membership of runnables has not changed.
func (r *Runner[T]) reloadConfig(logger *slog.Logger, newConfig *Config[T]) {
logger = logger.WithGroup("reloadConfig")
// No membership change, update config and reload existing runnables
// reloadSkipRestart handles the case where the membership of runnables has not changed.
func (r *Runner[T]) reloadSkipRestart(newConfig *Config[T]) {
logger := r.logger.WithGroup("reloadSkipRestart")
logger.Debug("Reloading runnables without membership change")
defer logger.Debug("Completed.")

logger.Debug("Updating config")
r.configMu.Lock()
r.setConfig(newConfig)
r.configMu.Unlock()

logger.Debug("Reloading configs of existing runnables")
// Reload configs of existing runnables
// Runnables mutex not locked as membership is not changing
for _, entry := range newConfig.Entries {
logger := logger.With("runnable", entry.Runnable.String())

if reloadableWithConfig, ok := any(entry.Runnable).(ReloadableWithConfig); ok {
// If the runnable implements our ReloadableWithConfig interface, use that to pass the new config
logger.Debug("Reloading child runnable with config", "runnable", entry.Runnable)
logger.Debug("Reloading child runnable with config")
reloadableWithConfig.ReloadWithConfig(entry.Config)
} else if reloadable, ok := any(entry.Runnable).(supervisor.Reloadable); ok {
// Fall back to standard Reloadable interface, assume the configCallback
// has somehow updated the runnable's internal state
logger.Debug("Reloading child runnable", "runnable", entry.Runnable)
logger.Debug("Reloading child runnable")
reloadable.Reload()
} else {
logger.Warn("Child runnable does not implement Reloadable or ReloadableWithConfig")
}
}
}
Expand Down
Loading