Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: http health api and admin job for health rpc #5192

Merged
merged 5 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions admin/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type DB interface {
InsertProjectWhitelistedDomain(ctx context.Context, opts *InsertProjectWhitelistedDomainOptions) (*ProjectWhitelistedDomain, error)
DeleteProjectWhitelistedDomain(ctx context.Context, id string) error

FindDeployments(ctx context.Context, createdAfter time.Time, limit int) ([]*Deployment, error)
FindExpiredDeployments(ctx context.Context) ([]*Deployment, error)
FindDeploymentsForProject(ctx context.Context, projectID string) ([]*Deployment, error)
FindDeployment(ctx context.Context, id string) (*Deployment, error)
Expand Down Expand Up @@ -364,6 +365,19 @@ const (
DeploymentStatusError DeploymentStatus = 4
)

func (d DeploymentStatus) String() string {
switch d {
case DeploymentStatusPending:
return "Pending"
case DeploymentStatusOK:
return "OK"
case DeploymentStatusError:
return "Error"
default:
return "Unspecified"
}
}

// Deployment is a single deployment of a git branch.
// Deployments belong to a project.
type Deployment struct {
Expand Down
9 changes: 9 additions & 0 deletions admin/database/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,15 @@ func (c *connection) DeleteProjectWhitelistedDomain(ctx context.Context, id stri
return checkDeleteRow("project whitelist domain", res, err)
}

func (c *connection) FindDeployments(ctx context.Context, createdAfter time.Time, limit int) ([]*database.Deployment, error) {
var res []*database.Deployment
err := c.getDB(ctx).SelectContext(ctx, &res, "SELECT d.* FROM deployments d WHERE d.created_on > $1 ORDER BY d.created_on LIMIT $2", createdAfter, limit)
begelundmuller marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, parseErr("deployments", err)
}
return res, nil
}

// FindExpiredDeployments returns all the deployments which are expired as per prod ttl
func (c *connection) FindExpiredDeployments(ctx context.Context) ([]*database.Deployment, error) {
var res []*database.Deployment
Expand Down
40 changes: 20 additions & 20 deletions admin/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (s *Service) createDeployment(ctx context.Context, opts *createDeploymentOp
}

// Open a runtime client
rt, err := s.openRuntimeClient(alloc.Host, alloc.Audience)
rt, err := s.OpenRuntimeClient(alloc.Host, alloc.Audience)
if err != nil {
err2 := p.Deprovision(ctx, provisionID)
err3 := s.DB.DeleteDeployment(ctx, depl.ID)
Expand Down Expand Up @@ -348,6 +348,24 @@ func (s *Service) HibernateDeployments(ctx context.Context) error {
return nil
}

func (s *Service) OpenRuntimeClient(host, audience string) (*client.Client, error) {
jwt, err := s.issuer.NewToken(auth.TokenOptions{
AudienceURL: audience,
TTL: time.Hour,
SystemPermissions: []auth.Permission{auth.ManageInstances, auth.ReadInstance, auth.EditInstance, auth.ReadObjects},
})
if err != nil {
return nil, err
}

rt, err := client.New(host, jwt)
if err != nil {
return nil, err
}

return rt, nil
}

func (s *Service) teardownDeployment(ctx context.Context, depl *database.Deployment) error {
// Delete the deployment
err := s.DB.DeleteDeployment(ctx, depl.ID)
Expand Down Expand Up @@ -384,25 +402,7 @@ func (s *Service) teardownDeployment(ctx context.Context, depl *database.Deploym
}

func (s *Service) openRuntimeClientForDeployment(d *database.Deployment) (*client.Client, error) {
return s.openRuntimeClient(d.RuntimeHost, d.RuntimeAudience)
}

func (s *Service) openRuntimeClient(host, audience string) (*client.Client, error) {
jwt, err := s.issuer.NewToken(auth.TokenOptions{
AudienceURL: audience,
TTL: time.Hour,
SystemPermissions: []auth.Permission{auth.ManageInstances, auth.ReadInstance, auth.EditInstance, auth.ReadObjects},
})
if err != nil {
return nil, err
}

rt, err := client.New(host, jwt)
if err != nil {
return nil, err
}

return rt, nil
return s.OpenRuntimeClient(d.RuntimeHost, d.RuntimeAudience)
}

type DeploymentAnnotations struct {
Expand Down
108 changes: 108 additions & 0 deletions admin/worker/deployments_health_check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package worker

import (
"context"
"errors"
"fmt"
"time"

"github.com/rilldata/rill/admin/database"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protojson"
)

func (w *Worker) deploymentsHealthCheck(ctx context.Context) error {
var createdAfter time.Time
limit := 100
seenHosts := map[string]bool{}
for {
deployments, err := w.admin.DB.FindDeployments(ctx, createdAfter, limit)
if err != nil {
return fmt.Errorf("deploymentsHealthCheck: failed to get deployments: %w", err)
}
if len(deployments) == 0 {
return nil
}

group, cctx := errgroup.WithContext(ctx)
group.SetLimit(8)
for _, d := range deployments {
d := d
if d.Status != database.DeploymentStatusOK {
if time.Since(d.UpdatedOn) > time.Hour {
w.logger.Error("deploymentsHealthCheck: failing deployment", zap.String("id", d.ID), zap.String("status", d.Status.String()), zap.Duration("duration", time.Since(d.UpdatedOn)))
}
continue
}
if seenHosts[d.RuntimeHost] {
continue
}
seenHosts[d.RuntimeHost] = true
group.Go(func() error {
return w.deploymentHealthCheck(cctx, d)
})
}
if err := group.Wait(); err != nil {
return err
}
if len(deployments) < limit {
return nil
}
createdAfter = deployments[len(deployments)-1].CreatedOn
// fetch again
}
}

func (w *Worker) deploymentHealthCheck(ctx context.Context, d *database.Deployment) error {
client, err := w.admin.OpenRuntimeClient(d.RuntimeHost, d.RuntimeAudience)
if err != nil {
w.logger.Error("deploymentsHealthCheck: failed to open runtime client", zap.String("host", d.RuntimeHost), zap.Error(err))
return nil
}
defer client.Close()

resp, err := client.Health(ctx, &runtimev1.HealthRequest{})
if err != nil {
if status.Code(err) != codes.Unavailable {
w.logger.Error("deploymentsHealthCheck: health check call failed", zap.String("host", d.RuntimeHost), zap.Error(err))
return nil
}
// an unavailable error could also be because the deployment got deleted
d, dbErr := w.admin.DB.FindDeployment(ctx, d.ID)
if dbErr != nil {
if errors.Is(dbErr, database.ErrNotFound) {
// Deployment was deleted
return nil
}
w.logger.Error("deploymentsHealthCheck: failed to find deployment", zap.String("deployment", d.ID), zap.Error(dbErr))
return nil
}
if d.Status == database.DeploymentStatusOK {
w.logger.Error("deploymentsHealthCheck: health check call failed", zap.String("host", d.RuntimeHost), zap.Error(err))
}
// Deployment status changed (probably being deleted)
return nil
}

if !isRuntimeHealthy(resp) {
s, _ := protojson.Marshal(resp)
w.logger.Error("deploymentsHealthCheck: runtime is unhealthy", zap.String("host", d.RuntimeHost), zap.ByteString("health_response", s))
}
return nil
}

func isRuntimeHealthy(r *runtimev1.HealthResponse) bool {
if r.LimiterError != "" || r.ConnCacheError != "" || r.MetastoreError != "" || r.NetworkError != "" {
return false
}
for _, v := range r.InstancesHealth {
if v.ControllerError != "" || v.OlapError != "" || v.RepoError != "" {
return false
}
}
return true
}
3 changes: 3 additions & 0 deletions admin/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func (w *Worker) Run(ctx context.Context) error {
group.Go(func() error {
return w.schedule(ctx, "delete_unused_assets", w.deleteUnusedAssets, 6*time.Hour)
})
group.Go(func() error {
return w.schedule(ctx, "deployments_health_check", w.deploymentsHealthCheck, 10*time.Minute)
})

if w.admin.Biller.GetReportingWorkerCron() != "" {
group.Go(func() error {
Expand Down
4 changes: 3 additions & 1 deletion runtime/drivers/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ func (h *Handle) Ping(ctx context.Context) error {
// check connectivity with admin service
_, err := h.admin.Ping(ctx, &adminv1.PingRequest{})

_ = h.repoMu.RLock(ctx)
if lockErr := h.repoMu.RLock(ctx); lockErr != nil {
return lockErr
}
begelundmuller marked this conversation as resolved.
Show resolved Hide resolved
defer h.repoMu.RUnlock()
return errors.Join(err, h.syncErr)
}
Expand Down
34 changes: 34 additions & 0 deletions runtime/server/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"context"
"net"
"net/http"

runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/pkg/observability"
Expand Down Expand Up @@ -68,6 +69,39 @@ func (s *Server) InstanceHealth(ctx context.Context, req *runtimev1.InstanceHeal
}, nil
}

func (s *Server) healthCheckHandler(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
if err := s.limiter.Ping(ctx); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}

// internet access
if err := pingCloudfareDNS(ctx); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}

// runtime health
// we don't return 5xx on olap errors and hanging connections
status, err := s.runtime.Health(ctx)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
if status.Registry != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
for _, h := range status.InstancesHealth {
if h.Controller != nil || h.Repo != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
}
w.WriteHeader(http.StatusOK)
}

func pingCloudfareDNS(ctx context.Context) error {
d := net.Dialer{}
conn, err := d.DialContext(ctx, "tcp", "1.1.1.1:53")
Expand Down
3 changes: 3 additions & 0 deletions runtime/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ func (s *Server) HTTPHandler(ctx context.Context, registerAdditionalHandlers fun
// Add gRPC-gateway on httpMux
httpMux.Handle("/v1/", gwMux)

// Add HTTP handler for health check
observability.MuxHandle(httpMux, "/v1/health", observability.Middleware("runtime", s.logger, http.HandlerFunc(s.healthCheckHandler)))

// Add HTTP handler for query export downloads
observability.MuxHandle(httpMux, "/v1/download", observability.Middleware("runtime", s.logger, auth.HTTPMiddleware(s.aud, http.HandlerFunc(s.downloadHandler))))

Expand Down
Loading