Skip to content

Commit

Permalink
shovel: add prometheus metrics endpoint (#248)
Browse files Browse the repository at this point in the history
related: #238
  • Loading branch information
ryandotsmith committed Apr 13, 2024
1 parent 05ba00a commit 8e730e8
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 1 deletion.
1 change: 1 addition & 0 deletions cmd/shovel/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func main() {
mux := http.NewServeMux()
mux.HandleFunc("/", wh.Index)
mux.HandleFunc("/diag", wh.Diag)
mux.HandleFunc("/metrics", wh.Prom)
mux.HandleFunc("/login", wh.Login)
mux.Handle("/task-updates", wh.Authn(wh.Updates))
mux.Handle("/add-source", wh.Authn(wh.AddSource))
Expand Down
38 changes: 37 additions & 1 deletion indexsupply.com/shovel/docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -962,7 +962,7 @@ Shovel's main thing is a task. Tasks are derived from Shovel's configuration. Sh
<hr>
# Monitoring
## Monitoring
Shovel provides an unauthenticated diagnostics JSON endpoint at: `/diag` which returns:
Expand Down Expand Up @@ -991,8 +991,44 @@ Shovel provides an unauthenticated diagnostics JSON endpoint at: `/diag` which r
This endpoint will iterate through all the [eth sources](#ethereum-sources) and query for the latest block on both the eth source and the `shovel.task_updates` table.
This endpoint is rate limited to 1 request per second.
Latency is measured in milliseconds.
### Prometheus
Shovel also exposes a `/metrics` endponit that prints the following Prometheus metrics:
```
# HELP shovel_latest_block_local last block processed
# TYPE shovel_latest_block_local gauge
shovel_latest_block_local{src="mainnet"} 19648035

# HELP shovel_pg_ping number of ms to make basic status query
# TYPE shovel_pg_ping gauge
shovel_pg_ping 0

# HELP shovel_pg_ping_error number of errors in making basic status query
# TYPE shovel_pg_ping_error gauge
shovel_pg_ping_error 0

# HELP shovel_latest_block_remote latest block height from rpc api
# TYPE shovel_latest_block_remote gauge
shovel_latest_block_remote{src="mainnet"} 19648035

# HELP shovel_rpc_ping number of ms to make a basic http request to rpc api
# TYPE shovel_rpc_ping gauge
shovel_rpc_ping{src="mainnet"} 127

# HELP shovel_rpc_ping_error number of errors in making basic rpc api request
# TYPE shovel_rpc_ping_error gauge
shovel_rpc_ping_error{src="mainnet"} 0
```
This endpoint will iterate through all the [eth sources](#ethereum-sources) and query for the latest block on both the eth source and the `shovel.task_updates` table. Each source will use a separate Prometheus label.
This endpoint is rate limited to 1 request per second.
<hr>
## Logging
Expand Down
100 changes: 100 additions & 0 deletions shovel/web/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"net/http"
"os"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -64,6 +65,10 @@ type Handler struct {

sess session.Config
password []byte

// global rate limit for diag requests
diagLastReqMut sync.Mutex
diagLastReq time.Time
}

func New(mgr *shovel.Manager, conf *config.Root, pgp *pgxpool.Pool) *Handler {
Expand Down Expand Up @@ -169,7 +174,102 @@ type DiagResult struct {
PGError string `json:"pg_error"`
}

func (h *Handler) Prom(w http.ResponseWriter, r *http.Request) {
h.diagLastReqMut.Lock()
if time.Since(h.diagLastReq) < time.Second {
h.diagLastReqMut.Unlock()
slog.InfoContext(r.Context(), "rate limiting metrics")
fmt.Fprintf(w, "too many diag requests")
return
}
h.diagLastReq = time.Now()
h.diagLastReqMut.Unlock()

checkPG := func(srcName string) []string {
var (
res []string
start = time.Now()
latest uint64
nerr int
)
const q = `
select num
from shovel.task_updates
where src_name = $1
order by num desc
limit 1
`
err := h.pgp.QueryRow(r.Context(), q, srcName).Scan(&latest)
if err != nil {
nerr++
}
res = append(res, "# HELP shovel_latest_block_local last block processed")
res = append(res, "# TYPE shovel_latest_block_local gauge")
res = append(res, fmt.Sprintf(`shovel_latest_block_local{src="%s"} %d`, srcName, latest))

res = append(res, "# HELP shovel_pg_ping number of ms to make basic status query")
res = append(res, "# TYPE shovel_pg_ping gauge")
res = append(res, fmt.Sprintf(`shovel_pg_ping %d`, uint64(time.Since(start)/time.Millisecond)))

res = append(res, "# HELP shovel_pg_ping_error number of errors in making basic status query")
res = append(res, "# TYPE shovel_pg_ping_error gauge")
res = append(res, fmt.Sprintf(`shovel_pg_ping_error %d`, nerr))
return res
}
checkSrc := func(sname string, src shovel.Source) []string {
var (
start = time.Now()
res []string
nerr int
)
n, _, err := src.Latest(r.Context(), 0)
if err != nil {
nerr++
}

res = append(res, "# HELP shovel_latest_block_remote latest block height from rpc api")
res = append(res, "# TYPE shovel_latest_block_remote gauge")
res = append(res, fmt.Sprintf(`shovel_latest_block_remote{src="%s"} %d`, sname, n))

res = append(res, "# HELP shovel_rpc_ping number of ms to make a basic http request to rpc api")
res = append(res, "# TYPE shovel_rpc_ping gauge")
res = append(res, fmt.Sprintf(`shovel_rpc_ping{src="%s"} %d`, sname, uint64(time.Since(start)/time.Millisecond)))

res = append(res, "# HELP shovel_rpc_ping_error number of errors in making basic rpc api request")
res = append(res, "# TYPE shovel_rpc_ping_error gauge")
res = append(res, fmt.Sprintf(`shovel_rpc_ping_error{src="%s"} %d`, sname, nerr))
return res
}

scs, err := h.conf.AllSources(r.Context(), h.pgp)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
var res []string
for _, sc := range scs {
src := jrpc2.New(sc.URL)
for _, line := range checkPG(sc.Name) {
res = append(res, line)
}
for _, line := range checkSrc(sc.Name, src) {
res = append(res, line)
}
}
fmt.Fprintf(w, strings.Join(res, "\n"))
}

func (h *Handler) Diag(w http.ResponseWriter, r *http.Request) {
h.diagLastReqMut.Lock()
if time.Since(h.diagLastReq) < time.Second {
h.diagLastReqMut.Unlock()
slog.InfoContext(r.Context(), "rate limiting metrics")
fmt.Fprintf(w, "too many diag requests")
return
}
h.diagLastReq = time.Now()
h.diagLastReqMut.Unlock()

var (
res []DiagResult
ctx = r.Context()
Expand Down

0 comments on commit 8e730e8

Please sign in to comment.