add metric: check_cluster_health and sync_operation_total
Sindweller committed Aug 18, 2021
1 parent f78248a commit 66a4bfe
Showing 9 changed files with 156 additions and 33 deletions.
43 changes: 24 additions & 19 deletions pkg/apisix/cluster.go
@@ -35,6 +35,7 @@ import (

"github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/metrics"
"github.com/apache/apisix-ingress-controller/pkg/types"
)

@@ -80,23 +81,24 @@ type ClusterOptions struct {
}

type cluster struct {
name string
baseURL string
baseURLHost string
adminKey string
cli *http.Client
cacheState int32
cache cache.Cache
cacheSynced chan struct{}
cacheSyncErr error
route Route
upstream Upstream
ssl SSL
streamRoute StreamRoute
globalRules GlobalRule
consumer Consumer
plugin Plugin
schema Schema
name string
baseURL string
baseURLHost string
adminKey string
cli *http.Client
cacheState int32
cache cache.Cache
cacheSynced chan struct{}
cacheSyncErr error
route Route
upstream Upstream
ssl SSL
streamRoute StreamRoute
globalRules GlobalRule
consumer Consumer
plugin Plugin
schema Schema
metricsCollector metrics.Collector
}

func newCluster(ctx context.Context, o *ClusterOptions) (Cluster, error) {
@@ -125,8 +127,9 @@ func newCluster(ctx context.Context, o *ClusterOptions) (Cluster, error) {
Timeout: o.Timeout,
Transport: _defaultTransport,
},
cacheState: _cacheSyncing, // default state
cacheSynced: make(chan struct{}),
cacheState: _cacheSyncing, // default state
cacheSynced: make(chan struct{}),
metricsCollector: metrics.NewPrometheusCollector(),
}
c.route = newRouteClient(c)
c.upstream = newUpstreamClient(c)
@@ -322,6 +325,7 @@ func (c *cluster) syncSchema(ctx context.Context, interval time.Duration) {
for {
if err := c.syncSchemaOnce(ctx); err != nil {
log.Warnf("failed to sync schema: %s", err)
c.metricsCollector.IncrSyncOperation("schema", "failure")
}

select {
@@ -379,6 +383,7 @@ func (c *cluster) syncSchemaOnce(ctx context.Context) error {
continue
}
}
c.metricsCollector.IncrSyncOperation("schema", "success")
return nil
}

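The pattern introduced here recurs throughout the commit: each sync attempt records exactly one counter sample, labelled "failure" when the attempt errors and "success" when it completes. Below is a minimal, hedged Go sketch of that pattern; it is not code from this commit, and the helper name, the syncOnce parameter, and the "schema" label are illustrative only.

package example

import (
	"context"

	"github.com/apache/apisix-ingress-controller/pkg/metrics"
)

// syncWithMetrics is an illustrative helper, not code from this commit. It shows
// the instrumentation pattern applied in syncSchema above and in the ingress
// controllers below: one sample per sync attempt, keyed by resource and result.
func syncWithMetrics(ctx context.Context, c metrics.Collector, syncOnce func(context.Context) error) error {
	if err := syncOnce(ctx); err != nil {
		c.IncrSyncOperation("schema", "failure")
		return err
	}
	c.IncrSyncOperation("schema", "success")
	return nil
}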
2 changes: 2 additions & 0 deletions pkg/ingress/apisix_consumer.go
@@ -131,9 +131,11 @@ func (c *apisixConsumerController) sync(ctx context.Context, ev *types.Event) error {
)
c.controller.recorderEvent(ac, corev1.EventTypeWarning, _resourceSyncAborted, err)
c.controller.recordStatus(ac, _resourceSyncAborted, err, metav1.ConditionFalse)
c.controller.metricsCollector.IncrSyncOperation("consumer", "failure")
return err
}

c.controller.metricsCollector.IncrSyncOperation("consumer", "success")
c.controller.recorderEvent(ac, corev1.EventTypeNormal, _resourceSynced, nil)
return nil
}
3 changes: 2 additions & 1 deletion pkg/ingress/apisix_tls.go
@@ -146,7 +146,6 @@ func (c *apisixTlsController) sync(ctx context.Context, ev *types.Event) error {
c.controller.recordStatus(tls, _resourceSyncAborted, err, metav1.ConditionFalse)
return err
}

c.controller.recorderEvent(tls, corev1.EventTypeNormal, _resourceSynced, nil)
c.controller.recordStatus(tls, _resourceSynced, nil, metav1.ConditionTrue)
return err
@@ -173,13 +172,15 @@ func (c *apisixTlsController) syncSecretSSL(secretKey string, apisixTlsKey strin
func (c *apisixTlsController) handleSyncErr(obj interface{}, err error) {
if err == nil {
c.workqueue.Forget(obj)
c.controller.metricsCollector.IncrSyncOperation("ssl", "success")
return
}
log.Warnw("sync ApisixTls failed, will retry",
zap.Any("object", obj),
zap.Error(err),
)
c.workqueue.AddRateLimited(obj)
c.controller.metricsCollector.IncrSyncOperation("ssl", "failure")
}

func (c *apisixTlsController) onAdd(obj interface{}) {
3 changes: 2 additions & 1 deletion pkg/ingress/controller.go
@@ -166,7 +166,7 @@ func NewController(cfg *config.Config) (*Controller, error) {
cfg: cfg,
apiServer: apiSrv,
apisix: client,
metricsCollector: metrics.NewPrometheusCollector(podName, podNamespace),
metricsCollector: metrics.NewPrometheusCollector(),
kubeClient: kubeClient,
watchingNamespace: watchingNamespace,
secretSSLMap: new(sync.Map),
@@ -617,5 +617,6 @@ func (c *Controller) checkClusterHealth(ctx context.Context, cancelFunc context.
return
}
log.Debugf("success check health for default cluster")
c.metricsCollector.IncrCheckClusterHealth(c.name)
}
}
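Note that NewPrometheusCollector no longer receives podName and podNamespace here; as the pkg/metrics change further down shows, the constant labels are now read from the POD_NAME and POD_NAMESPACE environment variables (in a cluster these are normally injected, e.g. via the Kubernetes downward API). A hedged sketch of what that implies for callers, such as a test; the helper and the values shown are illustrative, not part of the commit.

package example

import (
	"os"

	"github.com/apache/apisix-ingress-controller/pkg/metrics"
)

// newTestCollector is an illustrative helper, not code from this commit. Because
// the constructor reads POD_NAME and POD_NAMESPACE itself, callers only need to
// make sure the variables are set; POD_NAMESPACE falls back to "default" when unset.
func newTestCollector() metrics.Collector {
	os.Setenv("POD_NAME", "ingress-controller-test") // illustrative value
	os.Setenv("POD_NAMESPACE", "apisix-system")      // illustrative value
	return metrics.NewPrometheusCollector()
}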
2 changes: 2 additions & 0 deletions pkg/ingress/endpoint.go
@@ -90,12 +90,14 @@ func (c *endpointsController) sync(ctx context.Context, ev *types.Event) error {
func (c *endpointsController) handleSyncErr(obj interface{}, err error) {
if err == nil {
c.workqueue.Forget(obj)
c.controller.metricsCollector.IncrSyncOperation("endpoint", "success")
return
}
log.Warnw("sync endpoints failed, will retry",
zap.Any("object", obj),
)
c.workqueue.AddRateLimited(obj)
c.controller.metricsCollector.IncrSyncOperation("endpoint", "failure")
}

func (c *endpointsController) onAdd(obj interface{}) {
2 changes: 2 additions & 0 deletions pkg/ingress/endpointslice.go
@@ -109,12 +109,14 @@ func (c *endpointSliceController) sync(ctx context.Context, ev *types.Event) error {
func (c *endpointSliceController) handleSyncErr(obj interface{}, err error) {
if err == nil {
c.workqueue.Forget(obj)
c.controller.metricsCollector.IncrSyncOperation("endpointSlices", "success")
return
}
log.Warnw("sync endpointSlice failed, will retry",
zap.Any("object", obj),
)
c.workqueue.AddRateLimited(obj)
c.controller.metricsCollector.IncrSyncOperation("endpointSlices", "failure")
}

func (c *endpointSliceController) onAdd(obj interface{}) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/ingress/secret.go
@@ -215,13 +215,15 @@ func (c *secretController) sync(ctx context.Context, ev *types.Event) error {
func (c *secretController) handleSyncErr(obj interface{}, err error) {
if err == nil {
c.workqueue.Forget(obj)
c.controller.metricsCollector.IncrSyncOperation("secret", "success")
return
}
log.Warnw("sync secret failed, will retry",
zap.Any("object", obj),
zap.Error(err),
)
c.workqueue.AddRateLimited(obj)
c.controller.metricsCollector.IncrSyncOperation("secret", "failure")
}

func (c *secretController) onAdd(obj interface{}) {
64 changes: 59 additions & 5 deletions pkg/metrics/prometheus.go
@@ -15,6 +15,7 @@
package metrics

import (
"os"
"strconv"
"time"

@@ -37,20 +38,33 @@ type Collector interface {
RecordAPISIXLatency(time.Duration)
// IncrAPISIXRequest increases the number of requests to apisix.
IncrAPISIXRequest(string)
// IncrCheckClusterHealth increases the number of cluster health check operations,
// labelled by controller name.
IncrCheckClusterHealth(string)
// IncrSyncOperation increases the number of sync operations, labelled by
// resource type and result.
IncrSyncOperation(string, string)
}

// collector contains necessary messages to collect Prometheus metrics.
type collector struct {
isLeader prometheus.Gauge
apisixLatency prometheus.Summary
apisixRequests *prometheus.CounterVec
apisixCodes *prometheus.GaugeVec
isLeader prometheus.Gauge
apisixLatency prometheus.Summary
apisixRequests *prometheus.CounterVec
apisixCodes *prometheus.GaugeVec
checkClusterHealth *prometheus.CounterVec
syncOperation *prometheus.CounterVec
}

// NewPrometheusCollector creates the Prometheus metrics collector.
// It also registers all internal metric collectors with Prometheus,
// so do not call this function more than once.
func NewPrometheusCollector(podName, podNamespace string) Collector {
func NewPrometheusCollector() Collector {
podName := os.Getenv("POD_NAME")
podNamespace := os.Getenv("POD_NAMESPACE")
if podNamespace == "" {
podNamespace = "default"
}
constLabels := prometheus.Labels{
"controller_pod": podName,
"controller_namespace": podNamespace,
@@ -91,6 +105,24 @@ func NewPrometheusCollector(podName, podNamespace string) Collector {
},
[]string{"resource"},
),
checkClusterHealth: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: _namespace,
Name: "check_cluster_health_total",
Help: "Number of cluster health check operations",
ConstLabels: constLabels,
},
[]string{"name"},
),
syncOperation: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: _namespace,
Name: "sync_operation_total",
Help: "Number of sync operations",
ConstLabels: constLabels,
},
[]string{"resource", "result"},
),
}

// Since we use the DefaultRegisterer, in test cases, the metrics
@@ -99,12 +131,16 @@ func NewPrometheusCollector(podName, podNamespace string) Collector {
prometheus.Unregister(collector.apisixCodes)
prometheus.Unregister(collector.apisixLatency)
prometheus.Unregister(collector.apisixRequests)
prometheus.Unregister(collector.checkClusterHealth)
prometheus.Unregister(collector.syncOperation)

prometheus.MustRegister(
collector.isLeader,
collector.apisixCodes,
collector.apisixLatency,
collector.apisixRequests,
collector.checkClusterHealth,
collector.syncOperation,
)

return collector
@@ -140,13 +176,29 @@ func (c *collector) IncrAPISIXRequest(resource string) {
c.apisixRequests.WithLabelValues(resource).Inc()
}

// IncrCheckClusterHealth increases the number of cluster health check operations.
func (c *collector) IncrCheckClusterHealth(name string) {
c.checkClusterHealth.WithLabelValues(name).Inc()
}

// IncrSyncOperation increases the number of sync operations for a specific
// resource (e.g. Route, Upstream, etc.).
func (c *collector) IncrSyncOperation(resource, result string) {
c.syncOperation.With(prometheus.Labels{
"resource": resource,
"result": result,
}).Inc()
}

// Collect implements the prometheus.Collector interface.
func (c *collector) Collect(ch chan<- prometheus.Metric) {
c.isLeader.Collect(ch)
c.apisixLatency.Collect(ch)
c.apisixRequests.Collect(ch)
c.apisixCodes.Collect(ch)
c.checkClusterHealth.Collect(ch)
c.syncOperation.Collect(ch)
}

// Describe implements the prometheus.Collector interface.
@@ -156,4 +208,6 @@ func (c *collector) Describe(ch chan<- *prometheus.Desc) {
c.apisixRequests.Describe(ch)
c.apisixLatency.Describe(ch)
c.apisixCodes.Describe(ch)
c.checkClusterHealth.Describe(ch)
c.syncOperation.Describe(ch)
}
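To see the new counters end to end, here is a minimal, hedged sketch (not part of the commit) that exercises both methods and exposes them for scraping. The metric name prefix in the comments is an assumption derived from the package's _namespace constant, and the port and label values are illustrative.

package main

import (
	"net/http"

	"github.com/apache/apisix-ingress-controller/pkg/metrics"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	c := metrics.NewPrometheusCollector()

	// One health-check sample for the "default" cluster and two sync results.
	c.IncrCheckClusterHealth("default")
	c.IncrSyncOperation("schema", "success")
	c.IncrSyncOperation("ssl", "failure")

	// The individual collectors are registered on the Prometheus default registry,
	// so the stock promhttp handler exposes them, for example as
	//   <namespace>_check_cluster_health_total{name="default",controller_pod="...",controller_namespace="..."}
	//   <namespace>_sync_operation_total{resource="schema",result="success",...}
	// where <namespace> is the package's _namespace constant.
	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(":8080", nil)
}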