Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 29 additions & 25 deletions apiserver/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,35 +24,33 @@ import (
"garm/apiserver/params"
"garm/auth"
gErrors "garm/errors"
"garm/metrics"
runnerParams "garm/params"
"garm/runner"
"garm/util"
wsWriter "garm/websocket"

"github.com/gorilla/websocket"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)

func NewAPIController(r *runner.Runner, auth *auth.Authenticator, hub *wsWriter.Hub, controllerInfo runnerParams.ControllerInfo) (*APIController, error) {
func NewAPIController(r *runner.Runner, authenticator *auth.Authenticator, hub *wsWriter.Hub) (*APIController, error) {
return &APIController{
r: r,
auth: auth,
auth: authenticator,
hub: hub,
upgrader: websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 16384,
},
controllerInfo: controllerInfo,
}, nil
}

type APIController struct {
r *runner.Runner
auth *auth.Authenticator
hub *wsWriter.Hub
upgrader websocket.Upgrader
controllerInfo runnerParams.ControllerInfo
r *runner.Runner
auth *auth.Authenticator
hub *wsWriter.Hub
upgrader websocket.Upgrader
}

func handleError(w http.ResponseWriter, err error) {
Expand Down Expand Up @@ -89,18 +87,16 @@ func handleError(w http.ResponseWriter, err error) {
}
}

// metric to count total webhooks received
// at this point the webhook is not yet authenticated and
// we don't know if it's meant for us or not
var webhooksReceived = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "garm_webhooks_received",
Help: "The total number of webhooks received",
}, []string{"valid", "reason", "hostname", "controller_id"})

func init() {
err := prometheus.Register(webhooksReceived)
func (a *APIController) webhookMetricLabelValues(valid, reason string) []string {
controllerInfo, err := a.r.GetControllerInfo(auth.GetAdminContext())
if err != nil {
log.Printf("error registering prometheus metric: %q", err)
log.Printf("failed to get controller info: %s", err)
// If labels are empty, no attempt will be made to record the webhook.
return []string{}
}
return []string{
valid, reason,
controllerInfo.Hostname, controllerInfo.ControllerID.String(),
}
}

Expand All @@ -115,23 +111,31 @@ func (a *APIController) handleWorkflowJobEvent(w http.ResponseWriter, r *http.Re
signature := r.Header.Get("X-Hub-Signature-256")
hookType := r.Header.Get("X-Github-Hook-Installation-Target-Type")

controllerInfo := a.r.GetControllerInfo(r.Context())
var labelValues []string
defer func() {
if len(labelValues) == 0 {
return
}
if err := metrics.RecordWebhookWithLabels(labelValues...); err != nil {
log.Printf("failed to record metric: %s", err)
}
}()

if err := a.r.DispatchWorkflowJob(hookType, signature, body); err != nil {
if errors.Is(err, gErrors.ErrNotFound) {
webhooksReceived.WithLabelValues("false", "owner_unknown", controllerInfo.Hostname, controllerInfo.ControllerID.String()).Inc()
labelValues = a.webhookMetricLabelValues("false", "owner_unknown")
log.Printf("got not found error from DispatchWorkflowJob. webhook not meant for us?: %q", err)
return
} else if strings.Contains(err.Error(), "signature") { // TODO: check error type
webhooksReceived.WithLabelValues("false", "signature_invalid", controllerInfo.Hostname, controllerInfo.ControllerID.String()).Inc()
labelValues = a.webhookMetricLabelValues("false", "signature_invalid")
} else {
webhooksReceived.WithLabelValues("false", "unknown", controllerInfo.Hostname, controllerInfo.ControllerID.String()).Inc()
labelValues = a.webhookMetricLabelValues("false", "unknown")
}

handleError(w, err)
return
}
webhooksReceived.WithLabelValues("true", "", controllerInfo.Hostname, controllerInfo.ControllerID.String()).Inc()
labelValues = a.webhookMetricLabelValues("true", "")
}

func (a *APIController) CatchAll(w http.ResponseWriter, r *http.Request) {
Expand Down
26 changes: 15 additions & 11 deletions apiserver/routers/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,28 @@ import (

"garm/apiserver/controllers"
"garm/auth"
"garm/config"
"garm/util"
)

func NewAPIRouter(han *controllers.APIController, logWriter io.Writer, cfg *config.Config, authMiddleware, initMiddleware, instanceMiddleware, metricsMiddlerware auth.Middleware) *mux.Router {
// WithMetricsRouter mounts the Prometheus HTTP handler under the "/metrics"
// prefix of parentRouter, answering GET and OPTIONS on both "/metrics" and
// "/metrics/".
//
// Unless disableAuth is true, metricsMiddlerware is installed on the metrics
// subrouter. The parent router (not the subrouter) is returned so the call
// can be chained; a nil parentRouter yields nil.
func WithMetricsRouter(parentRouter *mux.Router, disableAuth bool, metricsMiddlerware auth.Middleware) *mux.Router {
	if parentRouter == nil {
		return nil
	}

	sub := parentRouter.PathPrefix("/metrics").Subrouter()
	if !disableAuth {
		sub.Use(metricsMiddlerware.Middleware)
	}

	// Register the trailing-slash form first, then the bare prefix.
	for _, path := range []string{"/", ""} {
		sub.Handle(path, promhttp.Handler()).Methods("GET", "OPTIONS")
	}
	return parentRouter
}

func NewAPIRouter(han *controllers.APIController, logWriter io.Writer, authMiddleware, initMiddleware, instanceMiddleware auth.Middleware) *mux.Router {
router := mux.NewRouter()
logMiddleware := util.NewLoggingMiddleware(logWriter)
router.Use(logMiddleware)

if cfg.Metrics.Enable {
metricsRouter := router.PathPrefix("/metrics").Subrouter()
if !cfg.Metrics.DisableAuth {
metricsRouter.Use(metricsMiddlerware.Middleware)
}
metricsRouter.Handle("/", promhttp.Handler()).Methods("GET", "OPTIONS")
metricsRouter.Handle("", promhttp.Handler()).Methods("GET", "OPTIONS")
}

// Handles github webhooks
webhookRouter := router.PathPrefix("/webhooks").Subrouter()
webhookRouter.PathPrefix("/").Handler(http.HandlerFunc(han.CatchAll))
Expand Down
4 changes: 2 additions & 2 deletions cmd/garm-cli/cmd/repo_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func formatPools(pools []params.Pool) {
belongsTo = pool.EnterpriseName
level = "enterprise"
}
t.AppendRow(table.Row{pool.ID, pool.Image, pool.Flavor, strings.Join(tags, " "), belongsTo, level, pool.Enabled, pool.RunnerPrefix})
t.AppendRow(table.Row{pool.ID, pool.Image, pool.Flavor, strings.Join(tags, " "), belongsTo, level, pool.Enabled, pool.GetRunnerPrefix()})
t.AppendSeparator()
}
fmt.Println(t.Render())
Expand Down Expand Up @@ -319,7 +319,7 @@ func formatOnePool(pool params.Pool) {
t.AppendRow(table.Row{"Belongs to", belongsTo})
t.AppendRow(table.Row{"Level", level})
t.AppendRow(table.Row{"Enabled", pool.Enabled})
t.AppendRow(table.Row{"Runner Prefix", pool.RunnerPrefix})
t.AppendRow(table.Row{"Runner Prefix", pool.GetRunnerPrefix()})

if len(pool.Instances) > 0 {
for _, instance := range pool.Instances {
Expand Down
24 changes: 12 additions & 12 deletions cmd/garm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ import (
"garm/config"
"garm/database"
"garm/database/common"
"garm/metrics"
"garm/runner"
"garm/util"
"garm/websocket"

"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)

var (
Expand Down Expand Up @@ -111,19 +111,13 @@ func main() {
log.Fatalf("failed to create controller: %+v", err)
}

controllerInfo, err := db.ControllerInfo()
if err != nil {
log.Fatal(err)
}

// If there are many repos/pools, this may take a long time.
// TODO: start pool managers in the background and log errors.
if err := runner.Start(); err != nil {
log.Fatal(err)
}

authenticator := auth.NewAuthenticator(cfg.JWTAuth, db)
controller, err := controllers.NewAPIController(runner, authenticator, hub, controllerInfo)
controller, err := controllers.NewAPIController(runner, authenticator, hub)
if err != nil {
log.Fatalf("failed to create controller: %+v", err)
}
Expand All @@ -147,12 +141,18 @@ func main() {
if err != nil {
log.Fatal(err)
}
err = prometheus.Register(controllers.NewGarmCollector(runner))
if err != nil {
log.Println("failed to register garm collector in prometheus", err)

router := routers.NewAPIRouter(controller, multiWriter, jwtMiddleware, initMiddleware, instanceMiddleware)

if cfg.Metrics.Enable {
log.Printf("registering prometheus metrics collectors")
if err := metrics.RegisterCollectors(runner); err != nil {
log.Fatal(err)
}
log.Printf("setting up metric routes")
router = routers.WithMetricsRouter(router, cfg.Metrics.DisableAuth, metricsMiddleware)
}

router := routers.NewAPIRouter(controller, multiWriter, cfg, jwtMiddleware, initMiddleware, instanceMiddleware, metricsMiddleware)
corsMw := mux.CORSMethodMiddleware(router)
router.Use(corsMw)

Expand Down
78 changes: 68 additions & 10 deletions apiserver/controllers/metrics.go → metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,69 @@
package controllers
package metrics

import (
"log"

"garm/auth"
"garm/params"
"garm/runner"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)

type GarmCollector struct {
healthMetric *prometheus.Desc
instanceMetric *prometheus.Desc
runner *runner.Runner
var webhooksReceived *prometheus.CounterVec = nil

// RecordWebhookWithLabels bumps the webhooks-received counter for the series
// selected by the given label values. When the counter has not been set up
// (webhooksReceived is nil) the call does nothing and returns nil. An error
// is returned if the label values cannot be resolved to a metric.
func RecordWebhookWithLabels(lvs ...string) error {
	vec := webhooksReceived
	if vec == nil {
		// Counter was never registered; nothing to record.
		return nil
	}

	metric, err := vec.GetMetricWithLabelValues(lvs...)
	if err == nil {
		metric.Inc()
		return nil
	}
	return errors.Wrap(err, "recording metric")
}

func NewGarmCollector(r *runner.Runner) *GarmCollector {
// RegisterCollectors registers the garm collector and the webhooks-received
// counter with the default Prometheus registry.
//
// The call is a no-op once registration has succeeded. The package-level
// webhooksReceived counter is only published after it has been registered
// successfully, so a failed attempt never leaves behind a counter that
// accepts increments but is not exported, and a later retry is not mistaken
// for "already registered". If registering the counter fails, the garm
// collector registered earlier in the same call is unregistered again so
// that a retry starts from a clean state.
func RegisterCollectors(runner *runner.Runner) error {
	if webhooksReceived != nil {
		// Already registered.
		return nil
	}

	garmCollector, err := NewGarmCollector(runner)
	if err != nil {
		return errors.Wrap(err, "getting collector")
	}

	if err := prometheus.Register(garmCollector); err != nil {
		return errors.Wrap(err, "registering collector")
	}

	// metric to count total webhooks received
	// at this point the webhook is not yet authenticated and
	// we don't know if it's meant for us or not
	counter := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "garm_webhooks_received",
		Help: "The total number of webhooks received",
	}, []string{"valid", "reason", "hostname", "controller_id"})

	if err := prometheus.Register(counter); err != nil {
		// Roll back the collector so a subsequent call can try again
		// instead of failing on a duplicate registration.
		prometheus.Unregister(garmCollector)
		return errors.Wrap(err, "registering webhooks recv counter")
	}

	// Publish only after successful registration.
	webhooksReceived = counter
	return nil
}

func NewGarmCollector(r *runner.Runner) (*GarmCollector, error) {
controllerInfo, err := r.GetControllerInfo(auth.GetAdminContext())
if err != nil {
return nil, errors.Wrap(err, "fetching controller info")
}
return &GarmCollector{
runner: r,
instanceMetric: prometheus.NewDesc(
Expand All @@ -28,7 +76,15 @@ func NewGarmCollector(r *runner.Runner) *GarmCollector {
"Health of the runner",
[]string{"hostname", "controller_id"}, nil,
),
}
cachedControllerInfo: controllerInfo,
}, nil
}

type GarmCollector struct {
healthMetric *prometheus.Desc
instanceMetric *prometheus.Desc
runner *runner.Runner
cachedControllerInfo params.ControllerInfo
}

func (c *GarmCollector) Describe(ch chan<- *prometheus.Desc) {
Expand All @@ -37,8 +93,11 @@ func (c *GarmCollector) Describe(ch chan<- *prometheus.Desc) {
}

func (c *GarmCollector) Collect(ch chan<- prometheus.Metric) {
controllerInfo := c.runner.GetControllerInfo(auth.GetAdminContext())

controllerInfo, err := c.runner.GetControllerInfo(auth.GetAdminContext())
if err != nil {
log.Printf("failed to get controller info: %s", err)
return
}
c.CollectInstanceMetric(ch, controllerInfo.Hostname, controllerInfo.ControllerID.String())
c.CollectHealthMetric(ch, controllerInfo.Hostname, controllerInfo.ControllerID.String())
}
Expand All @@ -61,7 +120,6 @@ func (c *GarmCollector) CollectHealthMetric(ch chan<- prometheus.Metric, hostnam
// CollectInstanceMetric collects the metrics for the runner instances
// reflecting the statuses and the pool they belong to.
func (c *GarmCollector) CollectInstanceMetric(ch chan<- prometheus.Metric, hostname string, controllerID string) {

ctx := auth.GetAdminContext()

instances, err := c.runner.ListAllInstances(ctx)
Expand Down
8 changes: 1 addition & 7 deletions runner/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ import (
"garm/util"

"github.com/google/go-github/v48/github"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/teris-io/shortid"
)

var (
Expand Down Expand Up @@ -396,11 +394,7 @@ func (r *basePoolManager) AddRunner(ctx context.Context, poolID string) error {
return errors.Wrap(err, "fetching pool")
}

suffix, err := shortid.Generate()
if err != nil {
suffix = uuid.New().String()
}
name := fmt.Sprintf("%s-%s", pool.GetRunnerPrefix(), suffix)
name := fmt.Sprintf("%s-%s", pool.GetRunnerPrefix(), util.NewID())

createParams := params.CreateInstanceParams{
Name: name,
Expand Down
Loading