revert: "fix: reduce spammy logs for pings" (#3243)
Reverts #3150

This broke runners. They failed to start with:
```
{"level":"warn","attributes":{"deployment":"dpl-alice-4p86gzxx2zr5soc0","module":"alice","scope":"alice"},"message":"Plugin failed to start, terminating pid 11","time":"2024-10-30T03:06:14.701727838Z"}
ftl-runner: error: failed to spawn plugin: plugin process died: context canceled
```
jvmakine authored Oct 30, 2024
1 parent 03212ac commit a803098
Showing 9 changed files with 53 additions and 67 deletions.
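For reference, the revert restores the three-argument `rpc.Wait(ctx, retry, client)` signature, so each call site below bounds the wait with its own `context.WithTimeout` rather than passing a deadline into `Wait`. A minimal sketch of that call-site pattern (imports omitted; the `endpoint` variable, timeout value, and error message are illustrative and not taken from this commit):
```
// Caller-owned deadline: rpc.Wait pings the client until it reports ready,
// the context is cancelled, or the timeout below expires.
waitCtx, cancel := context.WithTimeout(ctx, 10*time.Second) // illustrative timeout
defer cancel()

client := rpc.Dial(ftlv1connect.NewControllerServiceClient, endpoint, log.Trace) // endpoint is illustrative
if err := rpc.Wait(waitCtx, backoff.Backoff{Max: time.Second * 2}, client); err != nil {
	return fmt.Errorf("controller did not become ready: %w", err)
}
```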
15 changes: 13 additions & 2 deletions common/plugin/spawn.go
@@ -161,21 +161,32 @@ func Spawn[Client PingableClient](
return nil, nil, err
}

dialCtx, cancel := context.WithTimeout(ctx, opts.startTimeout)
defer cancel()

// Wait for the plugin to start.
client := rpc.Dial(makeClient, pluginEndpoint.String(), log.Trace)
pingErr := make(chan error)
go func() {
retry := backoff.Backoff{Min: pluginRetryDelay, Max: pluginRetryDelay}
err := rpc.Wait(ctx, retry, opts.startTimeout, client)
err := rpc.Wait(dialCtx, retry, client)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
// Deliberately don't close pingErr because the select loop below
// will catch dialCtx closing and return a better error.
return
}
pingErr <- err
close(pingErr)
}()

select {
case <-dialCtx.Done():
return nil, nil, fmt.Errorf("plugin timed out while starting: %w", dialCtx.Err())

case <-cmdCtx.Done():
return nil, nil, fmt.Errorf("plugin process died: %w", cmdCtx.Err())

case err = <-pingErr:
case err := <-pingErr:
if err != nil {
return nil, nil, fmt.Errorf("plugin failed to respond to ping: %w", err)
}
4 changes: 3 additions & 1 deletion frontend/cli/cmd_bench.go
@@ -29,7 +29,9 @@ type benchCmd struct {
}

func (c *benchCmd) Run(ctx context.Context, client ftlv1connect.VerbServiceClient) error {
if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, c.Wait, client); err != nil {
ctx, cancel := context.WithTimeout(ctx, c.Wait)
defer cancel()
if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, client); err != nil {
return fmt.Errorf("FTL cluster did not become ready: %w", err)
}
logger := log.FromContext(ctx)
4 changes: 3 additions & 1 deletion frontend/cli/cmd_box_run.go
@@ -69,7 +69,9 @@ func (b *boxRunCmd) Run(ctx context.Context, projConfig projectconfig.Config) er

// Wait for the controller to come up.
client := ftlv1connect.NewControllerServiceClient(rpc.GetHTTPClient(b.Bind.String()), b.Bind.String())
if err := rpc.Wait(ctx, backoff.Backoff{}, b.ControllerTimeout, client); err != nil {
waitCtx, cancel := context.WithTimeout(ctx, b.ControllerTimeout)
defer cancel()
if err := rpc.Wait(waitCtx, backoff.Backoff{}, client); err != nil {
return fmt.Errorf("controller failed to start: %w", err)
}

4 changes: 3 additions & 1 deletion frontend/cli/cmd_call.go
@@ -29,7 +29,9 @@ type callCmd struct {
}

func (c *callCmd) Run(ctx context.Context, client ftlv1connect.VerbServiceClient, ctlCli ftlv1connect.ControllerServiceClient) error {
if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, c.Wait, client); err != nil {
ctx, cancel := context.WithTimeout(ctx, c.Wait)
defer cancel()
if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, client); err != nil {
return err
}

4 changes: 3 additions & 1 deletion frontend/cli/cmd_ping.go
@@ -15,5 +15,7 @@ type pingCmd struct {
}

func (c *pingCmd) Run(ctx context.Context, controller ftlv1connect.ControllerServiceClient) error {
return rpc.Wait(ctx, backoff.Backoff{Max: time.Second}, c.Wait, controller) //nolint:wrapcheck
ctx, cancel := context.WithTimeout(ctx, c.Wait)
defer cancel()
return rpc.Wait(ctx, backoff.Backoff{Max: time.Second}, controller) //nolint:wrapcheck
}
9 changes: 4 additions & 5 deletions frontend/cli/cmd_replay.go
@@ -26,15 +26,14 @@ type replayCmd struct {
}

func (c *replayCmd) Run(ctx context.Context, client ftlv1connect.VerbServiceClient, ctlCli ftlv1connect.ControllerServiceClient) error {
// Wait timeout is for both pings to complete, not each ping individually
startTime := time.Now()

if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, c.Wait, client); err != nil {
ctx, cancel := context.WithTimeout(ctx, c.Wait)
defer cancel()
if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, client); err != nil {
return fmt.Errorf("failed to wait for client: %w", err)
}

consoleServiceClient := rpc.Dial(pbconsoleconnect.NewConsoleServiceClient, cli.Endpoint.String(), log.Error)
if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, c.Wait-time.Since(startTime), consoleServiceClient); err != nil {
if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, consoleServiceClient); err != nil {
return fmt.Errorf("failed to wait for console service client: %w", err)
}

4 changes: 2 additions & 2 deletions frontend/cli/cmd_serve.go
@@ -83,7 +83,7 @@ func (s *serveCmd) run(ctx context.Context, projConfig projectconfig.Config, ini
return err
}
if s.Provisioners > 0 {
if err := rpc.Wait(ctx, backoff.Backoff{Max: s.StartupTimeout}, s.StartupTimeout, provisionerClient); err != nil {
if err := rpc.Wait(ctx, backoff.Backoff{Max: s.StartupTimeout}, provisionerClient); err != nil {
return fmt.Errorf("provisioner failed to start: %w", err)
}
}
@@ -244,7 +244,7 @@ func (s *serveCmd) run(ctx context.Context, projConfig projectconfig.Config, ini
return fmt.Errorf("controller failed to start: %w", err)
}
if s.Provisioners > 0 {
if err := rpc.Wait(ctx, backoff.Backoff{Max: s.StartupTimeout}, s.StartupTimeout, provisionerClient); err != nil {
if err := rpc.Wait(ctx, backoff.Backoff{Max: s.StartupTimeout}, provisionerClient); err != nil {
return fmt.Errorf("provisioner failed to start: %w", err)
}
}
@@ -124,7 +124,10 @@ func (p *externalPluginImpl) start(ctx context.Context, bind *url.URL, language,
}

func (p *externalPluginImpl) ping(ctx context.Context) error {
err := rpc.Wait(ctx, backoff.Backoff{}, launchTimeout, p.client)
retry := backoff.Backoff{}
heartbeatCtx, cancel := context.WithTimeout(ctx, launchTimeout)
defer cancel()
err := rpc.Wait(heartbeatCtx, retry, p.client)
if err != nil {
return connect.NewError(connect.CodeUnavailable, fmt.Errorf("failed to connect to runner: %w", err))
}
71 changes: 18 additions & 53 deletions internal/rpc/rpc.go
@@ -140,66 +140,31 @@ func (m mergedContext) Value(key any) any {
return m.values.Value(key)
}

type noopLogSync struct{}

var _ log.Sink = noopLogSync{}

func (noopLogSync) Log(entry log.Entry) error { return nil }

// Wait for a client to become available.
//
// This will repeatedly call Ping() according to the retry policy until the client is
// ready or the deadline is reached.
// This will repeatedly call Ping() every 100ms until the service becomes
// ready. TODO: This will probably need to be smarter at some point.
//
// If "ctx" is cancelled this will return ctx.Err()
//
// Usually rpc errors are logged, but this function will silence ping call errors, and
// returns the last error if the deadline is reached.
func Wait(ctx context.Context, retry backoff.Backoff, deadline time.Duration, client Pingable) error {
errChan := make(chan error)
ctx, cancel := context.WithTimeout(ctx, deadline)
defer cancel()

go func() {
logger := log.FromContext(ctx)
// create a context logger with a new one that does not log debug messages (which include each ping call failures)
silencedCtx := log.ContextWithLogger(ctx, log.New(log.Error, noopLogSync{}))

start := time.Now()
// keep track of the last ping error
var err error
for {
select {
case <-ctx.Done():
if err != nil && errors.Is(ctx.Err(), context.DeadlineExceeded) {
errChan <- err
} else {
errChan <- ctx.Err()
}
return
default:
}
var resp *connect.Response[ftlv1.PingResponse]
resp, err = client.Ping(silencedCtx, connect.NewRequest(&ftlv1.PingRequest{}))
if err == nil {
if resp.Msg.NotReady == nil {
logger.Debugf("Ping succeeded in %.2fs", time.Since(start).Seconds())
errChan <- nil
return
}
err = fmt.Errorf("service is not ready: %s", *resp.Msg.NotReady)
func Wait(ctx context.Context, retry backoff.Backoff, client Pingable) error {
logger := log.FromContext(ctx)
for {
select {
case <-ctx.Done():
return ctx.Err() //nolint:wrapcheck
default:
}
resp, err := client.Ping(ctx, connect.NewRequest(&ftlv1.PingRequest{}))
if err == nil {
if resp.Msg.NotReady == nil {
return nil
}
delay := retry.Duration()
logger.Tracef("Ping failed waiting %s for client: %+v", delay, err)
time.Sleep(delay)
err = fmt.Errorf("service is not ready: %s", *resp.Msg.NotReady)
}
}()

err := <-errChan
if err != nil {
return err
delay := retry.Duration()
logger.Tracef("Ping failed waiting %s for client: %+v", delay, err)
time.Sleep(delay)
}
return nil
}

// RetryStreamingClientStream will repeatedly call handler with the stream
