Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions tavern/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,13 @@ var (
EnvDBMaxConnLifetime = EnvInteger{"DB_MAX_CONN_LIFETIME", 3600}

// EnvGCPProjectID represents the project id tavern is deployed in for Google Cloud Platform deployments (leave empty otherwise).
// EnvGCPPubsubKeepAliveIntervalMs is the interval to publish no-op pubsub messages to help avoid gcppubsub coldstart latency. 0 disables this feature.
// EnvPubSubTopicShellInput defines the topic to publish shell input to.
// EnvPubSubSubscriptionShellInput defines the subscription to receive shell input from.
// EnvPubSubTopicShellOutput defines the topic to publish shell output to.
// EnvPubSubSubscriptionShellOutput defines the subscription to receive shell output from.
EnvGCPProjectID = EnvString{"GCP_PROJECT_ID", ""}
EnvGCPPubsubKeepAliveIntervalMs = EnvInteger{"GCP_PUBSUB_KEEP_ALIVE_INTERVAL_MS", 1000}
EnvPubSubTopicShellInput = EnvString{"PUBSUB_TOPIC_SHELL_INPUT", "mem://shell_input"}
EnvPubSubSubscriptionShellInput = EnvString{"PUBSUB_SUBSCRIPTION_SHELL_INPUT", "mem://shell_input"}
EnvPubSubTopicShellOutput = EnvString{"PUBSUB_TOPIC_SHELL_OUTPUT", "mem://shell_output"}
Expand Down Expand Up @@ -201,6 +203,17 @@ func (cfg *Config) NewShellMuxes(ctx context.Context) (wsMux *stream.Mux, grpcMu
slog.DebugContext(ctx, "created GCP PubSub subscription for shell input", "subscription_name", subShellInput)
subShellOutput = fmt.Sprintf("gcppubsub://projects/%s/subscriptions/%s", projectID, createGCPSubscription(ctx, shellOutputTopic))
slog.DebugContext(ctx, "created GCP PubSub subscription for shell output", "subscription_name", subShellOutput)

// Start a goroutine to publish noop messages on an interval.
// This reduces cold-start latency for GCP PubSub which can improve shell user experience.
if interval := EnvGCPPubsubKeepAliveIntervalMs.Int(); interval > 0 {
go stream.PreventPubSubColdStarts(
ctx,
time.Duration(interval)*time.Millisecond,
topicShellOutput,
topicShellInput,
)
}
}

pubOutput, err := pubsub.OpenTopic(ctx, topicShellOutput)
Expand Down
61 changes: 61 additions & 0 deletions tavern/internal/http/stream/gcp_coldstart.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package stream

import (
"context"
"log/slog"
"time"

"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/gcppubsub"
)

// PreventPubSubColdStarts by publishing noop messages on an interval.
// This reduces cold-start latency for GCP PubSub which can improve shell user experience.
// In other environments, this functionality may not be necessary.
func PreventPubSubColdStarts(ctx context.Context, interval time.Duration, topicShellOutput string, topicShellInput string) {
if interval == 0 {
slog.WarnContext(ctx, "gcppubsub cold-start polling disabled due to 0ms interval, this may impact shell latency")
return
}
if interval < 1*time.Millisecond {
slog.WarnContext(ctx, "gcppubsub cold-start polling interval less than minimum, setting to 1 millisecond")
interval = 1 * time.Millisecond
}

pubOutput, err := pubsub.OpenTopic(ctx, topicShellOutput)
if err != nil {
slog.ErrorContext(ctx, "warmup failed to connect to pubsub output topic, cold-start latency may impact user experience (%q): %v", topicShellOutput, err)
return
}
pubInput, err := pubsub.OpenTopic(ctx, topicShellInput)
if err != nil {
slog.ErrorContext(ctx, "warmup failed to connect to pubsub input topic, cold-start latency may impact user experience (%q): %v", topicShellOutput, err)
return
}

ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := pubOutput.Send(ctx, &pubsub.Message{
Metadata: map[string]string{
"id": "noop",
},
Body: []byte{0},
}); err != nil {
slog.ErrorContext(ctx, "warmup failed to publish shell output keep-alive no op. GCP coldstart latency may be encountered.")
}
if err := pubInput.Send(ctx, &pubsub.Message{
Metadata: map[string]string{
"id": "noop",
},
Body: []byte{0},
}); err != nil {
slog.ErrorContext(ctx, "warmup failed to publish shell input keep-alive no op. GCP coldstart latency may be encountered.")
}
}
}
}
Loading