feat: add metric 'check_cluster_health_total' and 'sync_operation_total' #627

Merged (1 commit) on Sep 9, 2021

43 changes: 24 additions & 19 deletions pkg/apisix/cluster.go
@@ -35,6 +35,7 @@ import (

"github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/metrics"
"github.com/apache/apisix-ingress-controller/pkg/types"
)

@@ -80,23 +81,24 @@ type ClusterOptions struct {
}

type cluster struct {
name string
baseURL string
baseURLHost string
adminKey string
cli *http.Client
cacheState int32
cache cache.Cache
cacheSynced chan struct{}
cacheSyncErr error
route Route
upstream Upstream
ssl SSL
streamRoute StreamRoute
globalRules GlobalRule
consumer Consumer
plugin Plugin
schema Schema
name string
baseURL string
baseURLHost string
adminKey string
cli *http.Client
cacheState int32
cache cache.Cache
cacheSynced chan struct{}
cacheSyncErr error
route Route
upstream Upstream
ssl SSL
streamRoute StreamRoute
globalRules GlobalRule
consumer Consumer
plugin Plugin
schema Schema
metricsCollector metrics.Collector
}

func newCluster(ctx context.Context, o *ClusterOptions) (Cluster, error) {
@@ -125,8 +127,9 @@ func newCluster(ctx context.Context, o *ClusterOptions) (Cluster, error) {
Timeout: o.Timeout,
Transport: _defaultTransport,
},
cacheState: _cacheSyncing, // default state
cacheSynced: make(chan struct{}),
cacheState: _cacheSyncing, // default state
cacheSynced: make(chan struct{}),
metricsCollector: metrics.NewPrometheusCollector(),
}
c.route = newRouteClient(c)
c.upstream = newUpstreamClient(c)
@@ -322,6 +325,7 @@ func (c *cluster) syncSchema(ctx context.Context, interval time.Duration) {
for {
if err := c.syncSchemaOnce(ctx); err != nil {
log.Warnf("failed to sync schema: %s", err)
c.metricsCollector.IncrSyncOperation("schema", "failure")
}

select {
@@ -379,6 +383,7 @@ func (c *cluster) syncSchemaOnce(ctx context.Context) error {
continue
}
}
c.metricsCollector.IncrSyncOperation("schema", "success")
return nil
}
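
With these hooks in place, every schema sync attempt now increments the new sync_operation_total counter with a "success" or "failure" result. Assuming the package's existing _namespace prefix (not shown in this diff) is apisix_ingress_controller, and with illustrative label values, the scraped series would look roughly like:

apisix_ingress_controller_sync_operation_total{controller_namespace="ingress-apisix",controller_pod="ingress-controller-0",resource="schema",result="success"} 42
apisix_ingress_controller_sync_operation_total{controller_namespace="ingress-apisix",controller_pod="ingress-controller-0",resource="schema",result="failure"} 1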

2 changes: 2 additions & 0 deletions pkg/ingress/apisix_consumer.go
@@ -131,9 +131,11 @@ func (c *apisixConsumerController) sync(ctx context.Context, ev *types.Event) er
)
c.controller.recorderEvent(ac, corev1.EventTypeWarning, _resourceSyncAborted, err)
c.controller.recordStatus(ac, _resourceSyncAborted, err, metav1.ConditionFalse)
c.controller.metricsCollector.IncrSyncOperation("consumer", "failure")
return err
}

c.controller.metricsCollector.IncrSyncOperation("consumer", "success")
c.controller.recorderEvent(ac, corev1.EventTypeNormal, _resourceSynced, nil)
return nil
}
3 changes: 2 additions & 1 deletion pkg/ingress/apisix_tls.go
@@ -146,7 +146,6 @@ func (c *apisixTlsController) sync(ctx context.Context, ev *types.Event) error {
c.controller.recordStatus(tls, _resourceSyncAborted, err, metav1.ConditionFalse)
return err
}

c.controller.recorderEvent(tls, corev1.EventTypeNormal, _resourceSynced, nil)
c.controller.recordStatus(tls, _resourceSynced, nil, metav1.ConditionTrue)
return err
@@ -173,13 +172,15 @@ func (c *apisixTlsController) syncSecretSSL(secretKey string, apisixTlsKey strin
func (c *apisixTlsController) handleSyncErr(obj interface{}, err error) {
if err == nil {
c.workqueue.Forget(obj)
c.controller.metricsCollector.IncrSyncOperation("ssl", "success")
return
}
log.Warnw("sync ApisixTls failed, will retry",
zap.Any("object", obj),
zap.Error(err),
)
c.workqueue.AddRateLimited(obj)
c.controller.metricsCollector.IncrSyncOperation("ssl", "failure")
}

func (c *apisixTlsController) onAdd(obj interface{}) {
3 changes: 2 additions & 1 deletion pkg/ingress/controller.go
@@ -166,7 +166,7 @@ func NewController(cfg *config.Config) (*Controller, error) {
cfg: cfg,
apiServer: apiSrv,
apisix: client,
metricsCollector: metrics.NewPrometheusCollector(podName, podNamespace),
metricsCollector: metrics.NewPrometheusCollector(),
kubeClient: kubeClient,
watchingNamespace: watchingNamespace,
secretSSLMap: new(sync.Map),
@@ -617,5 +617,6 @@ func (c *Controller) checkClusterHealth(ctx context.Context, cancelFunc context.
return
}
log.Debugf("success check health for default cluster")
c.metricsCollector.IncrCheckClusterHealth(c.name)
}
}
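
The counters wired in here are enough for basic monitoring once scraped: a check_cluster_health_total series that stops increasing suggests the health-check loop has stalled, and repeated sync failures can be alerted on with a PromQL expression along these lines (metric namespace assumed as above, threshold illustrative):

rate(apisix_ingress_controller_sync_operation_total{result="failure"}[5m]) > 0
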
2 changes: 2 additions & 0 deletions pkg/ingress/endpoint.go
@@ -90,12 +90,14 @@ func (c *endpointsController) sync(ctx context.Context, ev *types.Event) error {
func (c *endpointsController) handleSyncErr(obj interface{}, err error) {
if err == nil {
c.workqueue.Forget(obj)
c.controller.metricsCollector.IncrSyncOperation("endpoint", "success")
return
}
log.Warnw("sync endpoints failed, will retry",
zap.Any("object", obj),
)
c.workqueue.AddRateLimited(obj)
c.controller.metricsCollector.IncrSyncOperation("endpoint", "failure")
}

func (c *endpointsController) onAdd(obj interface{}) {
2 changes: 2 additions & 0 deletions pkg/ingress/endpointslice.go
@@ -109,12 +109,14 @@ func (c *endpointSliceController) sync(ctx context.Context, ev *types.Event) err
func (c *endpointSliceController) handleSyncErr(obj interface{}, err error) {
if err == nil {
c.workqueue.Forget(obj)
c.controller.metricsCollector.IncrSyncOperation("endpointSlices", "success")
return
}
log.Warnw("sync endpointSlice failed, will retry",
zap.Any("object", obj),
)
c.workqueue.AddRateLimited(obj)
c.controller.metricsCollector.IncrSyncOperation("endpointSlices", "failure")
}

func (c *endpointSliceController) onAdd(obj interface{}) {
2 changes: 2 additions & 0 deletions pkg/ingress/secret.go
@@ -215,13 +215,15 @@ func (c *secretController) sync(ctx context.Context, ev *types.Event) error {
func (c *secretController) handleSyncErr(obj interface{}, err error) {
if err == nil {
c.workqueue.Forget(obj)
c.controller.metricsCollector.IncrSyncOperation("secret", "success")
return
}
log.Warnw("sync ApisixTls failed, will retry",
zap.Any("object", obj),
zap.Error(err),
)
c.workqueue.AddRateLimited(obj)
c.controller.metricsCollector.IncrSyncOperation("secret", "failure")
}

func (c *secretController) onAdd(obj interface{}) {
65 changes: 60 additions & 5 deletions pkg/metrics/prometheus.go
@@ -15,6 +15,7 @@
package metrics

import (
"os"
"strconv"
"time"

@@ -37,20 +38,33 @@ type Collector interface {
RecordAPISIXLatency(time.Duration)
// IncrAPISIXRequest increases the number of requests to apisix.
IncrAPISIXRequest(string)
// IncrCheckClusterHealth increases the number of cluster health check operations
// with the cluster name label.
IncrCheckClusterHealth(string)
// IncrSyncOperation increases the number of sync operations with the resource
// type and result labels.
IncrSyncOperation(string, string)
}

// collector contains necessary messages to collect Prometheus metrics.
type collector struct {
isLeader prometheus.Gauge
apisixLatency prometheus.Summary
apisixRequests *prometheus.CounterVec
apisixCodes *prometheus.GaugeVec
isLeader prometheus.Gauge
apisixLatency prometheus.Summary
apisixRequests *prometheus.CounterVec
apisixCodes *prometheus.GaugeVec
checkClusterHealth *prometheus.CounterVec
syncOperation *prometheus.CounterVec
}

// NewPrometheusCollector creates the Prometheus metrics collector.
// It also registers all internal metric collectors with Prometheus,
// so do not call this function more than once.
func NewPrometheusCollector(podName, podNamespace string) Collector {
func NewPrometheusCollector() Collector {
podName := os.Getenv("POD_NAME")
podNamespace := os.Getenv("POD_NAMESPACE")
if podNamespace == "" {
podNamespace = "default"
}
constLabels := prometheus.Labels{
"controller_pod": podName,
"controller_namespace": podNamespace,
@@ -91,6 +105,24 @@ func NewPrometheusCollector(podName, podNamespace string) Collector {
},
[]string{"resource"},
),
checkClusterHealth: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: _namespace,
Name: "check_cluster_health_total",
Help: "Number of cluster health check operations",
ConstLabels: constLabels,
},
[]string{"name"},
),
syncOperation: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: _namespace,
Name: "sync_operation_total",
Help: "Number of sync operations",
ConstLabels: constLabels,
},
[]string{"resource", "result"},
),
}

// Since we use the DefaultRegisterer, in test cases, the metrics
@@ -99,12 +131,16 @@ func NewPrometheusCollector(podName, podNamespace string) Collector {
prometheus.Unregister(collector.apisixCodes)
prometheus.Unregister(collector.apisixLatency)
prometheus.Unregister(collector.apisixRequests)
prometheus.Unregister(collector.checkClusterHealth)
prometheus.Unregister(collector.syncOperation)

prometheus.MustRegister(
collector.isLeader,
collector.apisixCodes,
collector.apisixLatency,
collector.apisixRequests,
collector.checkClusterHealth,
collector.syncOperation,
)

return collector
@@ -140,13 +176,30 @@ func (c *collector) IncrAPISIXRequest(resource string) {
c.apisixRequests.WithLabelValues(resource).Inc()
}

// IncrCheckClusterHealth increases the number of cluster health check
// operations.
func (c *collector) IncrCheckClusterHealth(name string) {
c.checkClusterHealth.WithLabelValues(name).Inc()
}

// IncrSyncOperation increases the number of sync operations for a specific
// resource and result.
func (c *collector) IncrSyncOperation(resource, result string) {
c.syncOperation.With(prometheus.Labels{
"resource": resource,
"result": result,
}).Inc()
}

// Collect implements prometheus.Collector.
func (c *collector) Collect(ch chan<- prometheus.Metric) {
c.isLeader.Collect(ch)
c.apisixLatency.Collect(ch)
c.apisixRequests.Collect(ch)
c.apisixCodes.Collect(ch)
c.checkClusterHealth.Collect(ch)
c.syncOperation.Collect(ch)
}

// Describe implements prometheus.Collector.
@@ -156,4 +209,6 @@ func (c *collector) Describe(ch chan<- *prometheus.Desc) {
c.apisixRequests.Describe(ch)
c.apisixLatency.Describe(ch)
c.apisixCodes.Describe(ch)
c.checkClusterHealth.Describe(ch)
c.syncOperation.Describe(ch)
}
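
For reference, here is a minimal sketch of how a caller could exercise the reworked collector outside the controller, for example in a test or a local run. It uses only the API visible in this diff plus the stock promhttp handler; the POD_NAME/POD_NAMESPACE values and the listen address are illustrative, and in a real deployment the two environment variables would come from the Kubernetes downward API.

package main

import (
	"log"
	"net/http"
	"os"

	"github.com/apache/apisix-ingress-controller/pkg/metrics"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// NewPrometheusCollector no longer takes the pod name/namespace as
	// arguments; it reads them from the environment instead.
	os.Setenv("POD_NAME", "ingress-controller-0")
	os.Setenv("POD_NAMESPACE", "ingress-apisix")
	c := metrics.NewPrometheusCollector()

	// Record one health check of the default cluster and a couple of sync
	// operations, mirroring what the controllers in this PR do.
	c.IncrCheckClusterHealth("default")
	c.IncrSyncOperation("route", "success")
	c.IncrSyncOperation("ssl", "failure")

	// The counters are registered on the Prometheus default registry, so the
	// standard promhttp handler exposes them on /metrics.
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":8080", nil))
}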