Skip to content

Commit

Permalink
feat: http health api and admin job for health rpc (#5192)
Browse files Browse the repository at this point in the history
* http health api and admin job for health rpc

* small change

* deployment health check instead of runtime health check with some extra checks

* remove unused code

* use id for pagination
  • Loading branch information
k-anshul committed Jul 8, 2024
1 parent 0222717 commit ed7eca5
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 21 deletions.
14 changes: 14 additions & 0 deletions admin/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type DB interface {
InsertProjectWhitelistedDomain(ctx context.Context, opts *InsertProjectWhitelistedDomainOptions) (*ProjectWhitelistedDomain, error)
DeleteProjectWhitelistedDomain(ctx context.Context, id string) error

FindDeployments(ctx context.Context, afterID string, limit int) ([]*Deployment, error)
FindExpiredDeployments(ctx context.Context) ([]*Deployment, error)
FindDeploymentsForProject(ctx context.Context, projectID string) ([]*Deployment, error)
FindDeployment(ctx context.Context, id string) (*Deployment, error)
Expand Down Expand Up @@ -364,6 +365,19 @@ const (
DeploymentStatusError DeploymentStatus = 4
)

func (d DeploymentStatus) String() string {
switch d {
case DeploymentStatusPending:
return "Pending"
case DeploymentStatusOK:
return "OK"
case DeploymentStatusError:
return "Error"
default:
return "Unspecified"
}
}

// Deployment is a single deployment of a git branch.
// Deployments belong to a project.
type Deployment struct {
Expand Down
20 changes: 20 additions & 0 deletions admin/database/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"fmt"
"strings"
"time"

"github.com/XSAM/otelsql"
Expand Down Expand Up @@ -441,6 +442,25 @@ func (c *connection) DeleteProjectWhitelistedDomain(ctx context.Context, id stri
return checkDeleteRow("project whitelist domain", res, err)
}

func (c *connection) FindDeployments(ctx context.Context, afterID string, limit int) ([]*database.Deployment, error) {
var qry strings.Builder
var args []any
qry.WriteString("SELECT d.* FROM deployments d ")
if afterID != "" {
qry.WriteString("WHERE d.id > $1 ORDER BY d.id LIMIT $2")
args = []any{afterID, limit}
} else {
qry.WriteString("ORDER BY d.id LIMIT $1")
args = []any{limit}
}
var res []*database.Deployment
err := c.getDB(ctx).SelectContext(ctx, &res, qry.String(), args...)
if err != nil {
return nil, parseErr("deployments", err)
}
return res, nil
}

// FindExpiredDeployments returns all the deployments which are expired as per prod ttl
func (c *connection) FindExpiredDeployments(ctx context.Context) ([]*database.Deployment, error) {
var res []*database.Deployment
Expand Down
40 changes: 20 additions & 20 deletions admin/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (s *Service) createDeployment(ctx context.Context, opts *createDeploymentOp
}

// Open a runtime client
rt, err := s.openRuntimeClient(alloc.Host, alloc.Audience)
rt, err := s.OpenRuntimeClient(alloc.Host, alloc.Audience)
if err != nil {
err2 := p.Deprovision(ctx, provisionID)
err3 := s.DB.DeleteDeployment(ctx, depl.ID)
Expand Down Expand Up @@ -348,6 +348,24 @@ func (s *Service) HibernateDeployments(ctx context.Context) error {
return nil
}

func (s *Service) OpenRuntimeClient(host, audience string) (*client.Client, error) {
jwt, err := s.issuer.NewToken(auth.TokenOptions{
AudienceURL: audience,
TTL: time.Hour,
SystemPermissions: []auth.Permission{auth.ManageInstances, auth.ReadInstance, auth.EditInstance, auth.ReadObjects},
})
if err != nil {
return nil, err
}

rt, err := client.New(host, jwt)
if err != nil {
return nil, err
}

return rt, nil
}

func (s *Service) teardownDeployment(ctx context.Context, depl *database.Deployment) error {
// Delete the deployment
err := s.DB.DeleteDeployment(ctx, depl.ID)
Expand Down Expand Up @@ -384,25 +402,7 @@ func (s *Service) teardownDeployment(ctx context.Context, depl *database.Deploym
}

func (s *Service) openRuntimeClientForDeployment(d *database.Deployment) (*client.Client, error) {
return s.openRuntimeClient(d.RuntimeHost, d.RuntimeAudience)
}

func (s *Service) openRuntimeClient(host, audience string) (*client.Client, error) {
jwt, err := s.issuer.NewToken(auth.TokenOptions{
AudienceURL: audience,
TTL: time.Hour,
SystemPermissions: []auth.Permission{auth.ManageInstances, auth.ReadInstance, auth.EditInstance, auth.ReadObjects},
})
if err != nil {
return nil, err
}

rt, err := client.New(host, jwt)
if err != nil {
return nil, err
}

return rt, nil
return s.OpenRuntimeClient(d.RuntimeHost, d.RuntimeAudience)
}

type DeploymentAnnotations struct {
Expand Down
108 changes: 108 additions & 0 deletions admin/worker/deployments_health_check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package worker

import (
"context"
"errors"
"fmt"
"time"

"github.com/rilldata/rill/admin/database"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protojson"
)

func (w *Worker) deploymentsHealthCheck(ctx context.Context) error {
afterID := ""
limit := 100
seenHosts := map[string]bool{}
for {
deployments, err := w.admin.DB.FindDeployments(ctx, afterID, limit)
if err != nil {
return fmt.Errorf("deploymentsHealthCheck: failed to get deployments: %w", err)
}
if len(deployments) == 0 {
return nil
}

group, cctx := errgroup.WithContext(ctx)
group.SetLimit(8)
for _, d := range deployments {
d := d
if d.Status != database.DeploymentStatusOK {
if time.Since(d.UpdatedOn) > time.Hour {
w.logger.Error("deploymentsHealthCheck: failing deployment", zap.String("id", d.ID), zap.String("status", d.Status.String()), zap.Duration("duration", time.Since(d.UpdatedOn)))
}
continue
}
if seenHosts[d.RuntimeHost] {
continue
}
seenHosts[d.RuntimeHost] = true
group.Go(func() error {
return w.deploymentHealthCheck(cctx, d)
})
}
if err := group.Wait(); err != nil {
return err
}
if len(deployments) < limit {
return nil
}
afterID = deployments[len(deployments)-1].ID
// fetch again
}
}

func (w *Worker) deploymentHealthCheck(ctx context.Context, d *database.Deployment) error {
client, err := w.admin.OpenRuntimeClient(d.RuntimeHost, d.RuntimeAudience)
if err != nil {
w.logger.Error("deploymentsHealthCheck: failed to open runtime client", zap.String("host", d.RuntimeHost), zap.Error(err))
return nil
}
defer client.Close()

resp, err := client.Health(ctx, &runtimev1.HealthRequest{})
if err != nil {
if status.Code(err) != codes.Unavailable {
w.logger.Error("deploymentsHealthCheck: health check call failed", zap.String("host", d.RuntimeHost), zap.Error(err))
return nil
}
// an unavailable error could also be because the deployment got deleted
d, dbErr := w.admin.DB.FindDeployment(ctx, d.ID)
if dbErr != nil {
if errors.Is(dbErr, database.ErrNotFound) {
// Deployment was deleted
return nil
}
w.logger.Error("deploymentsHealthCheck: failed to find deployment", zap.String("deployment", d.ID), zap.Error(dbErr))
return nil
}
if d.Status == database.DeploymentStatusOK {
w.logger.Error("deploymentsHealthCheck: health check call failed", zap.String("host", d.RuntimeHost), zap.Error(err))
}
// Deployment status changed (probably being deleted)
return nil
}

if !isRuntimeHealthy(resp) {
s, _ := protojson.Marshal(resp)
w.logger.Error("deploymentsHealthCheck: runtime is unhealthy", zap.String("host", d.RuntimeHost), zap.ByteString("health_response", s))
}
return nil
}

func isRuntimeHealthy(r *runtimev1.HealthResponse) bool {
if r.LimiterError != "" || r.ConnCacheError != "" || r.MetastoreError != "" || r.NetworkError != "" {
return false
}
for _, v := range r.InstancesHealth {
if v.ControllerError != "" || v.OlapError != "" || v.RepoError != "" {
return false
}
}
return true
}
3 changes: 3 additions & 0 deletions admin/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func (w *Worker) Run(ctx context.Context) error {
group.Go(func() error {
return w.schedule(ctx, "delete_unused_assets", w.deleteUnusedAssets, 6*time.Hour)
})
group.Go(func() error {
return w.schedule(ctx, "deployments_health_check", w.deploymentsHealthCheck, 10*time.Minute)
})

if w.admin.Biller.GetReportingWorkerCron() != "" {
group.Go(func() error {
Expand Down
4 changes: 3 additions & 1 deletion runtime/drivers/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ func (h *Handle) Ping(ctx context.Context) error {
// check connectivity with admin service
_, err := h.admin.Ping(ctx, &adminv1.PingRequest{})

_ = h.repoMu.RLock(ctx)
if lockErr := h.repoMu.RLock(ctx); lockErr != nil {
return lockErr
}
defer h.repoMu.RUnlock()
return errors.Join(err, h.syncErr)
}
Expand Down
34 changes: 34 additions & 0 deletions runtime/server/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"context"
"net"
"net/http"

runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/pkg/observability"
Expand Down Expand Up @@ -68,6 +69,39 @@ func (s *Server) InstanceHealth(ctx context.Context, req *runtimev1.InstanceHeal
}, nil
}

func (s *Server) healthCheckHandler(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
if err := s.limiter.Ping(ctx); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}

// internet access
if err := pingCloudfareDNS(ctx); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}

// runtime health
// we don't return 5xx on olap errors and hanging connections
status, err := s.runtime.Health(ctx)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
if status.Registry != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
for _, h := range status.InstancesHealth {
if h.Controller != nil || h.Repo != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
}
w.WriteHeader(http.StatusOK)
}

func pingCloudfareDNS(ctx context.Context) error {
d := net.Dialer{}
conn, err := d.DialContext(ctx, "tcp", "1.1.1.1:53")
Expand Down
3 changes: 3 additions & 0 deletions runtime/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ func (s *Server) HTTPHandler(ctx context.Context, registerAdditionalHandlers fun
// Add gRPC-gateway on httpMux
httpMux.Handle("/v1/", gwMux)

// Add HTTP handler for health check
observability.MuxHandle(httpMux, "/v1/health", observability.Middleware("runtime", s.logger, http.HandlerFunc(s.healthCheckHandler)))

// Add HTTP handler for query export downloads
observability.MuxHandle(httpMux, "/v1/download", observability.Middleware("runtime", s.logger, auth.HTTPMiddleware(s.aud, http.HandlerFunc(s.downloadHandler))))

Expand Down

0 comments on commit ed7eca5

Please sign in to comment.