Skip to content

Commit

Permalink
Enhancement: Better Health Checks (#125)
Browse files Browse the repository at this point in the history
* better health check service

* fix linter

* added feedback

* removed replace socket conn
  • Loading branch information
Tíghearnán Carroll authored Feb 25, 2022
1 parent 53bff9f commit a2f5b8e
Show file tree
Hide file tree
Showing 33 changed files with 1,460 additions and 55 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ create-alias:
@go run -race main.go create $(alias)

install-linter:
@curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)bin v1.43.0
@curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)bin v1.44.2

install-swagger-gen:
@go get -d github.com/swaggo/swag/cmd/swag
Expand Down
18 changes: 14 additions & 4 deletions cmd/internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ import (
"net/url"
"time"

"github.com/InVisionApp/go-health/v2"
"github.com/gorilla/websocket"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"github.com/libsv/payd"
"github.com/libsv/payd/config"
paydSQL "github.com/libsv/payd/data/sqlite"
"github.com/libsv/payd/docs"
"github.com/libsv/payd/dpp"
"github.com/libsv/payd/log"
"github.com/libsv/payd/service"
thttp "github.com/libsv/payd/transports/http"
Expand Down Expand Up @@ -107,8 +109,16 @@ func SetupSocketServer(cfg config.Socket, e *echo.Echo) *server.SocketServer {
}

// SetupHealthEndpoint setup the health check.
func SetupHealthEndpoint(cfg config.Config, g *echo.Group, c *client.Client) {
thttp.NewHealthHandler(service.NewHealthService(c, cfg.P4)).RegisterRoutes(g)
func SetupHealthEndpoint(cfg config.Config, g *echo.Group, c *client.Client, deps *SocketDeps) error {
h := health.New()

if err := dpp.NewHealthCheck(h, c, deps.InvoiceService, deps.ConnectService, cfg.P4).Start(); err != nil {
return errors.Wrap(err, "failed to start dpp health check")
}

thttp.NewHealthHandler(service.NewHealthService(h)).RegisterRoutes(g)

return errors.Wrap(h.Start(), "failed to start health checker")
}

// ResumeActiveChannels resume listening to active peer channels.
Expand Down Expand Up @@ -142,13 +152,13 @@ func ResumeSocketConnections(deps *SocketDeps, cfg *config.P4) error {
}

ctx := context.Background()
invoices, err := deps.InvoiceService.Invoices(ctx)
invoices, err := deps.InvoiceService.InvoicesPending(ctx)
if err != nil {
return errors.Wrap(err, "failed to retrieve invoices")
}

for _, invoice := range invoices {
if time.Now().UTC().Unix() <= invoice.ExpiresAt.Time.UTC().Unix() && invoice.State == payd.StateInvoicePending {
if time.Now().UTC().Unix() <= invoice.ExpiresAt.Time.UTC().Unix() {
if err := deps.ConnectService.Connect(ctx, payd.ConnectArgs{
InvoiceID: invoice.ID,
}); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ func main() {
internal.SetupSocketClient(*cfg, deps, c)
// setup socket endpoints
internal.SetupSocketHTTPEndpoints(*cfg.Deployment, deps, g)
internal.SetupHealthEndpoint(*cfg, g, c)
if err := internal.SetupHealthEndpoint(*cfg, g, c, deps); err != nil {
log.Fatal(err, "failed to create health checks")
}

if err := internal.ResumeActiveChannels(deps); err != nil {
log.Fatal(err, "failed to resume active peer channels")
Expand Down
18 changes: 18 additions & 0 deletions data/sqlite/invoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ const (
WHERE state != 'deleted'
`

sqlPendingInvoices = `
SELECT invoice_id, satoshis, description, spv_required, payment_reference, payment_received_at, expires_at, state, refund_to, refunded_at, created_at, updated_at, deleted_at
FROM invoices
WHERE state == 'pending'
`

// TODO - sort updates when working on rest of Invoice API.
sqlInvoiceUpdate = `
UPDATE invoices
Expand Down Expand Up @@ -69,6 +75,18 @@ func (s *sqliteStore) Invoices(ctx context.Context) ([]payd.Invoice, error) {
return resp, nil
}

// InvoicesPending will return any invoices that have the status 'pending`.
func (s *sqliteStore) InvoicesPending(ctx context.Context) ([]payd.Invoice, error) {
var resp []payd.Invoice
if err := s.db.SelectContext(ctx, &resp, sqlPendingInvoices); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, lathos.NewErrNotFound(errcodes.ErrInvoicesNotFound, "no invoices found")
}
return nil, errors.Wrapf(err, "failed to get invoices")
}
return resp, nil
}

// Create will persist a new Invoice in the data store.
func (s *sqliteStore) InvoiceCreate(ctx context.Context, req payd.InvoiceCreate) (*payd.Invoice, error) {
tx, err := s.newTx(ctx)
Expand Down
143 changes: 143 additions & 0 deletions dpp/healthcheck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package dpp

import (
"context"
"net/url"
"time"

"github.com/InVisionApp/go-health/v2"
"github.com/libsv/payd"
"github.com/libsv/payd/config"
"github.com/pkg/errors"
"github.com/theflyingcodr/lathos"
"github.com/theflyingcodr/sockets"
"github.com/theflyingcodr/sockets/client"
)

type healthCheck struct {
h health.IHealth
c *client.Client
cfg *config.P4
invSvc payd.InvoiceService
connSvc payd.ConnectService
}

// NewHealthCheck return a new DPP health check.
func NewHealthCheck(h health.IHealth, c *client.Client, invSvc payd.InvoiceService, connSvc payd.ConnectService, cfg *config.P4) payd.HealthCheck {
return &healthCheck{
h: h,
c: c,
cfg: cfg,
invSvc: invSvc,
connSvc: connSvc,
}
}

// Start the health check.
func (h *healthCheck) Start() error {
u, err := url.Parse(h.cfg.ServerHost)
if err != nil {
return err
}
if u.Scheme != "ws" && u.Scheme != "wss" {
return nil
}

if err := h.commsCheck(); err != nil {
return errors.Wrap(err, "failed to start comms health check")
}
if err := h.channelCheck(); err != nil {
return errors.Wrap(err, "failed to start channel health check")
}
return nil
}

func (h *healthCheck) commsCheck() error {
if err := h.h.AddCheck(&health.Config{
Name: "p4-comms",
Checker: &commsCheck{
c: h.c,
host: h.cfg.ServerHost,
},
Interval: time.Duration(2) * time.Second,
}); err != nil {
return errors.Wrap(err, "failed to create p4-comms healthcheck")
}
if err := h.h.AddCheck(&health.Config{
Name: "p4-channel-conn",
Checker: &channelCheck{
c: h.c,
host: h.cfg.ServerHost,
invSvc: h.invSvc,
connSvc: h.connSvc,
},
Interval: time.Duration(10) * time.Second,
}); err != nil {
return errors.Wrap(err, "failed to create p4-channel-conn healthcheck")
}
return nil
}

func (h *healthCheck) channelCheck() error {
return nil
}

type commsCheck struct {
c *client.Client
host string
}

// Status of communication.
func (ch *commsCheck) Status() (interface{}, error) {
if err := ch.c.JoinChannel(ch.host, "health", nil, map[string]string{
"internal": "true",
}); err != nil {
return nil, errors.Wrap(err, "failed to join p4 health channel")
}
if err := ch.c.Publish(sockets.Request{
ChannelID: "health",
MessageKey: "my-p4",
Body: "ping",
}); err != nil {
return nil, errors.Wrap(err, "failed to ping p4")
}
ch.c.LeaveChannel("health", nil)
return nil, nil
}

type channelCheck struct {
c *client.Client
host string
invSvc payd.InvoiceService
connSvc payd.ConnectService
}

// Status of channels.
func (ch *channelCheck) Status() (interface{}, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2)
defer cancel()

invoices, err := ch.invSvc.InvoicesPending(context.Background())
if err != nil {
if lathos.IsNotFound(err) {
return nil, nil
}
return nil, errors.Wrap(err, "failed to get invoices for channel check")
}
for _, invoice := range invoices {
if invoice.ExpiresAt.Time.UTC().Before(time.Now().UTC()) {
continue
}
if ch.c.HasChannel(invoice.ID) {
continue
}

if err := ch.connSvc.Connect(ctx, payd.ConnectArgs{
InvoiceID: invoice.ID,
}); err != nil {
return nil, errors.Wrapf(err, "failed reconnecting to channel for invoice '%s'", invoice.ID)
}

}
return nil, nil
}
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/libsv/go-bc v0.1.8
github.com/rs/zerolog v1.26.1
github.com/theflyingcodr/sockets v0.0.11-beta.0.20220222160101-76100ef886b5
github.com/theflyingcodr/sockets v0.0.11-beta.0.20220225103542-c6eecb16f586
)

require (
github.com/InVisionApp/go-logger v1.0.1 // indirect
github.com/KyleBanks/depth v1.2.1 // indirect
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
Expand Down Expand Up @@ -90,6 +91,7 @@ require (
)

require (
github.com/InVisionApp/go-health/v2 v2.1.2
github.com/libsv/go-p4 v0.0.8
github.com/libsv/go-spvchannels v0.0.1
)
Expand Down
Loading

0 comments on commit a2f5b8e

Please sign in to comment.