Skip to content

Commit

Permalink
health agent to monitor lotus
Browse files Browse the repository at this point in the history
watch if chain head changes in a given window of api polls
allows setting a threshold of how many times the chain head can remain
unchanged before failing health check
also can set interval for polling chain head
on failure, restarts systemd unit
  • Loading branch information
ognots committed Jan 14, 2020
1 parent 95e7546 commit d8d8ce7
Show file tree
Hide file tree
Showing 6 changed files with 374 additions and 0 deletions.
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ stats:
.PHONY: stats
BINS+=stats

health:
rm -f lotus-health
go build -o lotus-health ./cmd/lotus-health
.PHONY: health
BINS+=health

# MISC

buildall: $(BINS)
Expand Down
253 changes: 253 additions & 0 deletions cmd/lotus-health/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
package main

import (
"context"
"net/http"
"os"
"time"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/client"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/lib/jsonrpc"
"github.com/filecoin-project/lotus/node/repo"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
manet "github.com/multiformats/go-multiaddr-net"
"golang.org/x/xerrors"
"gopkg.in/urfave/cli.v2"
)

type CidWindow [][]cid.Cid

var log = logging.Logger("lotus-seed")

func main() {
logging.SetLogLevel("*", "INFO")

log.Info("Starting health agent")

local := []*cli.Command{
watchHeadCmd,
}

app := &cli.App{
Name: "lotus-health",
Usage: "Tools for monitoring lotus daemon health",
Version: build.UserVersion,
Commands: local,
}

if err := app.Run(os.Args); err != nil {
log.Warn(err)
return
}
}

var watchHeadCmd = &cli.Command{
Name: "watch-head",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "repo",
Value: "~/.lotus",
Usage: "lotus repo path",
},
&cli.IntFlag{
Name: "threshold",
Value: 3,
Usage: "number of times head remains unchanged before failing health check",
},
&cli.IntFlag{
Name: "interval",
Value: 45,
Usage: "interval in seconds between chain head checks",
},
&cli.StringFlag{
Name: "systemd-unit",
Value: "lotus-daemon.service",
Usage: "systemd unit name to restart on health check failure",
},
},
Action: func(c *cli.Context) error {
repo := c.String("repo")
threshold := c.Int("threshold")
interval := time.Duration(c.Int("interval"))
name := c.String("systemd-unit")

var headCheckWindow CidWindow
ctx := context.Background()

api, closer, err := GetFullNodeAPI(repo)
if err != nil {
return err
}
defer closer()

if err := WaitForSyncComplete(ctx, api); err != nil {
log.Fatal(err)
}

ch := make(chan CidWindow, 1)
aCh := make(chan interface{}, 1)

go func() {
for {
headCheckWindow, err = updateWindow(ctx, api, headCheckWindow, threshold, ch)
if err != nil {
log.Fatal(err)
}
time.Sleep(interval * time.Second)
}
}()

go func() {
result, err := alertHandler(name, aCh)
if err != nil {
log.Fatal(err)
}
if result != "done" {
log.Fatal("systemd unit failed to restart:", result)
}
log.Info("restarting health agent")
// Exit health agent and let supervisor restart health agent
// Restarting lotus systemd unit kills api connection
os.Exit(130)
}()

for {
ok := checkWindow(ch, int(interval))
if !ok {
log.Warn("chain head has not updated. Restarting systemd service")
aCh <- nil
break
}
log.Info("chain head is healthy")
}
return nil
},
}

/*
* reads channel of slices of Cids
* compares slices of Cids when len is greater or equal to `t` - threshold
* if all slices are equal, head has not updated and returns false
*/
func checkWindow(ch chan CidWindow, t int) bool {
select {
case window := <-ch:
var dup int
windowLen := len(window)
if windowLen >= t {
cidWindow:
for i, cids := range window {
next := windowLen - 1 - i
// if array length is different, head is changing
if next >= 1 && len(window[next]) != len(window[next-1]) {
break cidWindow
}
// if cids are different, head is changing
for j := range cids {
if next >= 1 && window[next][j] != window[next-1][j] {
break cidWindow
}
}
if i < (t - 1) {
dup++
}
}

if dup == (t - 1) {
return false
}
}
return true
}
}

/*
* reads channel of slices of slices of Cids
* compares Cids when len of window is greater or equal to `t` - threshold
* if all slices are the equal, head has not updated and returns false
*/
func updateWindow(ctx context.Context, a api.FullNode, w CidWindow, t int, ch chan CidWindow) (CidWindow, error) {
head, err := a.ChainHead(ctx)
if err != nil {
return nil, err
}

window := appendCIDsToWindow(w, head.Cids(), t)
ch <- window
return window, nil
}

/*
* appends slice of Cids to window slice
* keeps a fixed window slice size, dropping older slices
* returns new window
*/
func appendCIDsToWindow(w CidWindow, c []cid.Cid, t int) CidWindow {
offset := len(w) - t + 1
if offset >= 0 {
return append(w[offset:], c)
}
return append(w, c)
}

/*
* initialize and return lotus api
*/
func getAPI(path string) (string, http.Header, error) {
r, err := repo.NewFS(path)
if err != nil {
return "", nil, err
}

ma, err := r.APIEndpoint()
if err != nil {
return "", nil, xerrors.Errorf("failed to get api endpoint: %w", err)
}
_, addr, err := manet.DialArgs(ma)
if err != nil {
return "", nil, err
}
var headers http.Header
token, err := r.APIToken()
if err != nil {
log.Warn("Couldn't load CLI token, capabilities may be limited: %w", err)
} else {
headers = http.Header{}
headers.Add("Authorization", "Bearer "+string(token))
}

return "ws://" + addr + "/rpc/v0", headers, nil
}

func GetFullNodeAPI(repo string) (api.FullNode, jsonrpc.ClientCloser, error) {
addr, headers, err := getAPI(repo)
if err != nil {
return nil, nil, err
}

return client.NewFullNodeRPC(addr, headers)
}

/*
* wait for node to sync
*/
func WaitForSyncComplete(ctx context.Context, napi api.FullNode) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(3 * time.Second):
head, err := napi.ChainHead(ctx)
if err != nil {
return err
}

if time.Now().Unix()-int64(head.MinTimestamp()) < build.BlockDelay {
return nil
}
}
}
}
84 changes: 84 additions & 0 deletions cmd/lotus-health/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package main

import (
"testing"

cid "github.com/ipfs/go-cid"
mh "github.com/multiformats/go-multihash"
"github.com/stretchr/testify/assert"
)

func TestAppendCIDsToWindow(t *testing.T) {
assert := assert.New(t)
var window CidWindow
threshold := 3
cid0 := makeCID("0")
cid1 := makeCID("1")
cid2 := makeCID("2")
cid3 := makeCID("3")
window = appendCIDsToWindow(window, []cid.Cid{cid0}, threshold)
window = appendCIDsToWindow(window, []cid.Cid{cid1}, threshold)
window = appendCIDsToWindow(window, []cid.Cid{cid2}, threshold)
window = appendCIDsToWindow(window, []cid.Cid{cid3}, threshold)
assert.Len(window, 3)
assert.Equal(window[0][0], cid1)
assert.Equal(window[1][0], cid2)
assert.Equal(window[2][0], cid3)
}

func TestCheckWindow(t *testing.T) {
assert := assert.New(t)
ch := make(chan CidWindow, 1)
och := make(chan bool, 1)
threshold := 3
go func() {
och <- checkWindow(ch, threshold)
}()
var healthyHeadCheckWindow CidWindow
healthyHeadCheckWindow = appendCIDsToWindow(healthyHeadCheckWindow, []cid.Cid{
makeCID("abcd"),
}, threshold)
healthyHeadCheckWindow = appendCIDsToWindow(healthyHeadCheckWindow, []cid.Cid{
makeCID("bbcd"),
makeCID("bbfe"),
}, threshold)
healthyHeadCheckWindow = appendCIDsToWindow(healthyHeadCheckWindow, []cid.Cid{
makeCID("bbcd"),
makeCID("bbfe"),
}, threshold)
ch <- healthyHeadCheckWindow
select {
case ok := <-och:
assert.True(ok)
}
go func() {
och <- checkWindow(ch, threshold)
}()
var unhealthyHeadCheckWindow CidWindow
unhealthyHeadCheckWindow = appendCIDsToWindow(unhealthyHeadCheckWindow, []cid.Cid{
makeCID("abcd"),
makeCID("fbcd"),
}, threshold)
unhealthyHeadCheckWindow = appendCIDsToWindow(unhealthyHeadCheckWindow, []cid.Cid{
makeCID("abcd"),
makeCID("fbcd"),
}, threshold)
unhealthyHeadCheckWindow = appendCIDsToWindow(unhealthyHeadCheckWindow, []cid.Cid{
makeCID("abcd"),
makeCID("fbcd"),
}, threshold)
ch <- unhealthyHeadCheckWindow
select {
case ok := <-och:
assert.False(ok)
}

}

func makeCID(s string) cid.Cid {
h1, err := mh.Sum([]byte(s), mh.SHA2_256, -1)
if err != nil {
log.Fatal(err)
}
return cid.NewCidV1(0x55, h1)
}
24 changes: 24 additions & 0 deletions cmd/lotus-health/systemd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package main

import (
"github.com/coreos/go-systemd/dbus"
)

func alertHandler(n string, ch chan interface{}) (string, error) {
select {
case <-ch:
statusCh := make(chan string, 1)
c, err := dbus.New()
if err != nil {
return "", err
}
_, err = c.TryRestartUnit(n, "fail", statusCh)
if err != nil {
return "", err
}
select {
case result := <-statusCh:
return result, nil
}
}
}
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/GeertJohan/go.rice v1.0.0
github.com/Gurpartap/async v0.0.0-20180927173644-4f7f499dd9ee
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/coreos/go-systemd v0.0.0-00010101000000-000000000000
github.com/docker/go-units v0.4.0
github.com/filecoin-project/chain-validation v0.0.3
github.com/filecoin-project/filecoin-ffi v0.0.0-20191219131535-bb699517a590
Expand Down Expand Up @@ -109,3 +110,5 @@ require (
replace github.com/golangci/golangci-lint => github.com/golangci/golangci-lint v1.18.0

replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi

replace github.com/coreos/go-systemd => github.com/coreos/go-systemd/v22 v22.0.0
Loading

0 comments on commit d8d8ce7

Please sign in to comment.