Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: http health api and admin job for health rpc #5192

Merged
merged 5 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions admin/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ type DB interface {

ResolveRuntimeSlotsUsed(ctx context.Context) ([]*RuntimeSlotsUsed, error)

// FindAllocatedRuntimes returns runtimes on which atleast one instance is allocated
FindAllocatedRuntimes(ctx context.Context, afterRuntime string, limit int) ([]*AllocatedRuntime, error)
k-anshul marked this conversation as resolved.
Show resolved Hide resolved

FindUsers(ctx context.Context) ([]*User, error)
FindUsersByEmailPattern(ctx context.Context, emailPattern, afterEmail string, limit int) ([]*User, error)
FindUser(ctx context.Context, id string) (*User, error)
Expand Down Expand Up @@ -405,6 +408,12 @@ type RuntimeSlotsUsed struct {
SlotsUsed int `db:"slots_used"`
}

// AllocatedRuntime is the result of FindAllocatedRuntimes query.
type AllocatedRuntime struct {
Host string `db:"runtime_host"`
Audience string `db:"runtime_audience"`
}

// User is a person registered in Rill.
// Users may belong to multiple organizations and projects.
type User struct {
Expand Down
13 changes: 13 additions & 0 deletions admin/database/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,19 @@ func (c *connection) ResolveRuntimeSlotsUsed(ctx context.Context) ([]*database.R
return res, nil
}

func (c *connection) FindAllocatedRuntimes(ctx context.Context, afterRuntime string, limit int) ([]*database.AllocatedRuntime, error) {
var res []*database.AllocatedRuntime
err := c.getDB(ctx).SelectContext(ctx, &res, `
SELECT DISTINCT ON (lower(d.runtime_host)) runtime_host, d.runtime_audience FROM deployments d
WHERE lower(d.runtime_host) > lower($1)
ORDER BY lower(d.runtime_host)
LIMIT $2`, afterRuntime, limit)
if err != nil {
return nil, parseErr("deployments", err)
}
return res, nil
}

func (c *connection) FindUsers(ctx context.Context) ([]*database.User, error) {
var res []*database.User
err := c.getDB(ctx).SelectContext(ctx, &res, "SELECT u.* FROM users u")
Expand Down
40 changes: 20 additions & 20 deletions admin/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (s *Service) createDeployment(ctx context.Context, opts *createDeploymentOp
}

// Open a runtime client
rt, err := s.openRuntimeClient(alloc.Host, alloc.Audience)
rt, err := s.OpenRuntimeClient(alloc.Host, alloc.Audience)
if err != nil {
err2 := p.Deprovision(ctx, provisionID)
err3 := s.DB.DeleteDeployment(ctx, depl.ID)
Expand Down Expand Up @@ -348,6 +348,24 @@ func (s *Service) HibernateDeployments(ctx context.Context) error {
return nil
}

func (s *Service) OpenRuntimeClient(host, audience string) (*client.Client, error) {
jwt, err := s.issuer.NewToken(auth.TokenOptions{
AudienceURL: audience,
TTL: time.Hour,
SystemPermissions: []auth.Permission{auth.ManageInstances, auth.ReadInstance, auth.EditInstance, auth.ReadObjects},
})
if err != nil {
return nil, err
}

rt, err := client.New(host, jwt)
if err != nil {
return nil, err
}

return rt, nil
}

func (s *Service) teardownDeployment(ctx context.Context, depl *database.Deployment) error {
// Delete the deployment
err := s.DB.DeleteDeployment(ctx, depl.ID)
Expand Down Expand Up @@ -384,25 +402,7 @@ func (s *Service) teardownDeployment(ctx context.Context, depl *database.Deploym
}

func (s *Service) openRuntimeClientForDeployment(d *database.Deployment) (*client.Client, error) {
return s.openRuntimeClient(d.RuntimeHost, d.RuntimeAudience)
}

func (s *Service) openRuntimeClient(host, audience string) (*client.Client, error) {
jwt, err := s.issuer.NewToken(auth.TokenOptions{
AudienceURL: audience,
TTL: time.Hour,
SystemPermissions: []auth.Permission{auth.ManageInstances, auth.ReadInstance, auth.EditInstance, auth.ReadObjects},
})
if err != nil {
return nil, err
}

rt, err := client.New(host, jwt)
if err != nil {
return nil, err
}

return rt, nil
return s.OpenRuntimeClient(d.RuntimeHost, d.RuntimeAudience)
}

type DeploymentAnnotations struct {
Expand Down
73 changes: 73 additions & 0 deletions admin/worker/runtime_health_check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package worker

import (
"context"
"fmt"

runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/encoding/protojson"
)

const _allocatedRuntimesPageSize = 20
k-anshul marked this conversation as resolved.
Show resolved Hide resolved

func (w *Worker) runtimeHealthCheck(ctx context.Context) error {
lastRuntime := ""
k-anshul marked this conversation as resolved.
Show resolved Hide resolved
for {
runtimes, err := w.admin.DB.FindAllocatedRuntimes(ctx, lastRuntime, _allocatedRuntimesPageSize)
if err != nil {
return fmt.Errorf("failed to get runtimes: %w", err)
}
if len(runtimes) == 0 {
return nil
}
lastRuntime = runtimes[len(runtimes)-1].Host

group, cctx := errgroup.WithContext(ctx)
group.SetLimit(8)
for _, rt := range runtimes {
rt := rt
group.Go(func() error {
client, err := w.admin.OpenRuntimeClient(rt.Host, rt.Audience)
if err != nil {
w.logger.Error("runtimeHealthCheck: failed to open runtime client", zap.String("host", rt.Host), zap.Error(err))
return nil
}

k-anshul marked this conversation as resolved.
Show resolved Hide resolved
resp, err := client.Health(cctx, &runtimev1.HealthRequest{})
if err != nil {
client.Close()
w.logger.Error("runtimeHealthCheck: health check call failed", zap.String("host", rt.Host), zap.Error(err))
return nil
}

if !isRuntimeHealthy(resp) {
s, _ := protojson.Marshal(resp)
w.logger.Error("runtimeHealthCheck: runtime is unhealthy", zap.String("host", rt.Host), zap.ByteString("health_response", s))
}
kaspersjo marked this conversation as resolved.
Show resolved Hide resolved
client.Close()
return nil
})
}
if err := group.Wait(); err != nil {
return err
}
if len(runtimes) < _allocatedRuntimesPageSize {
return nil
}
// fetch again
}
}

func isRuntimeHealthy(r *runtimev1.HealthResponse) bool {
if r.LimiterError != "" || r.ConnCacheError != "" || r.MetastoreError != "" || r.NetworkError != "" {
return false
}
for _, v := range r.InstancesHealth {
if v.ControllerError != "" || v.OlapError != "" || v.RepoError != "" {
return false
}
}
return true
}
3 changes: 3 additions & 0 deletions admin/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func (w *Worker) Run(ctx context.Context) error {
group.Go(func() error {
return w.schedule(ctx, "delete_unused_assets", w.deleteUnusedAssets, 6*time.Hour)
})
group.Go(func() error {
return w.schedule(ctx, "runtime_health_check", w.runtimeHealthCheck, 10*time.Minute)
k-anshul marked this conversation as resolved.
Show resolved Hide resolved
})

if w.admin.Biller.GetReportingWorkerCron() != "" {
group.Go(func() error {
Expand Down
4 changes: 3 additions & 1 deletion runtime/drivers/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ func (h *Handle) Ping(ctx context.Context) error {
// check connectivity with admin service
_, err := h.admin.Ping(ctx, &adminv1.PingRequest{})

_ = h.repoMu.RLock(ctx)
if lockErr := h.repoMu.RLock(ctx); lockErr != nil {
return lockErr
}
begelundmuller marked this conversation as resolved.
Show resolved Hide resolved
defer h.repoMu.RUnlock()
return errors.Join(err, h.syncErr)
}
Expand Down
34 changes: 34 additions & 0 deletions runtime/server/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"context"
"net"
"net/http"

runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/pkg/observability"
Expand Down Expand Up @@ -68,6 +69,39 @@ func (s *Server) InstanceHealth(ctx context.Context, req *runtimev1.InstanceHeal
}, nil
}

func (s *Server) healthCheckHandler(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
if err := s.limiter.Ping(ctx); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}

// internet access
if err := pingCloudfareDNS(ctx); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}

// runtime health
// we don't return 5xx on olap errors and hanging connections
status, err := s.runtime.Health(ctx)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
if status.Registry != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
for _, h := range status.InstancesHealth {
if h.Controller != nil || h.Repo != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
}
w.WriteHeader(http.StatusOK)
}

func pingCloudfareDNS(ctx context.Context) error {
d := net.Dialer{}
conn, err := d.DialContext(ctx, "tcp", "1.1.1.1:53")
Expand Down
3 changes: 3 additions & 0 deletions runtime/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ func (s *Server) HTTPHandler(ctx context.Context, registerAdditionalHandlers fun
// Add gRPC-gateway on httpMux
httpMux.Handle("/v1/", gwMux)

// Add HTTP handler for health check
observability.MuxHandle(httpMux, "/v1/health", observability.Middleware("runtime", s.logger, http.HandlerFunc(s.healthCheckHandler)))

// Add HTTP handler for query export downloads
observability.MuxHandle(httpMux, "/v1/download", observability.Middleware("runtime", s.logger, auth.HTTPMiddleware(s.aud, http.HandlerFunc(s.downloadHandler))))

Expand Down
Loading