diff --git a/admin/database/database.go b/admin/database/database.go
index 186a37dafb0..2cfefaaa221 100644
--- a/admin/database/database.go
+++ b/admin/database/database.go
@@ -86,6 +86,7 @@ type DB interface {
 	InsertProjectWhitelistedDomain(ctx context.Context, opts *InsertProjectWhitelistedDomainOptions) (*ProjectWhitelistedDomain, error)
 	DeleteProjectWhitelistedDomain(ctx context.Context, id string) error
 
+	FindDeployments(ctx context.Context, afterID string, limit int) ([]*Deployment, error)
 	FindExpiredDeployments(ctx context.Context) ([]*Deployment, error)
 	FindDeploymentsForProject(ctx context.Context, projectID string) ([]*Deployment, error)
 	FindDeployment(ctx context.Context, id string) (*Deployment, error)
@@ -379,6 +380,21 @@ const (
 	DeploymentStatusError       DeploymentStatus = 4
 )
 
+// String returns a human-readable name for the deployment status.
+// Any status without a dedicated name is reported as "Unspecified".
+func (d DeploymentStatus) String() string {
+	name := "Unspecified"
+	switch d {
+	case DeploymentStatusPending:
+		name = "Pending"
+	case DeploymentStatusOK:
+		name = "OK"
+	case DeploymentStatusError:
+		name = "Error"
+	}
+	return name
+}
+
 // Deployment is a single deployment of a git branch.
 // Deployments belong to a project.
type Deployment struct {
diff --git a/admin/database/postgres/postgres.go b/admin/database/postgres/postgres.go
index 6f02a35e380..c0705023865 100644
--- a/admin/database/postgres/postgres.go
+++ b/admin/database/postgres/postgres.go
@@ -5,6 +5,7 @@ import (
 	"database/sql"
 	"errors"
 	"fmt"
+	"strings"
 	"time"
 
 	"github.com/XSAM/otelsql"
@@ -441,6 +442,30 @@ func (c *connection) DeleteProjectWhitelistedDomain(ctx context.Context, id stri
 	return checkDeleteRow("project whitelist domain", res, err)
 }
 
+// FindDeployments returns at most limit deployments ordered by ID.
+// When afterID is non-empty, only deployments with an ID greater than afterID are returned,
+// so a caller can page through the table by passing the last ID of the previous page.
+func (c *connection) FindDeployments(ctx context.Context, afterID string, limit int) ([]*database.Deployment, error) {
+	var (
+		sqlText strings.Builder
+		params  []any
+	)
+	sqlText.WriteString("SELECT d.* FROM deployments d ")
+	if afterID == "" {
+		sqlText.WriteString("ORDER BY d.id LIMIT $1")
+		params = []any{limit}
+	} else {
+		sqlText.WriteString("WHERE d.id > $1 ORDER BY d.id LIMIT $2")
+		params = []any{afterID, limit}
+	}
+
+	var deployments []*database.Deployment
+	if err := c.getDB(ctx).SelectContext(ctx, &deployments, sqlText.String(), params...); err != nil {
+		return nil, parseErr("deployments", err)
+	}
+	return deployments, nil
+}
+
 // FindExpiredDeployments returns all the deployments which are expired as per prod ttl
 func (c *connection) FindExpiredDeployments(ctx context.Context) ([]*database.Deployment, error) {
 	var res []*database.Deployment
diff --git a/admin/deployments.go b/admin/deployments.go
index b27c7974f84..25262c0facd 100644
--- a/admin/deployments.go
+++ b/admin/deployments.go
@@ -137,7 +137,7 @@ func (s *Service) createDeployment(ctx context.Context, opts *createDeploymentOp
 	}
 
 	// Open a runtime client
-	rt, err := s.openRuntimeClient(alloc.Host, alloc.Audience)
+	rt, err := s.OpenRuntimeClient(alloc.Host, alloc.Audience)
 	if err != nil {
 		err2 := p.Deprovision(ctx, provisionID)
 		err3 := s.DB.DeleteDeployment(ctx, depl.ID)
@@ -348,6 +348,26 @@ func (s *Service) HibernateDeployments(ctx context.Context) error {
 	return nil
 }
 
+// OpenRuntimeClient issues a token for the given audience and uses it to create a client for the runtime at host.
+// The token has a TTL of one hour and carries the ManageInstances, ReadInstance, EditInstance and ReadObjects system permissions.
+func (s *Service) OpenRuntimeClient(host, audience string) (*client.Client, error) {
+	tokenOpts := auth.TokenOptions{
+		AudienceURL:       audience,
+		TTL:               time.Hour,
+		SystemPermissions: []auth.Permission{auth.ManageInstances, auth.ReadInstance, auth.EditInstance, auth.ReadObjects},
+	}
+	token, err := s.issuer.NewToken(tokenOpts)
+	if err != nil {
+		return nil, err
+	}
+
+	rt, err := client.New(host, token)
+	if err != nil {
+		return nil, err
+	}
+	return rt, nil
+}
+
 func (s *Service) teardownDeployment(ctx context.Context, depl *database.Deployment) error {
 	// Delete the deployment
 	err := s.DB.DeleteDeployment(ctx, depl.ID)
@@ -384,25 +404,7 @@ func (s *Service) teardownDeployment(ctx context.Context, depl *database.Deploym
 }
 
 func (s *Service) openRuntimeClientForDeployment(d *database.Deployment) (*client.Client, error) {
-	return s.openRuntimeClient(d.RuntimeHost, d.RuntimeAudience)
-}
-
-func (s *Service) openRuntimeClient(host, audience string) (*client.Client, error) {
-	jwt, err := s.issuer.NewToken(auth.TokenOptions{
-		AudienceURL:       audience,
-		TTL:               time.Hour,
-		SystemPermissions:
[]auth.Permission{auth.ManageInstances, auth.ReadInstance, auth.EditInstance, auth.ReadObjects},
-	})
-	if err != nil {
-		return nil, err
-	}
-
-	rt, err := client.New(host, jwt)
-	if err != nil {
-		return nil, err
-	}
-
-	return rt, nil
+	return s.OpenRuntimeClient(d.RuntimeHost, d.RuntimeAudience)
 }
 
 type DeploymentAnnotations struct {
diff --git a/admin/worker/deployments_health_check.go b/admin/worker/deployments_health_check.go
new file mode 100644
index 00000000000..c3e45fb54b2
--- /dev/null
+++ b/admin/worker/deployments_health_check.go
@@ -0,0 +1,117 @@
+package worker
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"time"
+
+	"github.com/rilldata/rill/admin/database"
+	runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
+	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/encoding/protojson"
+)
+
+// deploymentsHealthCheck pages through all deployments ordered by ID and health checks the runtimes that serve them.
+// Deployments whose status is not OK are not health checked; they are logged as failing once they have gone more than an hour without an update.
+// Every runtime host is checked at most once per run, with at most 8 checks in flight at a time.
+func (w *Worker) deploymentsHealthCheck(ctx context.Context) error {
+	afterID := ""
+	limit := 100
+	seenHosts := map[string]bool{}
+	for {
+		deployments, err := w.admin.DB.FindDeployments(ctx, afterID, limit)
+		if err != nil {
+			return fmt.Errorf("deploymentsHealthCheck: failed to get deployments: %w", err)
+		}
+		if len(deployments) == 0 {
+			return nil
+		}
+
+		group, cctx := errgroup.WithContext(ctx)
+		group.SetLimit(8)
+		for _, d := range deployments {
+			d := d
+			if d.Status != database.DeploymentStatusOK {
+				if time.Since(d.UpdatedOn) > time.Hour {
+					w.logger.Error("deploymentsHealthCheck: failing deployment", zap.String("id", d.ID), zap.String("status", d.Status.String()), zap.Duration("duration", time.Since(d.UpdatedOn)))
+				}
+				continue
+			}
+			if seenHosts[d.RuntimeHost] {
+				continue
+			}
+			seenHosts[d.RuntimeHost] = true
+			group.Go(func() error {
+				return w.deploymentHealthCheck(cctx, d)
+			})
+		}
+		if err := group.Wait(); err != nil {
+			return err
+		}
+		if len(deployments) < limit {
+			return nil
+		}
+		afterID = deployments[len(deployments)-1].ID
+		// fetch again
+	}
+}
+
+// deploymentHealthCheck calls the health endpoint of the runtime that serves d and logs any problem it finds.
+// It always returns nil: problems are only reported through the logger, so a failing runtime never cancels the sibling checks started through the errgroup.
+func (w *Worker) deploymentHealthCheck(ctx context.Context, d *database.Deployment) error {
+	client, err := w.admin.OpenRuntimeClient(d.RuntimeHost, d.RuntimeAudience)
+	if err != nil {
+		w.logger.Error("deploymentsHealthCheck: failed to open runtime client", zap.String("host", d.RuntimeHost), zap.Error(err))
+		return nil
+	}
+	defer client.Close()
+
+	resp, err := client.Health(ctx, &runtimev1.HealthRequest{})
+	if err != nil {
+		if status.Code(err) != codes.Unavailable {
+			w.logger.Error("deploymentsHealthCheck: health check call failed", zap.String("host", d.RuntimeHost), zap.Error(err))
+			return nil
+		}
+		// An unavailable error could also be because the deployment got deleted, so look it up again.
+		// The result gets its own name: shadowing d would make the error log below read the ID from a pointer that is not valid when the lookup fails.
+		current, dbErr := w.admin.DB.FindDeployment(ctx, d.ID)
+		if dbErr != nil {
+			if errors.Is(dbErr, database.ErrNotFound) {
+				// Deployment was deleted
+				return nil
+			}
+			w.logger.Error("deploymentsHealthCheck: failed to find deployment", zap.String("deployment", d.ID), zap.Error(dbErr))
+			return nil
+		}
+		if current.Status == database.DeploymentStatusOK {
+			w.logger.Error("deploymentsHealthCheck: health check call failed", zap.String("host", current.RuntimeHost), zap.Error(err))
+		}
+		// Deployment status changed (probably being deleted)
+		return nil
+	}
+
+	if !isRuntimeHealthy(resp) {
+		// Marshalling is best effort: the payload is only used to enrich the log entry.
+		s, _ := protojson.Marshal(resp)
+		w.logger.Error("deploymentsHealthCheck: runtime is unhealthy", zap.String("host", d.RuntimeHost), zap.ByteString("health_response", s))
+	}
+	return nil
+}
+
+// isRuntimeHealthy reports whether the health response is free of errors.
+// It returns false if a limiter, conn cache, metastore or network error is set, or if any instance reports a controller, OLAP or repo error.
+func isRuntimeHealthy(r *runtimev1.HealthResponse) bool {
+	if r.LimiterError != "" || r.ConnCacheError != "" || r.MetastoreError != "" || r.NetworkError != "" {
+		return false
+	}
+	for _, v := range r.InstancesHealth {
+		if v.ControllerError != "" || v.OlapError != "" || v.RepoError != "" {
+			return false
+		}
+	}
+	return true
+}
diff --git a/admin/worker/worker.go b/admin/worker/worker.go
index cbef6763fa3..50b7d984631 100644
--- a/admin/worker/worker.go
+++ b/admin/worker/worker.go
@@ -67,6 +67,9 @@ func (w
*Worker) Run(ctx context.Context) error {
 	group.Go(func() error {
 		return w.schedule(ctx, "delete_unused_assets", w.deleteUnusedAssets, 6*time.Hour)
 	})
+	group.Go(func() error {
+		return w.schedule(ctx, "deployments_health_check", w.deploymentsHealthCheck, 10*time.Minute)
+	})
 
 	if w.admin.Biller.GetReportingWorkerCron() != "" {
 		group.Go(func() error {
diff --git a/runtime/drivers/admin/admin.go b/runtime/drivers/admin/admin.go
index 825487fd8dd..86a96e2dc81 100644
--- a/runtime/drivers/admin/admin.go
+++ b/runtime/drivers/admin/admin.go
@@ -143,7 +143,9 @@ func (h *Handle) Ping(ctx context.Context) error {
 	// check connectivity with admin service
 	_, err := h.admin.Ping(ctx, &adminv1.PingRequest{})
 
-	_ = h.repoMu.RLock(ctx)
+	if lockErr := h.repoMu.RLock(ctx); lockErr != nil {
+		return lockErr
+	}
 	defer h.repoMu.RUnlock()
 	return errors.Join(err, h.syncErr)
 }
diff --git a/runtime/server/health.go b/runtime/server/health.go
index acde65b2342..b914a55c0cf 100644
--- a/runtime/server/health.go
+++ b/runtime/server/health.go
@@ -3,6 +3,7 @@ package server
 import (
 	"context"
 	"net"
+	"net/http"
 
 	runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
 	"github.com/rilldata/rill/runtime/pkg/observability"
@@ -68,6 +69,40 @@ func (s *Server) InstanceHealth(ctx context.Context, req *runtimev1.InstanceHeal
 	}, nil
 }
 
+// healthCheckHandler is an HTTP health endpoint that reports its result through the status code only.
+// It responds with 500 if the limiter ping or the pingCloudfareDNS probe fails, if the runtime health can't be obtained,
+// or if the registry or any instance's controller or repo reports an error. Otherwise it responds with 200.
+func (s *Server) healthCheckHandler(w http.ResponseWriter, req *http.Request) {
+	ctx := req.Context()
+	fail := func() { w.WriteHeader(http.StatusInternalServerError) }
+
+	// limiter
+	if err := s.limiter.Ping(ctx); err != nil {
+		fail()
+		return
+	}
+
+	// internet access
+	if err := pingCloudfareDNS(ctx); err != nil {
+		fail()
+		return
+	}
+
+	// runtime health; olap errors and hanging connections deliberately don't produce a 5xx
+	health, err := s.runtime.Health(ctx)
+	if err != nil || health.Registry != nil {
+		fail()
+		return
+	}
+	for _, inst := range health.InstancesHealth {
+		if inst.Controller != nil || inst.Repo != nil {
+			fail()
+			return
+		}
+	}
+	w.WriteHeader(http.StatusOK)
+}
+
 func pingCloudfareDNS(ctx context.Context) error {
 	d := net.Dialer{}
 	conn, err := d.DialContext(ctx, "tcp", "1.1.1.1:53")
diff --git a/runtime/server/server.go b/runtime/server/server.go
index 361af36cd5c..069f0fdd692 100644
--- a/runtime/server/server.go
+++ b/runtime/server/server.go
@@ -203,6 +203,9 @@ func (s *Server) HTTPHandler(ctx context.Context, registerAdditionalHandlers fun
 	// Add gRPC-gateway on httpMux
 	httpMux.Handle("/v1/", gwMux)
 
+	// Add HTTP handler for health check
+	observability.MuxHandle(httpMux, "/v1/health", observability.Middleware("runtime", s.logger, http.HandlerFunc(s.healthCheckHandler)))
+
 	// Add HTTP handler for query export downloads
 	observability.MuxHandle(httpMux, "/v1/download", observability.Middleware("runtime", s.logger, auth.HTTPMiddleware(s.aud, http.HandlerFunc(s.downloadHandler))))
 