Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion server/database/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ CREATE TABLE IF NOT EXISTS fly_apps (
status TEXT NOT NULL, -- pending, ready, failed

reaped_at timestamptz, -- when we deleted an app and its resources on fly.io
reap_error TEXT, -- error message if reaping failed

created_at timestamptz NOT NULL DEFAULT clock_timestamp(),
updated_at timestamptz NOT NULL DEFAULT clock_timestamp(),
Expand All @@ -439,7 +440,8 @@ CREATE TABLE IF NOT EXISTS fly_apps (
CONSTRAINT fly_apps_access_id_fkey FOREIGN KEY (access_id) REFERENCES functions_access (id) ON DELETE CASCADE
);

CREATE UNIQUE INDEX IF NOT EXISTS fly_apps_project_deployment_function_key ON fly_apps (project_id, deployment_id, function_id, status);
CREATE UNIQUE INDEX IF NOT EXISTS fly_apps_project_deployment_function_active_key ON fly_apps (project_id, deployment_id, function_id) WHERE reaped_at IS NULL;
CREATE INDEX IF NOT EXISTS fly_apps_reaper_idx ON fly_apps (project_id, created_at DESC) WHERE status = 'ready' AND reaped_at IS NULL;
CREATE UNIQUE INDEX IF NOT EXISTS fly_apps_seq_key ON fly_apps (seq DESC);
CREATE INDEX IF NOT EXISTS fly_apps_access_id_idx ON fly_apps (access_id DESC);

Expand Down
2 changes: 2 additions & 0 deletions server/database/sqlc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ sql:
- schema: schema.sql
queries: ../internal/functions/queries.sql
engine: postgresql
database:
managed: true
gen:
go:
package: "repo"
Expand Down
6 changes: 6 additions & 0 deletions server/internal/background/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Activities struct {
processDeployment *activities.ProcessDeployment
provisionFunctionsAccess *activities.ProvisionFunctionsAccess
deployFunctionRunners *activities.DeployFunctionRunners
reapFlyApps *activities.ReapFlyApps
refreshBillingUsage *activities.RefreshBillingUsage
refreshOpenRouterKey *activities.RefreshOpenRouterKey
slackChatCompletion *activities.SlackChatCompletion
Expand Down Expand Up @@ -72,6 +73,7 @@ func NewActivities(
processDeployment: activities.NewProcessDeployment(logger, tracerProvider, meterProvider, db, features, assetStorage, billingRepo),
provisionFunctionsAccess: activities.NewProvisionFunctionsAccess(logger, db, encryption),
deployFunctionRunners: activities.NewDeployFunctionRunners(logger, db, functionsDeployer, functionsVersion, encryption),
reapFlyApps: activities.NewReapFlyApps(logger, meterProvider, db, functionsDeployer, 3),
refreshBillingUsage: activities.NewRefreshBillingUsage(logger, db, billingRepo),
refreshOpenRouterKey: activities.NewRefreshOpenRouterKey(logger, db, openrouter),
slackChatCompletion: activities.NewSlackChatCompletionActivity(logger, slackClient, chatClient),
Expand Down Expand Up @@ -144,3 +146,7 @@ func (a *Activities) DeployFunctionRunners(ctx context.Context, req activities.D
func (a *Activities) ValidateDeployment(ctx context.Context, projectID uuid.UUID, deploymentID uuid.UUID) error {
return a.validateDeployment.Do(ctx, projectID, deploymentID)
}

// ReapFlyApps is the activity entry point for the fly app reaper. It forwards
// to the ReapFlyApps activity implementation held by this Activities value and
// returns its result unchanged.
func (a *Activities) ReapFlyApps(ctx context.Context) (*activities.ReapFlyAppsResult, error) {
	reaper := a.reapFlyApps
	return reaper.Do(ctx)
}
25 changes: 25 additions & 0 deletions server/internal/background/activities/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (
meterFunctionsToolsSkipped = "functions.tools.skipped"
meterFunctionsToolsCounter = "functions.tools.count"
meterFunctionsProcessedDuration = "functions.processed.duration"

meterFlyAppReaperReapCount = "flyapp_reaper.reap.count"
)

type metrics struct {
Expand All @@ -40,6 +42,8 @@ type metrics struct {
functionsToolsSkipped metric.Int64Counter
functionsToolsCounter metric.Int64Counter
functionsProcessedDuration metric.Float64Histogram

flyAppReaperReapCount metric.Int64Counter
}

func newMetrics(meter metric.Meter, logger *slog.Logger) *metrics {
Expand Down Expand Up @@ -120,6 +124,15 @@ func newMetrics(meter metric.Meter, logger *slog.Logger) *metrics {
logger.ErrorContext(ctx, "failed to create metric", attr.SlogMetricName(meterFunctionsProcessedDuration), attr.SlogError(err))
}

flyAppReaperReapCount, err := meter.Int64Counter(
meterFlyAppReaperReapCount,
metric.WithDescription("Number of fly apps reaped by the reaper workflow"),
metric.WithUnit("{app}"),
)
if err != nil {
logger.ErrorContext(ctx, "failed to create metric", attr.SlogMetricName(meterFlyAppReaperReapCount), attr.SlogError(err))
}

return &metrics{
opSkipped: opSkipped,
openAPIUpgradeCounter: openAPIUpgradeCounter,
Expand All @@ -129,6 +142,7 @@ func newMetrics(meter metric.Meter, logger *slog.Logger) *metrics {
functionsToolsSkipped: functionsToolsSkipped,
functionsToolsCounter: functionsToolsCounter,
functionsProcessedDuration: functionsProcessedDuration,
flyAppReaperReapCount: flyAppReaperReapCount,
}
}

Expand Down Expand Up @@ -214,3 +228,14 @@ func (m *metrics) RecordFunctionsProcessed(
))
}
}

// RecordFlyAppReaperReapCount adds the outcome of a reaper run to the fly app
// reap counter: success is recorded with the success outcome attribute and
// fail with the failure outcome attribute. It does nothing when the counter is
// nil.
func (m *metrics) RecordFlyAppReaperReapCount(ctx context.Context, success int64, fail int64) {
	reapCounter := m.flyAppReaperReapCount
	if reapCounter == nil {
		return
	}

	succeeded := metric.WithAttributes(attr.Outcome(o11y.OutcomeSuccess))
	failed := metric.WithAttributes(attr.Outcome(o11y.OutcomeFailure))

	reapCounter.Add(ctx, success, succeeded)
	reapCounter.Add(ctx, fail, failed)
}
99 changes: 99 additions & 0 deletions server/internal/background/activities/reap_functions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package activities

import (
"context"
"log/slog"

"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"go.opentelemetry.io/otel/metric"

"github.com/speakeasy-api/gram/server/internal/attr"
"github.com/speakeasy-api/gram/server/internal/functions"
funcrepo "github.com/speakeasy-api/gram/server/internal/functions/repo"
"github.com/speakeasy-api/gram/server/internal/oops"
)

// ReapFlyAppsResult summarizes a single run of the ReapFlyApps activity.
type ReapFlyAppsResult struct {
	// Reaped is the number of apps for which the deployer's Reap call succeeded.
	Reaped int
	// Errors is the number of apps for which the deployer's Reap call failed.
	Errors int
}

// ReapFlyApps is the background activity that finds fly apps eligible for
// reaping and asks the deployer to reap them.
type ReapFlyApps struct {
	// logger is the activity logger.
	logger *slog.Logger
	// metrics records how many apps were reaped and how many failed.
	metrics *metrics
	// db is the connection pool used to query for apps to reap.
	db *pgxpool.Pool
	// deployer performs the actual reaping of each app.
	deployer functions.Deployer
	// keepCount is passed to the reap query; per the comment in Do it is the
	// number of most recent apps to keep per project.
	keepCount int64
}

// NewReapFlyApps builds the fly app reaper activity.
//
// The activity's own logger is tagged with the "flyio-reaper" component, while
// metric creation uses the logger exactly as passed in. keepCount is forwarded
// to the query that selects apps to reap.
func NewReapFlyApps(
	logger *slog.Logger,
	meterProvider metric.MeterProvider,
	db *pgxpool.Pool,
	deployer functions.Deployer,
	keepCount int64,
) *ReapFlyApps {
	reaper := new(ReapFlyApps)

	reaper.logger = logger.With(attr.SlogComponent("flyio-reaper"))
	reaper.metrics = newMetrics(newMeter(meterProvider), logger)
	reaper.db = db
	reaper.deployer = deployer
	reaper.keepCount = keepCount

	return reaper
}

// Do runs one pass of the fly app reaper.
//
// It asks the database for the apps that should be reaped (passing keepCount
// to the query) and calls the deployer's Reap for each of them. A failure to
// reap one app is logged and counted in the result's Errors field, and the
// remaining apps are still attempted. The reap counts are recorded as metrics
// before returning.
//
// If ctx is done partway through (for example the activity timed out or was
// cancelled), the loop stops early instead of issuing Reap calls that can no
// longer succeed. Apps that were not attempted are not counted as errors.
//
// It returns a non-nil error only when the query for apps to reap fails.
func (r *ReapFlyApps) Do(ctx context.Context) (*ReapFlyAppsResult, error) {
	logger := r.logger

	repo := funcrepo.New(r.db)

	// Get all apps that should be reaped (keeping only the most recent N per project)
	appsToReap, err := repo.GetFlyAppsToReap(ctx, pgtype.Int8{Int64: r.keepCount, Valid: true})
	if err != nil {
		return nil, oops.E(oops.CodeUnexpected, err, "failed to query apps to reap").Log(ctx, logger)
	}

	result := &ReapFlyAppsResult{
		Reaped: 0,
		Errors: 0,
	}

	if len(appsToReap) == 0 {
		logger.InfoContext(ctx, "no apps to reap")
		return result, nil
	}

	for _, app := range appsToReap {
		// Once the context is done every further Reap call would fail, so stop
		// here rather than logging one error per remaining app and inflating
		// the failure metric.
		if ctxErr := ctx.Err(); ctxErr != nil {
			logger.WarnContext(ctx, "stopping fly app reaper early: context done", attr.SlogError(ctxErr))
			break
		}

		appLogger := logger.With(
			attr.SlogFlyAppInternalID(app.ID.String()),
			attr.SlogFlyAppName(app.AppName),
			attr.SlogFlyOrgSlug(app.FlyOrgSlug),
			attr.SlogProjectID(app.ProjectID.String()),
			attr.SlogDeploymentID(app.DeploymentID.String()),
			attr.SlogDeploymentFunctionsID(app.FunctionID.String()),
		)

		appLogger.InfoContext(ctx, "reaping fly app")

		if err := r.deployer.Reap(ctx, functions.ReapRequest{
			ProjectID:    app.ProjectID,
			DeploymentID: app.DeploymentID,
			FunctionID:   app.FunctionID,
		}); err != nil {
			// Best effort: one failed app must not block reaping the others.
			appLogger.ErrorContext(ctx, "failed to reap app", attr.SlogError(err))
			result.Errors++
			continue
		}

		result.Reaped++
		appLogger.InfoContext(ctx, "successfully reaped fly app")
	}

	r.metrics.RecordFlyAppReaperReapCount(ctx, int64(result.Reaped), int64(result.Errors))

	return result, nil
}
14 changes: 14 additions & 0 deletions server/internal/background/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,20 @@ func ProcessDeploymentWorkflow(ctx workflow.Context, params ProcessDeploymentWor
)
}

// Trigger the functions reaper workflow after a successful deployment. The
// child is started with PARENT_CLOSE_POLICY_ABANDON and we never wait for its
// result, so it does not block the deployment workflow.
// NOTE(review): this workflow ID differs from the "v1:functions-reaper" ID used
// by ExecuteFunctionsReaperWorkflow, so the two reapers are not deduplicated
// against each other — confirm that is intended.
if finalStatus == "completed" {
	reaperRun := workflow.ExecuteChildWorkflow(
		workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
			WorkflowID:        "v1:functions-reaper-triggered",
			ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON,
		}),
		FunctionsReaperWorkflow,
		FunctionsReaperWorkflowParams{},
	)

	// Wait only until the child has started, not until it completes. If this
	// workflow returned before the start was recorded, the abandoned child
	// might never run. A start failure (for example a reaper with the same
	// workflow ID is already running) is logged and otherwise ignored, because
	// reaping is best effort and must not fail the deployment.
	if startErr := reaperRun.GetChildWorkflowExecution().Get(ctx, nil); startErr != nil {
		workflow.GetLogger(ctx).Warn("failed to start functions reaper workflow", "error", startErr)
	}
}

return &ProcessDeploymentWorkflowResult{
ProjectID: params.ProjectID,
DeploymentID: params.DeploymentID,
Expand Down
71 changes: 71 additions & 0 deletions server/internal/background/functions_reaper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package background

import (
"context"
"fmt"
"time"

"go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"

"github.com/speakeasy-api/gram/server/internal/background/activities"
)

// FunctionsReaperWorkflowParams is the input to FunctionsReaperWorkflow. It has
// no fields: the reaper runs globally across all projects.
type FunctionsReaperWorkflowParams struct{}

// FunctionsReaperWorkflowResult is the output of FunctionsReaperWorkflow.
type FunctionsReaperWorkflowResult struct {
	// AppsReaped is the Reaped count reported by the ReapFlyApps activity.
	AppsReaped int
	// Errors is the Errors count reported by the ReapFlyApps activity.
	Errors int
}

// ExecuteFunctionsReaperWorkflow starts FunctionsReaperWorkflow on the main
// task queue and returns the resulting workflow run.
//
// The workflow ID is fixed so that only one reaper workflow can run at a time;
// the conflict policy is USE_EXISTING and the reuse policy is ALLOW_DUPLICATE.
// Each run is limited to ten minutes.
func ExecuteFunctionsReaperWorkflow(ctx context.Context, temporalClient client.Client, params FunctionsReaperWorkflowParams) (client.WorkflowRun, error) {
	startOpts := client.StartWorkflowOptions{
		ID:                       "v1:functions-reaper",
		TaskQueue:                string(TaskQueueMain),
		WorkflowIDConflictPolicy: enums.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING,
		WorkflowIDReusePolicy:    enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
		WorkflowRunTimeout:       10 * time.Minute,
	}

	return temporalClient.ExecuteWorkflow(ctx, startOpts, FunctionsReaperWorkflow, params)
}

// FunctionsReaperWorkflow runs the ReapFlyApps activity once and reports how
// many apps were reaped and how many failed.
//
// The activity gets a five minute start-to-close timeout and up to three
// attempts with exponential backoff (1s initial interval, coefficient 2,
// capped at one minute). If the activity ultimately fails, the error is
// wrapped and returned with a nil result.
func FunctionsReaperWorkflow(ctx workflow.Context, params FunctionsReaperWorkflowParams) (*FunctionsReaperWorkflowResult, error) {
	// This can stay nil/unassigned. Temporal just uses this to get activity names.
	// The actual activities are registered in the CLI layer (cmd/gram/worker.go).
	var a *Activities

	logger := workflow.GetLogger(ctx)

	retryPolicy := &temporal.RetryPolicy{
		InitialInterval:    time.Second,
		MaximumInterval:    time.Minute,
		BackoffCoefficient: 2,
		MaximumAttempts:    3,
	}
	activityCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: 5 * time.Minute,
		RetryPolicy:         retryPolicy,
	})

	var reaped activities.ReapFlyAppsResult
	if err := workflow.ExecuteActivity(activityCtx, a.ReapFlyApps).Get(activityCtx, &reaped); err != nil {
		return nil, fmt.Errorf("failed to reap functions: %w", err)
	}

	logger.Info("functions reaper completed",
		"apps_reaped", reaped.Reaped,
		"errors", reaped.Errors,
	)

	summary := &FunctionsReaperWorkflowResult{
		AppsReaped: reaped.Reaped,
		Errors:     reaped.Errors,
	}
	return summary, nil
}
2 changes: 2 additions & 0 deletions server/internal/background/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func NewTemporalWorker(
temporalWorker.RegisterActivity(activities.TransitionDeployment)
temporalWorker.RegisterActivity(activities.ProvisionFunctionsAccess)
temporalWorker.RegisterActivity(activities.DeployFunctionRunners)
temporalWorker.RegisterActivity(activities.ReapFlyApps)
temporalWorker.RegisterActivity(activities.GetSlackProjectContext)
temporalWorker.RegisterActivity(activities.PostSlackMessage)
temporalWorker.RegisterActivity(activities.SlackChatCompletion)
Expand All @@ -164,6 +165,7 @@ func NewTemporalWorker(
temporalWorker.RegisterActivity(activities.ValidateDeployment)

temporalWorker.RegisterWorkflow(ProcessDeploymentWorkflow)
temporalWorker.RegisterWorkflow(FunctionsReaperWorkflow)
temporalWorker.RegisterWorkflow(SlackEventWorkflow)
temporalWorker.RegisterWorkflow(OpenrouterKeyRefreshWorkflow)
temporalWorker.RegisterWorkflow(CustomDomainRegistrationWorkflow)
Expand Down
1 change: 1 addition & 0 deletions server/internal/database/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions server/internal/functions/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Orchestrator interface {

// Deployer manages the lifecycle of function runners.
type Deployer interface {
	// Deploy handles a runner deploy request and returns its result.
	Deploy(ctx context.Context, req RunnerDeployRequest) (*RunnerDeployResult, error)
	// Reap handles a reap request for the function identified by req.
	Reap(ctx context.Context, req ReapRequest) error
}

type ToolCaller interface {
Expand Down Expand Up @@ -102,3 +103,9 @@ type RunnerResourceReadRequest struct {
ResourceURI string
ResourceName string
}

// ReapRequest identifies the function whose resources a Deployer should reap.
type ReapRequest struct {
	// ProjectID is the project the function belongs to.
	ProjectID uuid.UUID
	// DeploymentID is the deployment the function belongs to.
	DeploymentID uuid.UUID
	// FunctionID is the function being reaped.
	FunctionID uuid.UUID
}
Loading
Loading