diff --git a/admin/admin.go b/admin/admin.go
index 0dc48999d1d..32ffa2ab9a7 100644
--- a/admin/admin.go
+++ b/admin/admin.go
@@ -9,6 +9,7 @@ import (
 	"github.com/rilldata/rill/admin/billing"
 	"github.com/rilldata/rill/admin/billing/payment"
 	"github.com/rilldata/rill/admin/database"
+	"github.com/rilldata/rill/admin/jobs"
 	"github.com/rilldata/rill/admin/provisioner"
 	"github.com/rilldata/rill/runtime/pkg/email"
 	"github.com/rilldata/rill/runtime/server/auth"
@@ -31,6 +32,7 @@ type Options struct {
 
 type Service struct {
 	DB             database.DB
+	Jobs           jobs.Client
 	URLs           *URLs
 	ProvisionerSet map[string]provisioner.Provisioner
 	Email          *email.Client
diff --git a/admin/jobs/jobs.go b/admin/jobs/jobs.go
new file mode 100644
index 00000000000..c928fa34422
--- /dev/null
+++ b/admin/jobs/jobs.go
@@ -0,0 +1,19 @@
+package jobs
+
+import (
+	"context"
+)
+
+type Client interface {
+	Close(ctx context.Context) error
+	Work(ctx context.Context) error
+	CancelJob(ctx context.Context, jobID int64) error
+
+	// NOTE: Add new job trigger functions here
+	ResetAllDeployments(ctx context.Context) (*InsertResult, error)
+}
+
+type InsertResult struct {
+	ID        int64
+	Duplicate bool
+}
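For orientation, here is a minimal sketch of how a caller might enqueue a job through the interface above once `adm.Jobs` is set. The `triggerReset` function is hypothetical; only `jobs.Client`, `InsertResult`, and `ResetAllDeployments` come from this change.

package main

import (
	"context"
	"fmt"

	"github.com/rilldata/rill/admin/jobs"
)

// triggerReset enqueues a reset and reports whether River skipped the insert
// as a duplicate (Duplicate is only meaningful for jobs inserted with unique
// options, which this job does not currently set).
func triggerReset(ctx context.Context, c jobs.Client) error {
	res, err := c.ResetAllDeployments(ctx)
	if err != nil {
		return err
	}
	if res.Duplicate {
		fmt.Printf("job %d was already pending; nothing inserted\n", res.ID)
		return nil
	}
	fmt.Printf("queued job %d\n", res.ID)
	return nil
}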
deployment", zap.String("deployment_id", depl.ID), observability.ZapCtx(ctx)) + _, err = w.admin.TriggerRedeploy(ctx, proj, depl) + if err != nil { + return err + } + w.admin.Logger.Info("reset all deployments: redeployed deployment", zap.String("deployment_id", depl.ID), observability.ZapCtx(ctx)) + } + + return nil +} diff --git a/admin/jobs/river/river.go b/admin/jobs/river/river.go new file mode 100644 index 00000000000..98ff054985d --- /dev/null +++ b/admin/jobs/river/river.go @@ -0,0 +1,188 @@ +package river + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/rilldata/rill/admin" + "github.com/rilldata/rill/admin/jobs" + "github.com/rilldata/rill/runtime/pkg/observability" + "github.com/riverqueue/river" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivermigrate" + "github.com/riverqueue/river/rivertype" + "github.com/robfig/cron/v3" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + oteltrace "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" + "go.uber.org/zap/exp/zapslog" +) + +var ( + tracer = otel.Tracer("github.com/rilldata/rill/admin/jobs/river") + meter = otel.Meter("github.com/rilldata/rill/admin/jobs/river") + jobLatencyHistogram = observability.Must(meter.Int64Histogram("job_latency", metric.WithUnit("ms"))) +) + +type Client struct { + dbPool *pgxpool.Pool + riverClient *river.Client[pgx.Tx] +} + +func New(ctx context.Context, dsn string, adm *admin.Service) (jobs.Client, error) { + dbPool, err := pgxpool.New(ctx, dsn) + if err != nil { + return nil, err + } + + tx, err := dbPool.Begin(ctx) + if err != nil { + return nil, err + } + defer func() { _ = tx.Rollback(ctx) }() + + migrator := rivermigrate.New(riverpgxv5.New(dbPool), nil) + + res, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionUp, nil) + if err != nil { + return nil, err + } + + err = tx.Commit(ctx) + if err != nil { + return nil, err + } + + for _, version := range res.Versions { + adm.Logger.Info("river database migrated", zap.String("direction", string(res.Direction)), zap.Int("version", version.Version)) + } + + workers := river.NewWorkers() + // NOTE: Register new job workers here + river.AddWorker(workers, &ValidateDeploymentsWorker{admin: adm}) + river.AddWorker(workers, &ResetAllDeploymentsWorker{admin: adm}) + + periodicJobs := []*river.PeriodicJob{ + // NOTE: Add new periodic jobs here + newPeriodicJob(&ValidateDeploymentsArgs{}, "* */6 * * *", true), + } + + // Wire our zap logger to a slog logger for the river client + logger := slog.New(zapslog.NewHandler(adm.Logger.Core(), &zapslog.HandlerOptions{ + AddSource: true, + })) + + riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ + Queues: map[string]river.QueueConfig{ + river.QueueDefault: {MaxWorkers: 100}, + }, + Workers: workers, + PeriodicJobs: periodicJobs, + Logger: logger, + JobTimeout: time.Hour, + MaxAttempts: 3, + ErrorHandler: &ErrorHandler{logger: adm.Logger}, + }) + if err != nil { + return nil, err + } + + return &Client{ + dbPool: dbPool, + riverClient: riverClient, + }, nil +} + +func (c *Client) Close(ctx context.Context) error { + err := c.riverClient.Stop(ctx) + c.dbPool.Close() + return err +} + +func (c *Client) Work(ctx context.Context) error { + err := c.riverClient.Start(ctx) + if err != nil { + return err + } + return nil +} + +func (c *Client) CancelJob(ctx context.Context, jobID int64) error { + _, 
err := c.riverClient.JobCancel(ctx, jobID) + if err != nil { + return err + } + return nil +} + +// NOTE: Add new job trigger functions here +func (c *Client) ResetAllDeployments(ctx context.Context) (*jobs.InsertResult, error) { + res, err := c.riverClient.Insert(ctx, ResetAllDeploymentsArgs{}, nil) + if err != nil { + return nil, err + } + return &jobs.InsertResult{ + ID: res.Job.ID, + Duplicate: res.UniqueSkippedAsDuplicate, + }, nil +} + +type ErrorHandler struct { + logger *zap.Logger +} + +func (h *ErrorHandler) HandleError(ctx context.Context, job *rivertype.JobRow, err error) *river.ErrorHandlerResult { + var args string + _ = json.Unmarshal(job.EncodedArgs, &args) // ignore parse errors + h.logger.Error("Job errored", zap.Int64("job_id", job.ID), zap.Int("num_attempt", job.Attempt), zap.String("kind", job.Kind), zap.String("args", args), zap.Error(err)) + return nil +} + +func (h *ErrorHandler) HandlePanic(ctx context.Context, job *rivertype.JobRow, panicVal any, trace string) *river.ErrorHandlerResult { + var args string + _ = json.Unmarshal(job.EncodedArgs, &args) // ignore parse errors + h.logger.Error("Job panicked", zap.Int64("job_id", job.ID), zap.String("kind", job.Kind), zap.String("args", args), zap.Any("panic_val", panicVal), zap.String("trace", trace)) + // Set the job to be immediately cancelled + return &river.ErrorHandlerResult{SetCancelled: true} +} + +func newPeriodicJob(jobArgs river.JobArgs, cronExpr string, runOnStart bool) *river.PeriodicJob { + schedule, err := cron.ParseStandard(cronExpr) + if err != nil { + panic(err) + } + + periodicJob := river.NewPeriodicJob( + schedule, + func() (river.JobArgs, *river.InsertOpts) { + return jobArgs, nil + }, + &river.PeriodicJobOpts{RunOnStart: runOnStart}, + ) + + return periodicJob +} + +// Observability work wrapper for the job workers +func work(ctx context.Context, logger *zap.Logger, name string, fn func(context.Context) error) error { + ctx, span := tracer.Start(ctx, fmt.Sprintf("runJob %s", name), oteltrace.WithAttributes(attribute.String("name", name))) + defer span.End() + + start := time.Now() + logger.Info("job started", zap.String("name", name), observability.ZapCtx(ctx)) + err := fn(ctx) + jobLatencyHistogram.Record(ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attribute.String("name", name), attribute.Bool("failed", err != nil))) + if err != nil { + logger.Error("job failed", zap.String("name", name), zap.Error(err), zap.Duration("duration", time.Since(start)), observability.ZapCtx(ctx)) + return err + } + logger.Info("job completed", zap.String("name", name), zap.Duration("duration", time.Since(start)), observability.ZapCtx(ctx)) + return nil +} diff --git a/admin/jobs/river/validate_deployments.go b/admin/jobs/river/validate_deployments.go new file mode 100644 index 00000000000..5ef1b5d58f7 --- /dev/null +++ b/admin/jobs/river/validate_deployments.go @@ -0,0 +1,147 @@ +package river + +import ( + "context" + "fmt" + "time" + + "github.com/rilldata/rill/admin" + "github.com/rilldata/rill/admin/database" + "github.com/rilldata/rill/runtime/pkg/observability" + "github.com/riverqueue/river" + "go.uber.org/zap" +) + +type ValidateDeploymentsArgs struct{} + +func (ValidateDeploymentsArgs) Kind() string { return "validate_deployments" } + +type ValidateDeploymentsWorker struct { + river.WorkerDefaults[ValidateDeploymentsArgs] + admin *admin.Service +} + +func (w *ValidateDeploymentsWorker) Work(ctx context.Context, job *river.Job[ValidateDeploymentsArgs]) error { + return work(ctx, 
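One note on the periodic schedule above: newPeriodicJob uses standard five-field cron parsing, where "0 */6 * * *" fires once at the top of every sixth hour, matching the 6-hour cadence the old worker scheduler used (a "* */6 * * *" expression would instead fire every minute within those hours). A self-contained check of the schedule, using only the robfig/cron package the file already imports:

package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron/v3"
)

func main() {
	// ParseStandard accepts the classic five-field cron syntax.
	sched, err := cron.ParseStandard("0 */6 * * *")
	if err != nil {
		panic(err)
	}
	now := time.Date(2024, 1, 1, 5, 30, 0, 0, time.UTC)
	// The next firing after 05:30 is the top of the next sixth hour.
	fmt.Println(sched.Next(now)) // 2024-01-01 06:00:00 +0000 UTC
}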
diff --git a/admin/jobs/river/validate_deployments.go b/admin/jobs/river/validate_deployments.go
new file mode 100644
index 00000000000..5ef1b5d58f7
--- /dev/null
+++ b/admin/jobs/river/validate_deployments.go
@@ -0,0 +1,147 @@
+package river
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/rilldata/rill/admin"
+	"github.com/rilldata/rill/admin/database"
+	"github.com/rilldata/rill/runtime/pkg/observability"
+	"github.com/riverqueue/river"
+	"go.uber.org/zap"
+)
+
+type ValidateDeploymentsArgs struct{}
+
+func (ValidateDeploymentsArgs) Kind() string { return "validate_deployments" }
+
+type ValidateDeploymentsWorker struct {
+	river.WorkerDefaults[ValidateDeploymentsArgs]
+	admin *admin.Service
+}
+
+func (w *ValidateDeploymentsWorker) Work(ctx context.Context, job *river.Job[ValidateDeploymentsArgs]) error {
+	return work(ctx, w.admin.Logger, job.Kind, w.validateDeployments)
+}
+
+const validateAllDeploymentsForProjectTimeout = 5 * time.Minute
+
+func (w *ValidateDeploymentsWorker) validateDeployments(ctx context.Context) error {
+	// Resolve 'latest' version
+	latestVersion := w.admin.ResolveLatestRuntimeVersion()
+
+	// Verify version is valid
+	err := w.admin.ValidateRuntimeVersion(latestVersion)
+	if err != nil {
+		return err
+	}
+
+	// Iterate over batches of projects
+	limit := 100
+	afterName := ""
+	stop := false
+	for !stop {
+		// Get batch and update iterator variables
+		projs, err := w.admin.DB.FindProjects(ctx, afterName, limit)
+		if err != nil {
+			return err
+		}
+		if len(projs) < limit {
+			stop = true
+		}
+		if len(projs) != 0 {
+			afterName = projs[len(projs)-1].Name
+		}
+
+		// Process batch
+		for _, proj := range projs {
+			err := w.reconcileAllDeploymentsForProject(ctx, proj, latestVersion)
+			if err != nil {
+				// Log the error and continue to the next project
+				w.admin.Logger.Error("validate deployments: failed to reconcile project deployments", zap.String("project_id", proj.ID), zap.String("version", latestVersion), observability.ZapCtx(ctx), zap.Error(err))
+			}
+		}
+	}
+
+	return nil
+}
+
+func (w *ValidateDeploymentsWorker) reconcileAllDeploymentsForProject(ctx context.Context, proj *database.Project, latestVersion string) error {
+	// Apply timeout
+	ctx, cancel := context.WithTimeout(ctx, validateAllDeploymentsForProjectTimeout)
+	defer cancel()
+
+	// Get all project deployments
+	depls, err := w.admin.DB.FindDeploymentsForProject(ctx, proj.ID)
+	if err != nil {
+		return err
+	}
+
+	// Get the project's organization; we need it to create the deployment annotations
+	org, err := w.admin.DB.FindOrganization(ctx, proj.OrganizationID)
+	if err != nil {
+		return err
+	}
+
+	var prodDeplID string
+	if proj.ProdDeploymentID != nil {
+		prodDeplID = *proj.ProdDeploymentID
+	}
+
+	for _, depl := range depls {
+		if depl.ID == prodDeplID {
+			// Get deployment provisioner
+			p, ok := w.admin.ProvisionerSet[depl.Provisioner]
+			if !ok {
+				return fmt.Errorf("validate deployments: %q is not in the provisioner set", depl.Provisioner)
+			}
+
+			v, err := p.ValidateConfig(ctx, depl.ProvisionID)
+			if err != nil {
+				w.admin.Logger.Warn("validate deployments: error validating provisioner config", zap.String("organization_id", org.ID), zap.String("project_id", proj.ID), zap.String("deployment_id", depl.ID), zap.String("provisioner", depl.Provisioner), zap.String("provision_id", depl.ProvisionID), zap.Error(err), observability.ZapCtx(ctx))
+				return err
+			}
+
+			// Trigger a redeploy if the config is no longer valid
+			if !v {
+				w.admin.Logger.Info("validate deployments: config no longer valid, triggering redeploy", zap.String("organization_id", org.ID), zap.String("project_id", proj.ID), zap.String("deployment_id", depl.ID), observability.ZapCtx(ctx))
+				_, err = w.admin.TriggerRedeploy(ctx, proj, depl)
+				if err != nil {
+					return err
+				}
+				w.admin.Logger.Info("validate deployments: redeployed", zap.String("organization_id", org.ID), zap.String("project_id", proj.ID), observability.ZapCtx(ctx))
+				continue
+			}
+
+			// If the project tracks the 'latest' version, upgrade the deployment if needed; skip 'static' provisioner types
+			if p.Type() != "static" && proj.ProdVersion == "latest" && depl.RuntimeVersion != latestVersion {
+				w.admin.Logger.Info("validate deployments: upgrading deployment", zap.String("organization_id", org.ID), zap.String("project_id", proj.ID), zap.String("deployment_id", depl.ID), zap.String("provisioner", depl.Provisioner), zap.String("provision_id", depl.ProvisionID), zap.String("instance_id", depl.RuntimeInstanceID), zap.String("version", latestVersion), observability.ZapCtx(ctx))
+
+				// Update deployment to the latest version
+				err = w.admin.UpdateDeployment(ctx, depl, &admin.UpdateDeploymentOptions{
+					Version:         latestVersion,
+					Branch:          depl.Branch,
+					Variables:       proj.ProdVariables,
+					Annotations:     w.admin.NewDeploymentAnnotations(org, proj),
+					EvictCachedRepo: false,
+				})
+				if err != nil {
+					w.admin.Logger.Error("validate deployments: failed to upgrade deployment", zap.String("organization_id", org.ID), zap.String("project_id", proj.ID), zap.String("deployment_id", depl.ID), zap.String("provisioner", depl.Provisioner), zap.String("provision_id", depl.ProvisionID), zap.String("instance_id", depl.RuntimeInstanceID), zap.String("version", latestVersion), observability.ZapCtx(ctx), zap.Error(err))
					return err
+				}
+				w.admin.Logger.Info("validate deployments: upgraded deployment", zap.String("organization_id", org.ID), zap.String("project_id", proj.ID), zap.String("deployment_id", depl.ID), zap.String("provisioner", depl.Provisioner), zap.String("provision_id", depl.ProvisionID), zap.String("instance_id", depl.RuntimeInstanceID), zap.String("version", latestVersion), observability.ZapCtx(ctx))
+			}
+		} else if depl.UpdatedOn.Add(3 * time.Hour).Before(time.Now()) {
+			// Tear down an orphaned non-prod deployment if more than 3 hours have passed since its last update
+			w.admin.Logger.Info("validate deployments: teardown deployment", zap.String("organization_id", org.ID), zap.String("project_id", proj.ID), zap.String("deployment_id", depl.ID), zap.String("provisioner", depl.Provisioner), zap.String("provision_id", depl.ProvisionID), zap.String("instance_id", depl.RuntimeInstanceID), observability.ZapCtx(ctx))
+			err = w.admin.TeardownDeployment(ctx, depl)
+			if err != nil {
+				w.admin.Logger.Error("validate deployments: teardown deployment error", zap.String("organization_id", org.ID), zap.String("project_id", proj.ID), zap.String("deployment_id", depl.ID), zap.String("provisioner", depl.Provisioner), zap.String("provision_id", depl.ProvisionID), zap.String("instance_id", depl.RuntimeInstanceID), observability.ZapCtx(ctx), zap.Error(err))
+				continue
+			}
+		}
+	}
+
+	return nil
+}
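Both new workers walk the project table with the same keyset-pagination loop: fetch a batch ordered by name, advance the cursor to the last name seen, and stop when a batch comes back short. A minimal standalone sketch of that loop invariant, with findProjects as a stand-in for admin.DB.FindProjects:

package main

import "fmt"

// findProjects stands in for admin.DB.FindProjects: it returns up to limit
// names strictly greater than afterName, in ascending order.
func findProjects(afterName string, limit int) []string {
	all := []string{"alpha", "beta", "delta", "epsilon", "gamma"} // sorted
	var out []string
	for _, name := range all {
		if name > afterName && len(out) < limit {
			out = append(out, name)
		}
	}
	return out
}

func main() {
	limit := 2
	afterName := ""
	for {
		batch := findProjects(afterName, limit)
		for _, name := range batch {
			fmt.Println("processing", name)
		}
		if len(batch) < limit {
			break // a short batch means every project has been seen
		}
		afterName = batch[len(batch)-1]
	}
}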
diff --git a/admin/worker/reset_all_deployments.go b/admin/worker/reset_all_deployments.go
deleted file mode 100644
index 59171b59726..00000000000
--- a/admin/worker/reset_all_deployments.go
+++ /dev/null
@@ -1,65 +0,0 @@
-package worker
-
-import (
-	"context"
-
-	"github.com/rilldata/rill/admin/database"
-	"github.com/rilldata/rill/runtime/pkg/observability"
-	"go.uber.org/zap"
-)
-
-func (w *Worker) resetAllDeployments(ctx context.Context) error {
-	// Iterate over batches of projects to redeploy
-	limit := 100
-	afterName := ""
-	stop := false
-	for !stop {
-		// Get batch and update iterator variables
-		projs, err := w.admin.DB.FindProjects(ctx, afterName, limit)
-		if err != nil {
-			return err
-		}
-		if len(projs) < limit {
-			stop = true
-		}
-		if len(projs) != 0 {
-			afterName = projs[len(projs)-1].Name
-		}
-
-		// Process batch
-		for _, proj := range projs {
-			err := w.resetAllDeploymentsForProject(ctx, proj)
-			if err != nil {
-				// We log the error, but continues to the next project
-				w.logger.Error("reset all deployments: failed to reset project deployments", zap.String("project_id", proj.ID), observability.ZapCtx(ctx), zap.Error(err))
-			}
-		}
-	}
-
-	return nil
-}
-
-func (w *Worker) resetAllDeploymentsForProject(ctx context.Context, proj *database.Project) error {
-	depls, err := w.admin.DB.FindDeploymentsForProject(ctx, proj.ID)
-	if err != nil {
-		return err
-	}
-
-	for _, depl := range depls {
-		// Make sure the deployment provisioner is in the current provisioner set
-		_, ok := w.admin.ProvisionerSet[depl.Provisioner]
-		if !ok {
-			w.logger.Error("reset all deployments: provisioner is not in the provisioner set", zap.String("provisioner", depl.Provisioner), zap.String("deployment_id", depl.ID), observability.ZapCtx(ctx))
-			continue
-		}
-
-		w.logger.Info("reset all deployments: redeploying deployment", zap.String("deployment_id", depl.ID), observability.ZapCtx(ctx))
-		_, err = w.admin.TriggerRedeploy(ctx, proj, depl)
-		if err != nil {
-			return err
-		}
-		w.logger.Info("reset all deployments: redeployed deployment", zap.String("deployment_id", depl.ID), observability.ZapCtx(ctx))
-	}
-
-	return nil
-}
diff --git a/admin/worker/validate_deployments.go b/admin/worker/validate_deployments.go
deleted file mode 100644
index d3e5fd8aced..00000000000
--- a/admin/worker/validate_deployments.go
+++ /dev/null
@@ -1,133 +0,0 @@
-package worker
-
-import (
-	"context"
-	"fmt"
-	"time"
-
-	"github.com/rilldata/rill/admin"
-	"github.com/rilldata/rill/admin/database"
-	"github.com/rilldata/rill/runtime/pkg/observability"
-	"go.uber.org/zap"
-)
-
-const validateAllDeploymentsForProjectTimeout = 5 * time.Minute
-
-func (w *Worker) validateDeployments(ctx context.Context) error {
-	// Resolve 'latest' version
-	latestVersion := w.admin.ResolveLatestRuntimeVersion()
-
-	// Verify version is valid
-	err := w.admin.ValidateRuntimeVersion(latestVersion)
-	if err != nil {
-		return err
-	}
-
-	// Iterate over batches of projects
-	limit := 100
-	afterName := ""
-	stop := false
-	for !stop {
-		// Get batch and update iterator variables
-		projs, err := w.admin.DB.FindProjects(ctx, afterName, limit)
-		if err != nil {
-			return err
-		}
-		if len(projs) < limit {
-			stop = true
-		}
-		if len(projs) != 0 {
-			afterName = projs[len(projs)-1].Name
-		}
-
-		// Process batch
-		for _, proj := range projs {
-			err := w.reconcileAllDeploymentsForProject(ctx, proj, latestVersion)
-			if err != nil {
-				// We log the error, but continues to the next project
-				w.logger.Error("validate deployments: failed to reconcile project deployments", zap.String("project_id", proj.ID), zap.String("version", latestVersion), observability.ZapCtx(ctx), zap.Error(err))
-			}
-		}
-	}
-
-	return nil
-}
-
-func (w *Worker) reconcileAllDeploymentsForProject(ctx context.Context, proj *database.Project, latestVersion string) error {
-	// Apply timeout
-	ctx, cancel := context.WithTimeout(ctx, validateAllDeploymentsForProjectTimeout)
-	defer cancel()
-
-	// Get all project deployments
-	depls, err := w.admin.DB.FindDeploymentsForProject(ctx, proj.ID)
-	if err != nil {
-		return err
-	}
-
-	// Get project organization, we need this to create the deployment annotations
-	org, err := w.admin.DB.FindOrganization(ctx, proj.OrganizationID)
-	if err != nil {
-		return err
-	}
-
-	var prodDeplID string
-	if proj.ProdDeploymentID != nil {
-		prodDeplID = *proj.ProdDeploymentID
-	}
-
-	for _, depl := range depls {
-		if depl.ID == prodDeplID {
-			// Get deployment provisioner
-			p, ok := w.admin.ProvisionerSet[depl.Provisioner]
-			if !ok {
-				return fmt.Errorf("validate deployments: %q is not in the provisioner set", depl.Provisioner)
-			}
-
-			v, err := p.ValidateConfig(ctx, depl.ProvisionID)
-			if err != nil {
-				w.logger.Warn("validate deployments: error validating provisioner config", zap.String("organization_id", org.ID), zap.String("project_id", proj.ID), zap.String("deployment_id", depl.ID), zap.String("provisioner", depl.Provisioner), zap.String("provision_id", depl.ProvisionID), zap.Error(err), observability.ZapCtx(ctx))
-				return err
-			}
-
-			// Trigger a redeploy if config is no longer valid
-			if !v {
-				w.logger.Info("validate deployments: config no longer valid, triggering redeploy", zap.String("organization_id", org.ID), zap.String("project_id", proj.ID), zap.String("deployment_id", depl.ID), observability.ZapCtx(ctx))
-				_, err = w.admin.TriggerRedeploy(ctx, proj, depl)
-				if err != nil {
-					return err
-				}
-				w.logger.Info("validate deployments: redeployed", zap.String("organization_id", org.ID), zap.String("project_id", proj.ID), observability.ZapCtx(ctx))
-				continue
-			}
-
-			// If project is running 'latest' version then update if needed, skip if 'static' provisioner type
-			if p.Type() != "static" && proj.ProdVersion == "latest" && depl.RuntimeVersion != latestVersion {
-				w.logger.Info("validate deployments: upgrading deployment", zap.String("organization_id", org.ID), zap.String("project_id", proj.ID), zap.String("deployment_id", depl.ID), zap.String("provisioner", depl.Provisioner), zap.String("provision_id", depl.ProvisionID), zap.String("instance_id", depl.RuntimeInstanceID), zap.String("version", latestVersion), observability.ZapCtx(ctx))
-
-				// Update deployment to latest version
-				err = w.admin.UpdateDeployment(ctx, depl, &admin.UpdateDeploymentOptions{
-					Version:         latestVersion,
-					Branch:          depl.Branch,
-					Variables:       proj.ProdVariables,
-					Annotations:     w.admin.NewDeploymentAnnotations(org, proj),
-					EvictCachedRepo: false,
-				})
-				if err != nil {
-					w.logger.Error("validate deployments: failed to upgrade deployment", zap.String("organization_id", org.ID), zap.String("project_id", proj.ID), zap.String("deployment_id", depl.ID), zap.String("provisioner", depl.Provisioner), zap.String("provision_id", depl.ProvisionID), zap.String("instance_id", depl.RuntimeInstanceID), zap.String("version", latestVersion), observability.ZapCtx(ctx), zap.Error(err))
-					return err
-				}
-				w.logger.Info("validate deployments: upgraded deployment", zap.String("organization_id", org.ID), zap.String("project_id", proj.ID), zap.String("deployment_id", depl.ID), zap.String("provisioner", depl.Provisioner), zap.String("provision_id", depl.ProvisionID), zap.String("instance_id", depl.RuntimeInstanceID), zap.String("version", latestVersion), observability.ZapCtx(ctx))
-			}
-		} else if depl.UpdatedOn.Add(3 * time.Hour).Before(time.Now()) {
-			// Teardown old orphan non-prod deployment if more than 3 hours since last update
-			w.logger.Info("validate deployments: teardown deployment", zap.String("organization_id", org.ID), zap.String("project_id", proj.ID), zap.String("deployment_id", depl.ID), zap.String("provisioner", depl.Provisioner), zap.String("provision_id", depl.ProvisionID), zap.String("instance_id", depl.RuntimeInstanceID), observability.ZapCtx(ctx))
-			err = w.admin.TeardownDeployment(ctx, depl)
-			if err != nil {
-				w.logger.Error("validate deployments: teardown deployment error", zap.String("organization_id", org.ID), zap.String("project_id", proj.ID), zap.String("deployment_id", depl.ID), zap.String("provisioner", depl.Provisioner), zap.String("provision_id", depl.ProvisionID), zap.String("instance_id", depl.RuntimeInstanceID), observability.ZapCtx(ctx), zap.Error(err))
-				continue
-			}
-		}
-	}
-
-	return nil
-}
diff --git a/admin/worker/worker.go b/admin/worker/worker.go
index e9a382116e1..7becc26ec75 100644
--- a/admin/worker/worker.go
+++ b/admin/worker/worker.go
@@ -7,6 +7,7 @@ import (
 	"time"
 
 	"github.com/rilldata/rill/admin"
+	"github.com/rilldata/rill/admin/jobs"
 	"github.com/rilldata/rill/runtime/pkg/graceful"
 	"github.com/rilldata/rill/runtime/pkg/observability"
 	"github.com/robfig/cron/v3"
@@ -29,16 +30,25 @@ var (
 type Worker struct {
 	logger *zap.Logger
 	admin  *admin.Service
+	jobs   jobs.Client
 }
 
-func New(logger *zap.Logger, adm *admin.Service) *Worker {
+func New(logger *zap.Logger, adm *admin.Service, jobsClient jobs.Client) *Worker {
 	return &Worker{
 		logger: logger,
 		admin:  adm,
+		jobs:   jobsClient,
 	}
 }
 
 func (w *Worker) Run(ctx context.Context) error {
+	// Start jobs client workers
+	err := w.jobs.Work(ctx)
+	if err != nil {
+		return err
+	}
+	w.logger.Info("jobs client worker started")
+
 	group, ctx := errgroup.WithContext(ctx)
 	group.Go(func() error {
 		return w.schedule(ctx, "check_provisioner_capacity", w.checkProvisionerCapacity, 15*time.Minute)
@@ -58,9 +68,6 @@ func (w *Worker) Run(ctx context.Context) error {
 	group.Go(func() error {
 		return w.schedule(ctx, "hibernate_expired_deployments", w.hibernateExpiredDeployments, 15*time.Minute)
 	})
-	group.Go(func() error {
-		return w.schedule(ctx, "validate_deployments", w.validateDeployments, 6*time.Hour)
-	})
 	group.Go(func() error {
 		return w.scheduleCron(ctx, "run_autoscaler", w.runAutoscaler, w.admin.AutoscalerCron)
 	})
@@ -99,9 +106,8 @@ func (w *Worker) RunJob(ctx context.Context, name string) error {
 	case "check_provisioner_capacity":
 		return w.runJob(ctx, name, w.checkProvisionerCapacity)
 	case "reset_all_deployments":
-		return w.runJob(ctx, name, w.resetAllDeployments)
-	case "validate_deployments":
-		return w.runJob(ctx, name, w.validateDeployments)
+		_, err := w.jobs.ResetAllDeployments(ctx)
+		return err
 	// NOTE: Add new ad-hoc jobs here
 	default:
 		return fmt.Errorf("unknown job: %s", name)
diff --git a/cli/cmd/admin/start.go b/cli/cmd/admin/start.go
index d8c6702af4a..03661707b76 100644
--- a/cli/cmd/admin/start.go
+++ b/cli/cmd/admin/start.go
@@ -17,6 +17,7 @@ import (
 	"github.com/rilldata/rill/admin/ai"
 	"github.com/rilldata/rill/admin/billing"
 	"github.com/rilldata/rill/admin/billing/payment"
+	"github.com/rilldata/rill/admin/jobs/river"
 	"github.com/rilldata/rill/admin/server"
 	"github.com/rilldata/rill/admin/worker"
 	"github.com/rilldata/rill/cli/pkg/cmdutil"
@@ -43,6 +44,7 @@ type Config struct {
 	DatabaseDriver     string `default:"postgres" split_words:"true"`
 	DatabaseURL        string `split_words:"true"`
+	RiverDatabaseURL   string `split_words:"true"`
 	RedisURL           string `default:"" split_words:"true"`
 	ProvisionerSetJSON string `split_words:"true"`
 	DefaultProvisioner string `split_words:"true"`
@@ -289,6 +291,16 @@ func StartCmd(ch *cmdutil.Helper) *cobra.Command {
 			}
 			defer adm.Close()
 
+			// Init river jobs client
+			jobs, err := river.New(cmd.Context(), conf.RiverDatabaseURL, adm)
+			if err != nil {
+				logger.Fatal("error creating river jobs client", zap.Error(err))
+			}
+			defer jobs.Close(cmd.Context())
+
+			// Set the initialized jobs client on the admin service so jobs can be triggered from it
+			adm.Jobs = jobs
+
 			// Parse session keys as hex strings
 			keyPairs := make([][]byte, len(conf.SessionKeyPairs))
 			for idx, keyHex := range conf.SessionKeyPairs {
@@ -349,7 +361,7 @@ func StartCmd(ch *cmdutil.Helper) *cobra.Command {
 			// Init and run worker
 			if runWorker || runJobs {
-				wkr := worker.New(logger, adm)
+				wkr := worker.New(logger, adm, jobs)
 				if runWorker {
 					group.Go(func() error { return wkr.Run(cctx) })
 					if !runServer {
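On configuration: the new RiverDatabaseURL field follows the existing envconfig conventions, so with split_words it resolves from an environment variable named <PREFIX>_RIVER_DATABASE_URL. A small sketch of that mapping; the "rill_admin" prefix here is an assumption for illustration, not necessarily the prefix the CLI actually passes:

package main

import (
	"fmt"

	"github.com/kelseyhightower/envconfig"
)

type Config struct {
	DatabaseURL      string `split_words:"true"`
	RiverDatabaseURL string `split_words:"true"` // read from <PREFIX>_RIVER_DATABASE_URL
}

func main() {
	var conf Config
	// With the (assumed) prefix "rill_admin", the field above is populated
	// from RILL_ADMIN_RIVER_DATABASE_URL.
	if err := envconfig.Process("rill_admin", &conf); err != nil {
		panic(err)
	}
	fmt.Println(conf.RiverDatabaseURL)
}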
diff --git a/cli/cmd/devtool/data/cloud-deps.docker-compose.yml b/cli/cmd/devtool/data/cloud-deps.docker-compose.yml
index bcbde657e89..79e9688de5e 100644
--- a/cli/cmd/devtool/data/cloud-deps.docker-compose.yml
+++ b/cli/cmd/devtool/data/cloud-deps.docker-compose.yml
@@ -28,6 +28,14 @@ services:
       - ./prometheus.yaml:/etc/prometheus/prometheus.yml
     ports:
       - "9412:9090"
+  riverui:
+    container_name: riverui
+    image: ghcr.io/riverqueue/riverui:latest
+    restart: always
+    environment:
+      - DATABASE_URL=postgres://postgres:postgres@postgres:5432/postgres
+    ports:
+      - "7070:8080"
   otel-collector:
     image: otel/opentelemetry-collector:latest
     restart: always
diff --git a/go.mod b/go.mod
index 27100bb7dfe..ad6d2316ec8 100644
--- a/go.mod
+++ b/go.mod
@@ -61,7 +61,7 @@ require (
 	github.com/jackc/pgconn v1.14.3
 	github.com/jackc/pgtype v1.14.3
 	github.com/jackc/pgx/v4 v4.18.2
-	github.com/jackc/pgx/v5 v5.5.5
+	github.com/jackc/pgx/v5 v5.6.0
 	github.com/jmoiron/sqlx v1.3.5
 	github.com/joho/godotenv v1.5.1
 	github.com/kelseyhightower/envconfig v1.4.0
@@ -74,6 +74,9 @@ require (
 	github.com/pingcap/tidb/pkg/parser v0.0.0-20231124053542-069631e2ecfe
 	github.com/prometheus/client_golang v1.19.1
 	github.com/redis/go-redis/v9 v9.0.2
+	github.com/riverqueue/river v0.11.4
+	github.com/riverqueue/river/riverdriver/riverpgxv5 v0.11.4
+	github.com/riverqueue/river/rivertype v0.11.4
 	github.com/robfig/cron/v3 v3.0.1
 	github.com/rs/cors v1.9.0
 	github.com/santhosh-tekuri/jsonschema/v5 v5.2.0
@@ -103,6 +106,7 @@ require (
 	go.opentelemetry.io/otel/trace v1.29.0
 	go.uber.org/multierr v1.11.0
 	go.uber.org/zap v1.27.0
+	go.uber.org/zap/exp v0.2.0
 	gocloud.dev v0.36.0
 	golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3
 	golang.org/x/oauth2 v0.21.0
@@ -336,6 +340,8 @@ require (
 	github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
 	github.com/richardlehane/mscfb v1.0.4 // indirect
 	github.com/richardlehane/msoleps v1.0.3 // indirect
+	github.com/riverqueue/river/riverdriver v0.11.4 // indirect
+	github.com/riverqueue/river/rivershared v0.11.4 // indirect
 	github.com/rivo/uniseg v0.4.7 // indirect
 	github.com/secure-systems-lab/go-securesystemslib v0.4.0 // indirect
 	github.com/segmentio/asm v1.2.0 // indirect
@@ -377,6 +383,7 @@ require (
 	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.21.0 // indirect
 	go.opentelemetry.io/proto/otlp v1.2.0 // indirect
 	go.uber.org/atomic v1.11.0 // indirect
+	go.uber.org/goleak v1.3.0 // indirect
 	go.uber.org/mock v0.4.0 // indirect
 	golang.org/x/crypto v0.26.0 // indirect
 	golang.org/x/mod v0.17.0 // indirect
diff --git a/go.sum b/go.sum
index ffc33cccc6a..545b1b39e4e 100644
--- a/go.sum
+++ b/go.sum
@@ -1628,6 +1628,8 @@ github.com/jackc/pgconn v1.9.0/go.mod h1:YctiPyvzfU11JFxoXokUOOKQXQmDMoJL9vJzHH8
 github.com/jackc/pgconn v1.9.1-0.20210724152538-d89c8390a530/go.mod h1:4z2w8XhRbP1hYxkpTuBjTS3ne3J48K83+u0zoyvg2pI=
 github.com/jackc/pgconn v1.14.3 h1:bVoTr12EGANZz66nZPkMInAV/KHD2TxH9npjXXgiB3w=
 github.com/jackc/pgconn v1.14.3/go.mod h1:RZbme4uasqzybK2RK5c65VsHxoyaml09lx3tXOcO/VM=
+github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa h1:s+4MhCQ6YrzisK6hFJUX53drDT4UsSW3DEhKn0ifuHw=
+github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds=
 github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE=
 github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8=
 github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2/go.mod h1:fGZlG77KXmcq05nJLRkk0+p82V8B8Dw8KN2/V9c/OAE=
@@ -1662,8 +1664,8 @@ github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQ
 github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs=
 github.com/jackc/pgx/v4 v4.18.2 h1:xVpYkNR5pk5bMCZGfClbO962UIqVABcAGt7ha1s/FeU=
 github.com/jackc/pgx/v4 v4.18.2/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw=
-github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw=
-github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
+github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY=
+github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw=
 github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
 github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
 github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
@@ -2101,6 +2103,18 @@ github.com/richardlehane/msoleps v1.0.3 h1:aznSZzrwYRl3rLKRT3gUk9am7T/mLNSnJINvN
 github.com/richardlehane/msoleps v1.0.3/go.mod h1:BWev5JBpU9Ko2WAgmZEuiz4/u3ZYTKbjLycmwiWUfWg=
 github.com/rilldata/arrow/go/v14 v14.0.0-20240624035703-e234e04219ff h1:Tt67B9BQVkymWsosWgz7vyz8MXnlYzc8xbqtxYuPU1s=
 github.com/rilldata/arrow/go/v14 v14.0.0-20240624035703-e234e04219ff/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybFg8QBQ5LU+eBY=
+github.com/riverqueue/river v0.11.4 h1:NMRsODhRgFztf080RMCjI377jldLXsx41E2r7+c0lPE=
+github.com/riverqueue/river v0.11.4/go.mod h1:HvgBkqon7lYKm9Su4lVOnn1qx8Q4FnSMJjf5auVial4=
+github.com/riverqueue/river/riverdriver v0.11.4 h1:kBg68vfTnRuSwsgcZ7UbKC4ocZ+KSCGnuZw/GwMMMP4=
+github.com/riverqueue/river/riverdriver v0.11.4/go.mod h1:+NxTrldRYYsdTbZSxX7L2LuWU/B0IAtAActDJcNbcPs=
+github.com/riverqueue/river/riverdriver/riverdatabasesql v0.11.4 h1:QBegZQrB59dafWaiNphJC85KTA0CmeGYcpCqu52qbnI=
+github.com/riverqueue/river/riverdriver/riverdatabasesql v0.11.4/go.mod h1:CQC2a/+GRtN6b67IA7jFCvcCtOBWRz3lWqyNxDggKSM=
+github.com/riverqueue/river/riverdriver/riverpgxv5 v0.11.4 h1:rRY8WabllXRsLp8U+gxUpYgTgI8dveF3UWnZJu965Lg=
+github.com/riverqueue/river/riverdriver/riverpgxv5 v0.11.4/go.mod h1:GgWsTnC7V7lanQLyj8W1UuYuzyDoJZc4bhhDomtYr30=
+github.com/riverqueue/river/rivershared v0.11.4 h1:XGfzJKG7hhwd0MwImF/4r+t6F9aq2Q7e6NNYifStnus=
+github.com/riverqueue/river/rivershared v0.11.4/go.mod h1:vZc9tRvSZ9spLqcz9UUuKbZGuDRwBhS3LuzLY7d/jkw=
+github.com/riverqueue/river/rivertype v0.11.4 h1:TAdi4CQEYukveYneAqm5LupRVZjvSfB8tL3xKR13wi4=
+github.com/riverqueue/river/rivertype v0.11.4/go.mod h1:3WRQEDlLKZky/vGwFcZC3uKjC+/8izE6ucHwCsuir98=
 github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
 github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
 github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
@@ -2462,6 +2476,8 @@ go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
 go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
 go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
 go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
+go.uber.org/zap/exp v0.2.0 h1:FtGenNNeCATRB3CmB/yEUnjEFeJWpB/pMcy7e2bKPYs=
+go.uber.org/zap/exp v0.2.0/go.mod h1:t0gqAIdh1MfKv9EwN/dLwfZnJxe9ITAZN78HEWPFWDQ=
 gocloud.dev v0.36.0 h1:q5zoXux4xkOZP473e1EZbG8Gq9f0vlg1VNH5Du/ybus=
 gocloud.dev v0.36.0/go.mod h1:bLxah6JQVKBaIxzsr5BQLYB4IYdWHkMZdzCXlo6F0gg=
 golang.org/x/crypto v0.0.0-20171113213409-9f005a07e0d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=