Add jobs package to admin with a River implementation (#5549)
* Add jobs package to admin with a River implementation

* Add observability wrapper to reset_all_deployments job

* Self review

* Add riverui to devtools

* Add separate config for River database URL

* Fix review comments

* Add job cancel and error handler

* Add duplicate field to insert result

* Add logger to river client
kaspersjo authored Sep 2, 2024
1 parent ddc0957 commit 7d619fa
Showing 12 changed files with 496 additions and 209 deletions.
2 changes: 2 additions & 0 deletions admin/admin.go
@@ -9,6 +9,7 @@ import (
"github.com/rilldata/rill/admin/billing"
"github.com/rilldata/rill/admin/billing/payment"
"github.com/rilldata/rill/admin/database"
"github.com/rilldata/rill/admin/jobs"
"github.com/rilldata/rill/admin/provisioner"
"github.com/rilldata/rill/runtime/pkg/email"
"github.com/rilldata/rill/runtime/server/auth"
@@ -31,6 +32,7 @@ type Options struct {

type Service struct {
DB database.DB
Jobs jobs.Client
URLs *URLs
ProvisionerSet map[string]provisioner.Provisioner
Email *email.Client
19 changes: 19 additions & 0 deletions admin/jobs/jobs.go
@@ -0,0 +1,19 @@
package jobs

import (
"context"
)

type Client interface {
Close(ctx context.Context) error
Work(ctx context.Context) error
CancelJob(ctx context.Context, jobID int64) error

// NOTE: Add new job trigger functions here
ResetAllDeployments(ctx context.Context) (*InsertResult, error)
}

type InsertResult struct {
ID int64
Duplicate bool
}
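
The interface keeps job scheduling behind an abstraction, so admin code can enqueue work without importing River directly. A minimal usage sketch follows; the helper and its placement are assumptions for illustration, not part of this commit:

package admin

import (
	"context"

	"go.uber.org/zap"
)

// TriggerResetAllDeployments is a hypothetical helper showing how callers
// would enqueue the job through the jobs.Client interface.
func (s *Service) TriggerResetAllDeployments(ctx context.Context) error {
	res, err := s.Jobs.ResetAllDeployments(ctx)
	if err != nil {
		return err
	}
	if res.Duplicate {
		// The queue reported an identical job already pending; nothing to do.
		s.Logger.Info("reset_all_deployments already queued", zap.Int64("job_id", res.ID))
	}
	return nil
}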
80 changes: 80 additions & 0 deletions admin/jobs/river/reset_all_deployments.go
@@ -0,0 +1,80 @@
package river

import (
"context"

"github.com/rilldata/rill/admin"
"github.com/rilldata/rill/admin/database"
"github.com/rilldata/rill/runtime/pkg/observability"
"github.com/riverqueue/river"
"go.uber.org/zap"
)

type ResetAllDeploymentsArgs struct{}

func (ResetAllDeploymentsArgs) Kind() string { return "reset_all_deployments" }

type ResetAllDeploymentsWorker struct {
river.WorkerDefaults[ResetAllDeploymentsArgs]
admin *admin.Service
}

func (w *ResetAllDeploymentsWorker) Work(ctx context.Context, job *river.Job[ResetAllDeploymentsArgs]) error {
return work(ctx, w.admin.Logger, job.Kind, w.resetAllDeployments)
}

func (w *ResetAllDeploymentsWorker) resetAllDeployments(ctx context.Context) error {
// Iterate over batches of projects to redeploy
limit := 100
afterName := ""
stop := false
for !stop {
// Get batch and update iterator variables
projs, err := w.admin.DB.FindProjects(ctx, afterName, limit)
if err != nil {
return err
}
if len(projs) < limit {
stop = true
}
if len(projs) != 0 {
afterName = projs[len(projs)-1].Name
}

// Process batch
for _, proj := range projs {
err := w.resetAllDeploymentsForProject(ctx, proj)
if err != nil {
// We log the error but continue to the next project
w.admin.Logger.Error("reset all deployments: failed to reset project deployments", zap.String("project_id", proj.ID), observability.ZapCtx(ctx), zap.Error(err))
}
}
}

return nil
}

func (w *ResetAllDeploymentsWorker) resetAllDeploymentsForProject(ctx context.Context, proj *database.Project) error {
depls, err := w.admin.DB.FindDeploymentsForProject(ctx, proj.ID)
if err != nil {
return err
}

for _, depl := range depls {
// Make sure the deployment provisioner is in the current provisioner set
_, ok := w.admin.ProvisionerSet[depl.Provisioner]
if !ok {
w.admin.Logger.Error("reset all deployments: provisioner is not in the provisioner set", zap.String("provisioner", depl.Provisioner), zap.String("deployment_id", depl.ID), observability.ZapCtx(ctx))
continue
}

w.admin.Logger.Info("reset all deployments: redeploying deployment", zap.String("deployment_id", depl.ID), observability.ZapCtx(ctx))
_, err = w.admin.TriggerRedeploy(ctx, proj, depl)
if err != nil {
return err
}
w.admin.Logger.Info("reset all deployments: redeployed deployment", zap.String("deployment_id", depl.ID), observability.ZapCtx(ctx))
}

return nil
}
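
The worker pages through projects with keyset pagination: each batch is fetched after the last project name seen, so the scan makes progress even while the job mutates deployments. The same pattern, extracted as a standalone helper for illustration (not part of this commit):

package river

import (
	"context"

	"github.com/rilldata/rill/admin/database"
)

// forEachProject is a hypothetical helper that walks all projects in
// name-ordered batches, invoking fn for each one.
func forEachProject(ctx context.Context, db database.DB, limit int, fn func(*database.Project) error) error {
	afterName := ""
	for {
		projs, err := db.FindProjects(ctx, afterName, limit)
		if err != nil {
			return err
		}
		for _, proj := range projs {
			if err := fn(proj); err != nil {
				return err
			}
		}
		// A short batch means we have reached the end of the table.
		if len(projs) < limit {
			return nil
		}
		afterName = projs[len(projs)-1].Name
	}
}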
188 changes: 188 additions & 0 deletions admin/jobs/river/river.go
@@ -0,0 +1,188 @@
package river

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/rilldata/rill/admin"
"github.com/rilldata/rill/admin/jobs"
"github.com/rilldata/rill/runtime/pkg/observability"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivermigrate"
"github.com/riverqueue/river/rivertype"
"github.com/robfig/cron/v3"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
oteltrace "go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.uber.org/zap/exp/zapslog"
)

var (
tracer = otel.Tracer("github.com/rilldata/rill/admin/jobs/river")
meter = otel.Meter("github.com/rilldata/rill/admin/jobs/river")
jobLatencyHistogram = observability.Must(meter.Int64Histogram("job_latency", metric.WithUnit("ms")))
)

type Client struct {
dbPool *pgxpool.Pool
riverClient *river.Client[pgx.Tx]
}

func New(ctx context.Context, dsn string, adm *admin.Service) (jobs.Client, error) {
dbPool, err := pgxpool.New(ctx, dsn)
if err != nil {
return nil, err
}

tx, err := dbPool.Begin(ctx)
if err != nil {
return nil, err
}
defer func() { _ = tx.Rollback(ctx) }()

migrator := rivermigrate.New(riverpgxv5.New(dbPool), nil)

res, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionUp, nil)
if err != nil {
return nil, err
}

err = tx.Commit(ctx)
if err != nil {
return nil, err
}

for _, version := range res.Versions {
adm.Logger.Info("river database migrated", zap.String("direction", string(res.Direction)), zap.Int("version", version.Version))
}

workers := river.NewWorkers()
// NOTE: Register new job workers here
river.AddWorker(workers, &ValidateDeploymentsWorker{admin: adm})
river.AddWorker(workers, &ResetAllDeploymentsWorker{admin: adm})

periodicJobs := []*river.PeriodicJob{
// NOTE: Add new periodic jobs here
newPeriodicJob(&ValidateDeploymentsArgs{}, "* */6 * * *", true),
}

// Wire our zap logger to a slog logger for the river client
logger := slog.New(zapslog.NewHandler(adm.Logger.Core(), &zapslog.HandlerOptions{
AddSource: true,
}))

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
Workers: workers,
PeriodicJobs: periodicJobs,
Logger: logger,
JobTimeout: time.Hour,
MaxAttempts: 3,
ErrorHandler: &ErrorHandler{logger: adm.Logger},
})
if err != nil {
return nil, err
}

return &Client{
dbPool: dbPool,
riverClient: riverClient,
}, nil
}

func (c *Client) Close(ctx context.Context) error {
err := c.riverClient.Stop(ctx)
c.dbPool.Close()
return err
}

func (c *Client) Work(ctx context.Context) error {
err := c.riverClient.Start(ctx)
if err != nil {
return err
}
return nil
}

func (c *Client) CancelJob(ctx context.Context, jobID int64) error {
_, err := c.riverClient.JobCancel(ctx, jobID)
if err != nil {
return err
}
return nil
}

// NOTE: Add new job trigger functions here
func (c *Client) ResetAllDeployments(ctx context.Context) (*jobs.InsertResult, error) {
res, err := c.riverClient.Insert(ctx, ResetAllDeploymentsArgs{}, nil)
if err != nil {
return nil, err
}
return &jobs.InsertResult{
ID: res.Job.ID,
Duplicate: res.UniqueSkippedAsDuplicate,
}, nil
}

type ErrorHandler struct {
logger *zap.Logger
}

func (h *ErrorHandler) HandleError(ctx context.Context, job *rivertype.JobRow, err error) *river.ErrorHandlerResult {
var args string
_ = json.Unmarshal(job.EncodedArgs, &args) // ignore parse errors
h.logger.Error("Job errored", zap.Int64("job_id", job.ID), zap.Int("num_attempt", job.Attempt), zap.String("kind", job.Kind), zap.String("args", args), zap.Error(err))
return nil
}

func (h *ErrorHandler) HandlePanic(ctx context.Context, job *rivertype.JobRow, panicVal any, trace string) *river.ErrorHandlerResult {
var args string
_ = json.Unmarshal(job.EncodedArgs, &args) // ignore parse errors
h.logger.Error("Job panicked", zap.Int64("job_id", job.ID), zap.String("kind", job.Kind), zap.String("args", args), zap.Any("panic_val", panicVal), zap.String("trace", trace))
// Set the job to be immediately cancelled
return &river.ErrorHandlerResult{SetCancelled: true}
}

func newPeriodicJob(jobArgs river.JobArgs, cronExpr string, runOnStart bool) *river.PeriodicJob {
schedule, err := cron.ParseStandard(cronExpr)
if err != nil {
panic(err)
}

periodicJob := river.NewPeriodicJob(
schedule,
func() (river.JobArgs, *river.InsertOpts) {
return jobArgs, nil
},
&river.PeriodicJobOpts{RunOnStart: runOnStart},
)

return periodicJob
}

// work wraps job worker functions with tracing, structured logging and a latency metric
func work(ctx context.Context, logger *zap.Logger, name string, fn func(context.Context) error) error {
ctx, span := tracer.Start(ctx, fmt.Sprintf("runJob %s", name), oteltrace.WithAttributes(attribute.String("name", name)))
defer span.End()

start := time.Now()
logger.Info("job started", zap.String("name", name), observability.ZapCtx(ctx))
err := fn(ctx)
jobLatencyHistogram.Record(ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attribute.String("name", name), attribute.Bool("failed", err != nil)))
if err != nil {
logger.Error("job failed", zap.String("name", name), zap.Error(err), zap.Duration("duration", time.Since(start)), observability.ZapCtx(ctx))
return err
}
logger.Info("job completed", zap.String("name", name), zap.Duration("duration", time.Since(start)), observability.ZapCtx(ctx))
return nil
}
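
Startup wiring is not shown in this diff, but given the constructor above it would look roughly like the sketch below. The function, its parameters, and the shutdown handling are assumptions; only river.New, Work, Close, and the Jobs field come from this commit:

package main

import (
	"context"

	"github.com/rilldata/rill/admin"
	"github.com/rilldata/rill/admin/jobs"
	"github.com/rilldata/rill/admin/jobs/river"
)

// startJobs is a hypothetical wiring sketch: it builds the River-backed
// client, attaches it to the admin service, and starts the workers.
func startJobs(ctx context.Context, adm *admin.Service, riverDSN string) (jobs.Client, error) {
	client, err := river.New(ctx, riverDSN, adm)
	if err != nil {
		return nil, err
	}
	adm.Jobs = client

	// Work starts the queue workers in the background and returns;
	// Close should be called on shutdown to stop them and the DB pool.
	if err := client.Work(ctx); err != nil {
		_ = client.Close(ctx)
		return nil, err
	}
	return client, nil
}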