diff --git a/pkg/apisix/cluster.go b/pkg/apisix/cluster.go index 34e4492b93a..ef4ef983431 100644 --- a/pkg/apisix/cluster.go +++ b/pkg/apisix/cluster.go @@ -35,6 +35,7 @@ import ( "github.com/apache/apisix-ingress-controller/pkg/apisix/cache" "github.com/apache/apisix-ingress-controller/pkg/log" + "github.com/apache/apisix-ingress-controller/pkg/metrics" "github.com/apache/apisix-ingress-controller/pkg/types" ) @@ -80,23 +81,24 @@ type ClusterOptions struct { } type cluster struct { - name string - baseURL string - baseURLHost string - adminKey string - cli *http.Client - cacheState int32 - cache cache.Cache - cacheSynced chan struct{} - cacheSyncErr error - route Route - upstream Upstream - ssl SSL - streamRoute StreamRoute - globalRules GlobalRule - consumer Consumer - plugin Plugin - schema Schema + name string + baseURL string + baseURLHost string + adminKey string + cli *http.Client + cacheState int32 + cache cache.Cache + cacheSynced chan struct{} + cacheSyncErr error + route Route + upstream Upstream + ssl SSL + streamRoute StreamRoute + globalRules GlobalRule + consumer Consumer + plugin Plugin + schema Schema + metricsCollector metrics.Collector } func newCluster(ctx context.Context, o *ClusterOptions) (Cluster, error) { @@ -125,8 +127,9 @@ func newCluster(ctx context.Context, o *ClusterOptions) (Cluster, error) { Timeout: o.Timeout, Transport: _defaultTransport, }, - cacheState: _cacheSyncing, // default state - cacheSynced: make(chan struct{}), + cacheState: _cacheSyncing, // default state + cacheSynced: make(chan struct{}), + metricsCollector: metrics.NewPrometheusCollector(), } c.route = newRouteClient(c) c.upstream = newUpstreamClient(c) @@ -322,6 +325,7 @@ func (c *cluster) syncSchema(ctx context.Context, interval time.Duration) { for { if err := c.syncSchemaOnce(ctx); err != nil { log.Warnf("failed to sync schema: %s", err) + c.metricsCollector.IncrSyncOperation("schema", "failure") } select { @@ -379,6 +383,7 @@ func (c *cluster) syncSchemaOnce(ctx 
context.Context) error { continue } } + c.metricsCollector.IncrSyncOperation("schema", "success") return nil } diff --git a/pkg/ingress/apisix_consumer.go b/pkg/ingress/apisix_consumer.go index f54aad40385..0cf28ba5cf0 100644 --- a/pkg/ingress/apisix_consumer.go +++ b/pkg/ingress/apisix_consumer.go @@ -131,9 +131,11 @@ func (c *apisixConsumerController) sync(ctx context.Context, ev *types.Event) er ) c.controller.recorderEvent(ac, corev1.EventTypeWarning, _resourceSyncAborted, err) c.controller.recordStatus(ac, _resourceSyncAborted, err, metav1.ConditionFalse) + c.controller.metricsCollector.IncrSyncOperation("consumer", "failure") return err } + c.controller.metricsCollector.IncrSyncOperation("consumer", "success") c.controller.recorderEvent(ac, corev1.EventTypeNormal, _resourceSynced, nil) return nil } diff --git a/pkg/ingress/apisix_tls.go b/pkg/ingress/apisix_tls.go index 2ba036d59cd..79c94786fa2 100644 --- a/pkg/ingress/apisix_tls.go +++ b/pkg/ingress/apisix_tls.go @@ -146,7 +146,6 @@ func (c *apisixTlsController) sync(ctx context.Context, ev *types.Event) error { c.controller.recordStatus(tls, _resourceSyncAborted, err, metav1.ConditionFalse) return err } - c.controller.recorderEvent(tls, corev1.EventTypeNormal, _resourceSynced, nil) c.controller.recordStatus(tls, _resourceSynced, nil, metav1.ConditionTrue) return err @@ -173,6 +172,7 @@ func (c *apisixTlsController) syncSecretSSL(secretKey string, apisixTlsKey strin func (c *apisixTlsController) handleSyncErr(obj interface{}, err error) { if err == nil { c.workqueue.Forget(obj) + c.controller.metricsCollector.IncrSyncOperation("ssl", "success") return } log.Warnw("sync ApisixTls failed, will retry", @@ -180,6 +180,7 @@ func (c *apisixTlsController) handleSyncErr(obj interface{}, err error) { zap.Error(err), ) c.workqueue.AddRateLimited(obj) + c.controller.metricsCollector.IncrSyncOperation("ssl", "failure") } func (c *apisixTlsController) onAdd(obj interface{}) { diff --git a/pkg/ingress/controller.go 
b/pkg/ingress/controller.go index 4d59b52dc15..2af856d8bf9 100644 --- a/pkg/ingress/controller.go +++ b/pkg/ingress/controller.go @@ -166,7 +166,7 @@ func NewController(cfg *config.Config) (*Controller, error) { cfg: cfg, apiServer: apiSrv, apisix: client, - metricsCollector: metrics.NewPrometheusCollector(podName, podNamespace), + metricsCollector: metrics.NewPrometheusCollector(), kubeClient: kubeClient, watchingNamespace: watchingNamespace, secretSSLMap: new(sync.Map), @@ -617,5 +617,6 @@ func (c *Controller) checkClusterHealth(ctx context.Context, cancelFunc context. return } log.Debugf("success check health for default cluster") + c.metricsCollector.IncrCheckClusterHealth(c.name) } } diff --git a/pkg/ingress/endpoint.go b/pkg/ingress/endpoint.go index c2eb236b8c0..53889f812dc 100644 --- a/pkg/ingress/endpoint.go +++ b/pkg/ingress/endpoint.go @@ -90,12 +90,14 @@ func (c *endpointsController) sync(ctx context.Context, ev *types.Event) error { func (c *endpointsController) handleSyncErr(obj interface{}, err error) { if err == nil { c.workqueue.Forget(obj) + c.controller.metricsCollector.IncrSyncOperation("endpoint", "success") return } log.Warnw("sync endpoints failed, will retry", zap.Any("object", obj), ) c.workqueue.AddRateLimited(obj) + c.controller.metricsCollector.IncrSyncOperation("endpoint", "failure") } func (c *endpointsController) onAdd(obj interface{}) { diff --git a/pkg/ingress/endpointslice.go b/pkg/ingress/endpointslice.go index c8eaa059ee0..691e4ed2fb9 100644 --- a/pkg/ingress/endpointslice.go +++ b/pkg/ingress/endpointslice.go @@ -109,12 +109,14 @@ func (c *endpointSliceController) sync(ctx context.Context, ev *types.Event) err func (c *endpointSliceController) handleSyncErr(obj interface{}, err error) { if err == nil { c.workqueue.Forget(obj) + c.controller.metricsCollector.IncrSyncOperation("endpointSlices", "success") return } log.Warnw("sync endpointSlice failed, will retry", zap.Any("object", obj), ) c.workqueue.AddRateLimited(obj) + 
c.controller.metricsCollector.IncrSyncOperation("endpointSlices", "failure") } func (c *endpointSliceController) onAdd(obj interface{}) { diff --git a/pkg/ingress/secret.go b/pkg/ingress/secret.go index 2d8e7638e86..700bdd77286 100644 --- a/pkg/ingress/secret.go +++ b/pkg/ingress/secret.go @@ -215,6 +215,7 @@ func (c *secretController) sync(ctx context.Context, ev *types.Event) error { func (c *secretController) handleSyncErr(obj interface{}, err error) { if err == nil { c.workqueue.Forget(obj) + c.controller.metricsCollector.IncrSyncOperation("secret", "success") return } log.Warnw("sync ApisixTls failed, will retry", @@ -222,6 +223,7 @@ func (c *secretController) handleSyncErr(obj interface{}, err error) { zap.Error(err), ) c.workqueue.AddRateLimited(obj) + c.controller.metricsCollector.IncrSyncOperation("secret", "failure") } func (c *secretController) onAdd(obj interface{}) { diff --git a/pkg/metrics/prometheus.go b/pkg/metrics/prometheus.go index 5f3fbfc5c74..933a7667565 100644 --- a/pkg/metrics/prometheus.go +++ b/pkg/metrics/prometheus.go @@ -15,6 +15,7 @@ package metrics import ( + "os" "strconv" "time" @@ -37,20 +38,33 @@ type Collector interface { RecordAPISIXLatency(time.Duration) // IncrAPISIXRequest increases the number of requests to apisix. IncrAPISIXRequest(string) + // IncrCheckClusterHealth increases the number of cluster health check operations with the controller + // name label. + IncrCheckClusterHealth(string) + // IncrSyncOperation increases the number of sync operations with the resource + // type and result labels. + IncrSyncOperation(string, string) } // collector contains necessary messages to collect Prometheus metrics. 
type collector struct { - isLeader prometheus.Gauge - apisixLatency prometheus.Summary - apisixRequests *prometheus.CounterVec - apisixCodes *prometheus.GaugeVec + isLeader prometheus.Gauge + apisixLatency prometheus.Summary + apisixRequests *prometheus.CounterVec + apisixCodes *prometheus.GaugeVec + checkClusterHealth *prometheus.CounterVec + syncOperation *prometheus.CounterVec } // NewPrometheusCollectors creates the Prometheus metrics collector. // It also registers all internal metric collector to prometheus, // so do not call this function duplicately. -func NewPrometheusCollector(podName, podNamespace string) Collector { +func NewPrometheusCollector() Collector { + podName := os.Getenv("POD_NAME") + podNamespace := os.Getenv("POD_NAMESPACE") + if podNamespace == "" { + podNamespace = "default" + } constLabels := prometheus.Labels{ "controller_pod": podName, "controller_namespace": podNamespace, @@ -91,6 +105,24 @@ func NewPrometheusCollector(podName, podNamespace string) Collector { }, []string{"resource"}, ), + checkClusterHealth: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: _namespace, + Name: "check_cluster_health_total", + Help: "Number of cluster health check operations", + ConstLabels: constLabels, + }, + []string{"name"}, + ), + syncOperation: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: _namespace, + Name: "sync_operation_total", + Help: "Number of sync operations", + ConstLabels: constLabels, + }, + []string{"resource", "result"}, + ), } // Since we use the DefaultRegisterer, in test cases, the metrics @@ -99,12 +131,16 @@ func NewPrometheusCollector(podName, podNamespace string) Collector { prometheus.Unregister(collector.apisixCodes) prometheus.Unregister(collector.apisixLatency) prometheus.Unregister(collector.apisixRequests) + prometheus.Unregister(collector.checkClusterHealth) + prometheus.Unregister(collector.syncOperation) prometheus.MustRegister( collector.isLeader, collector.apisixCodes, 
collector.apisixLatency, collector.apisixRequests, + collector.checkClusterHealth, + collector.syncOperation, ) return collector @@ -140,6 +176,20 @@ func (c *collector) IncrAPISIXRequest(resource string) { c.apisixRequests.WithLabelValues(resource).Inc() } +// IncrCheckClusterHealth increases the number of cluster health check operations. +func (c *collector) IncrCheckClusterHealth(name string) { + c.checkClusterHealth.WithLabelValues(name).Inc() +} + +// IncrSyncOperation increases the number of sync operations for a specific +// resource (e.g. Route, Upstream, etc.), labeled with the sync result. +func (c *collector) IncrSyncOperation(resource, result string) { + c.syncOperation.With(prometheus.Labels{ + "resource": resource, + "result": result, + }).Inc() +} + // Collect collects the prometheus.Collect. func (c *collector) Collect(ch chan<- prometheus.Metric) { c.isLeader.Collect(ch) @@ -147,6 +197,8 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { c.apisixRequests.Collect(ch) c.apisixLatency.Collect(ch) c.apisixCodes.Collect(ch) + c.checkClusterHealth.Collect(ch) + c.syncOperation.Collect(ch) } // Describe describes the prometheus.Describe. 
@@ -156,4 +208,6 @@ func (c *collector) Describe(ch chan<- *prometheus.Desc) { c.apisixRequests.Describe(ch) c.apisixLatency.Describe(ch) c.apisixCodes.Describe(ch) + c.checkClusterHealth.Describe(ch) + c.syncOperation.Describe(ch) } diff --git a/pkg/metrics/prometheus_test.go b/pkg/metrics/prometheus_test.go index b8d41922e6c..e62cdd9d956 100644 --- a/pkg/metrics/prometheus_test.go +++ b/pkg/metrics/prometheus_test.go @@ -30,11 +30,12 @@ func apisixBadStatusCodesTestHandler(t *testing.T, metrics []*io_prometheus_clie assert.Equal(t, metric.Type.String(), "GAUGE") m := metric.GetMetric() assert.Len(t, m, 2) + assert.Equal(t, *m[0].Gauge.Value, float64(1)) assert.Equal(t, *m[0].Label[0].Name, "controller_namespace") assert.Equal(t, *m[0].Label[0].Value, "default") assert.Equal(t, *m[0].Label[1].Name, "controller_pod") - assert.Equal(t, *m[0].Label[1].Value, "test") + assert.Equal(t, *m[0].Label[1].Value, "") assert.Equal(t, *m[0].Label[2].Name, "resource") assert.Equal(t, *m[0].Label[2].Value, "route") assert.Equal(t, *m[0].Label[3].Name, "status_code") @@ -44,7 +45,7 @@ func apisixBadStatusCodesTestHandler(t *testing.T, metrics []*io_prometheus_clie assert.Equal(t, *m[1].Label[0].Name, "controller_namespace") assert.Equal(t, *m[1].Label[0].Value, "default") assert.Equal(t, *m[1].Label[1].Name, "controller_pod") - assert.Equal(t, *m[1].Label[1].Value, "test") + assert.Equal(t, *m[1].Label[1].Value, "") assert.Equal(t, *m[1].Label[2].Name, "resource") assert.Equal(t, *m[1].Label[2].Value, "upstream") assert.Equal(t, *m[1].Label[3].Name, "status_code") @@ -64,7 +65,7 @@ func isLeaderTestHandler(t *testing.T, metrics []*io_prometheus_client.MetricFam assert.Equal(t, *m[0].Label[0].Name, "controller_namespace") assert.Equal(t, *m[0].Label[0].Value, "default") assert.Equal(t, *m[0].Label[1].Name, "controller_pod") - assert.Equal(t, *m[0].Label[1].Value, "test") + assert.Equal(t, *m[0].Label[1].Value, "") } } @@ -81,7 +82,7 @@ func apisixLatencyTestHandler(t *testing.T, 
metrics []*io_prometheus_client.Metr assert.Equal(t, *m[0].Label[0].Name, "controller_namespace") assert.Equal(t, *m[0].Label[0].Value, "default") assert.Equal(t, *m[0].Label[1].Name, "controller_pod") - assert.Equal(t, *m[0].Label[1].Value, "test") + assert.Equal(t, *m[0].Label[1].Value, "") } } @@ -97,7 +98,7 @@ func apisixRequestTestHandler(t *testing.T, metrics []*io_prometheus_client.Metr assert.Equal(t, *m[0].Label[0].Name, "controller_namespace") assert.Equal(t, *m[0].Label[0].Value, "default") assert.Equal(t, *m[0].Label[1].Name, "controller_pod") - assert.Equal(t, *m[0].Label[1].Value, "test") + assert.Equal(t, *m[0].Label[1].Value, "") assert.Equal(t, *m[0].Label[2].Name, "resource") assert.Equal(t, *m[0].Label[2].Value, "route") @@ -105,14 +106,62 @@ func apisixRequestTestHandler(t *testing.T, metrics []*io_prometheus_client.Metr assert.Equal(t, *m[1].Label[0].Name, "controller_namespace") assert.Equal(t, *m[1].Label[0].Value, "default") assert.Equal(t, *m[1].Label[1].Name, "controller_pod") - assert.Equal(t, *m[1].Label[1].Value, "test") + assert.Equal(t, *m[1].Label[1].Value, "") assert.Equal(t, *m[1].Label[2].Name, "resource") assert.Equal(t, *m[1].Label[2].Value, "upstream") } } +func checkClusterHealthTestHandler(t *testing.T, metrics []*io_prometheus_client.MetricFamily) func(t *testing.T) { + return func(t *testing.T) { + metric := findMetric("apisix_ingress_controller_check_cluster_health_total", metrics) + assert.NotNil(t, metric) + assert.Equal(t, metric.Type.String(), "COUNTER") + m := metric.GetMetric() + assert.Len(t, m, 1) + + assert.Equal(t, *m[0].Counter.Value, float64(1)) + assert.Equal(t, *m[0].Label[0].Name, "controller_namespace") + assert.Equal(t, *m[0].Label[0].Value, "default") + assert.Equal(t, *m[0].Label[1].Name, "controller_pod") + assert.Equal(t, *m[0].Label[1].Value, "") + assert.Equal(t, *m[0].Label[2].Name, "name") + assert.Equal(t, *m[0].Label[2].Value, "test") + } +} + +func syncOperationTestHandler(t *testing.T, metrics 
[]*io_prometheus_client.MetricFamily) func(t *testing.T) { + return func(t *testing.T) { + metric := findMetric("apisix_ingress_controller_sync_operation_total", metrics) + assert.NotNil(t, metric) + assert.Equal(t, metric.Type.String(), "COUNTER") + m := metric.GetMetric() + assert.Len(t, m, 2) + + assert.Equal(t, *m[0].Counter.Value, float64(1)) + assert.Equal(t, *m[0].Label[0].Name, "controller_namespace") + assert.Equal(t, *m[0].Label[0].Value, "default") + assert.Equal(t, *m[0].Label[1].Name, "controller_pod") + assert.Equal(t, *m[0].Label[1].Value, "") + assert.Equal(t, *m[0].Label[2].Name, "resource") + assert.Equal(t, *m[0].Label[2].Value, "endpoint") + assert.Equal(t, *m[0].Label[3].Name, "result") + assert.Equal(t, *m[0].Label[3].Value, "success") + + assert.Equal(t, *m[1].Counter.Value, float64(1)) + assert.Equal(t, *m[1].Label[0].Name, "controller_namespace") + assert.Equal(t, *m[1].Label[0].Value, "default") + assert.Equal(t, *m[1].Label[1].Name, "controller_pod") + assert.Equal(t, *m[1].Label[1].Value, "") + assert.Equal(t, *m[1].Label[2].Name, "resource") + assert.Equal(t, *m[1].Label[2].Value, "schema") + assert.Equal(t, *m[1].Label[3].Name, "result") + assert.Equal(t, *m[1].Label[3].Value, "failure") + } +} + func TestPrometheusCollector(t *testing.T) { - c := NewPrometheusCollector("test", "default") + c := NewPrometheusCollector() c.ResetLeader(true) c.RecordAPISIXCode(404, "route") c.RecordAPISIXCode(500, "upstream") @@ -120,6 +169,9 @@ func TestPrometheusCollector(t *testing.T) { c.IncrAPISIXRequest("route") c.IncrAPISIXRequest("route") c.IncrAPISIXRequest("upstream") + c.IncrCheckClusterHealth("test") + c.IncrSyncOperation("schema", "failure") + c.IncrSyncOperation("endpoint", "success") metrics, err := prometheus.DefaultGatherer.Gather() assert.Nil(t, err) @@ -128,6 +180,8 @@ func TestPrometheusCollector(t *testing.T) { t.Run("is_leader", isLeaderTestHandler(t, metrics)) t.Run("apisix_request_latencies", apisixLatencyTestHandler(t, metrics)) 
t.Run("apisix_requests", apisixRequestTestHandler(t, metrics)) + t.Run("check_cluster_health_total", checkClusterHealthTestHandler(t, metrics)) + t.Run("sync_operation_total", syncOperationTestHandler(t, metrics)) } func findMetric(name string, metrics []*io_prometheus_client.MetricFamily) *io_prometheus_client.MetricFamily {