Skip to content

Commit

Permalink
Ansible add metrics (operator-framework#5438)
Browse files Browse the repository at this point in the history
* Users can now create metrics through Ansible

Metrics are created with the osdk_metrics module in
operator-sdk-ansible-util which communicates with an ansible API
server.

Signed-off-by: austin <austin@redhat.com>

* bump collection

Signed-off-by: austin <austin@redhat.com>

* cleanup

Signed-off-by: austin <austin@redhat.com>

* Add copyright info

Signed-off-by: austin <austin@redhat.com>

* clean up log message levels

Signed-off-by: austin <austin@redhat.com>

* add metrics address option and fix errors

Signed-off-by: austin <austin@redhat.com>

* log cleanup

Signed-off-by: austin <austin@redhat.com>

* sanity checks

Signed-off-by: austin <austin@redhat.com>

* comments

Signed-off-by: austin <austin@redhat.com>

* bump retries to avoid flake

Signed-off-by: austin <austin@redhat.com>

* feedback from ryan

Signed-off-by: austin <austin@redhat.com>

* helper functions

Signed-off-by: austin <austin@redhat.com>

* handle error

Signed-off-by: austin <austin@redhat.com>

* refactor metric creation into helper

Signed-off-by: austin <austin@redhat.com>

* revert Run changes for now

Signed-off-by: austin <austin@redhat.com>

* put Run in goroutine, not goroutine in Run

Signed-off-by: austin <austin@redhat.com>

* s/server/err since it is error type

Signed-off-by: austin <austin@redhat.com>

* read and close request body, catch error on negative counter.Add value

Signed-off-by: austin <austin@redhat.com>

* explicitly ignore errors returned by copy

Signed-off-by: austin <austin@redhat.com>
  • Loading branch information
asmacdo authored Jan 26, 2022
1 parent 00cef3a commit 697a426
Show file tree
Hide file tree
Showing 6 changed files with 249 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,5 @@
name: "externalnamespace"
register: external_namespace
until: external_namespace.resources[0].metadata.labels["foo"] == "bar"
retries: 6

2 changes: 1 addition & 1 deletion hack/tests/e2e-ansible-molecule.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pip3 install cryptography==3.3.2 molecule==3.0.2
pip3 install ansible-lint yamllint
pip3 install docker==4.2.2 openshift==0.12.1 jmespath
ansible-galaxy collection install 'kubernetes.core:==2.2.0'
ansible-galaxy collection install 'operator_sdk.util:==0.4.0'

header_text "Copying molecule testdata scenarios"
ROOTDIR="$(pwd)"
Expand All @@ -48,7 +49,6 @@ else
fi
KUSTOMIZE_PATH=${KUSTOMIZE} TEST_OPERATOR_NAMESPACE=default molecule test -s kind


header_text "Running Default test with advanced-molecule-operator"
pushd $TMPDIR/advanced-molecule-operator

Expand Down
75 changes: 75 additions & 0 deletions internal/ansible/apiserver/apiserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2022 The Operator-SDK Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apiserver

import (
"encoding/json"
"fmt"
"io"
"net/http"

logf "sigs.k8s.io/controller-runtime/pkg/log"
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"

"github.com/operator-framework/operator-sdk/internal/ansible/metrics"
)

var log = logf.Log.WithName("apiserver")

type Options struct {
Address string
Port int
}

func Run(options Options) error {
mux := http.NewServeMux()
mux.HandleFunc("/metrics", metricsHandler)

server := http.Server{
Addr: fmt.Sprintf("%s:%d", options.Address, options.Port),
Handler: mux,
}
log.Info("Starting to serve metrics listener", "Address", server.Addr)
return server.ListenAndServe()
}

func metricsHandler(w http.ResponseWriter, r *http.Request) {
defer func() {
_, _ = io.Copy(io.Discard, r.Body)
r.Body.Close()
}()
log.V(3).Info(fmt.Sprintf("%s %s", r.Method, r.URL))

var userMetric metrics.UserMetric

switch r.Method {
case http.MethodPost:
log.V(3).Info("The apiserver has received a POST")
err := json.NewDecoder(r.Body).Decode(&userMetric)
if err != nil {
log.Info(err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
err = metrics.HandleUserMetric(crmetrics.Registry, userMetric)
if err != nil {
log.Info(err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
}
default:
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}

}
3 changes: 1 addition & 2 deletions internal/ansible/controller/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,8 @@ func (r *AnsibleOperatorReconciler) Reconcile(ctx context.Context, request recon
return reconcile.Result{}, err
}
}

if module, found := event.EventData["task_action"]; found {
if module == "operator_sdk.util.requeue_after" && event.Event != eventapi.EventRunnerOnFailed {
if module == "operator_sdk.util.requeue_after" || module == "requeue_after" && event.Event != eventapi.EventRunnerOnFailed {
if data, exists := event.EventData["res"]; exists {
if fields, check := data.(map[string]interface{}); check {
requeueDuration, err := time.ParseDuration(fields["period"].(string))
Expand Down
162 changes: 162 additions & 0 deletions internal/ansible/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package metrics

import (
"errors"
"fmt"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -60,6 +61,8 @@ var (
[]string{
"GVK",
})

userMetrics = map[string]prometheus.Collector{}
)

func init() {
Expand All @@ -82,6 +85,165 @@ func RegisterBuildInfo(r prometheus.Registerer) {
r.MustRegister(buildInfo)
}

type UserMetric struct {
Name string `json:"name" yaml:"name"`
Help string `json:"description" yaml:"description"`
Counter *UserMetricCounter `json:"counter,omitempty" yaml:"counter,omitempty"`
Gauge *UserMetricGauge `json:"gauge,omitempty" yaml:"gauge,omitempty"`
Histogram *UserMetricHistogram `json:"histogram,omitempty" yaml:"histogram,omitempty"`
Summary *UserMetricSummary `json:"summary,omitempty" yaml:"summary,omitempty"`
}

type UserMetricCounter struct {
Inc bool `json:"increment,omitempty" yaml:"increment,omitempty"`
Add float64 `json:"add,omitempty" yaml:"add,omitempty"`
}

type UserMetricGauge struct {
Set float64 `json:"set,omitempty" yaml:"set,omitempty"`
Inc bool `json:"increment,omitempty" yaml:"increment,omitempty"`
Dec bool `json:"decrement,omitempty" yaml:"decrement,omitempty"`
SetToCurrentTime bool `json:"set_to_current_time,omitempty" yaml:"set_to_current_time,omitempty"`
Add float64 `json:"add,omitempty" yaml:"add,omitempty"`
Sub float64 `json:"subtract,omitempty" yaml:"subtract,omitempty"`
}

type UserMetricHistogram struct {
Observe float64 `json:"observe,omitempty" yaml:"observe,omitempty"`
}

type UserMetricSummary struct {
Observe float64 `json:"observe,omitempty" yaml:"observe,omitempty"`
}

func validateMetricSpec(metricSpec UserMetric) error {
var metricConfigs int
if metricSpec.Counter != nil {
metricConfigs++
}
if metricSpec.Gauge != nil {
metricConfigs++
}
if metricSpec.Summary != nil {
metricConfigs++
}
if metricSpec.Histogram != nil {
metricConfigs++
}
if metricConfigs > 1 {
return errors.New("only one metric can be processed at a time")
} else if metricConfigs == 0 {
return errors.New("a request should contain at least one metric")
}
return nil
}

func handleCounter(metricSpec UserMetric, counter prometheus.Counter) error {
if metricSpec.Counter == nil {
return fmt.Errorf("cannot change metric type of %s, which is a counter", metricSpec.Name)
}
if metricSpec.Counter.Inc {
counter.Inc()
} else if metricSpec.Counter.Add != 0.0 {
if metricSpec.Counter.Add < 0 {
return errors.New("counter metrics cannot decrease in value")
}
counter.Add(metricSpec.Counter.Add)
}
return nil
}

func handleGauge(metricSpec UserMetric, gauge prometheus.Gauge) error {
if metricSpec.Gauge == nil {
return fmt.Errorf("cannot change metric type of %s, which is a gauge", metricSpec.Name)
}
if metricSpec.Gauge.Inc {
gauge.Inc()
} else if metricSpec.Gauge.Dec {
gauge.Dec()
} else if metricSpec.Gauge.Add != 0.0 {
gauge.Add(metricSpec.Gauge.Add)
} else if metricSpec.Gauge.Sub != 0.0 {
gauge.Sub(metricSpec.Gauge.Sub)
} else if metricSpec.Gauge.Set != 0.0 {
gauge.Set(metricSpec.Gauge.Set)
} else if metricSpec.Gauge.SetToCurrentTime {
gauge.SetToCurrentTime()
}
return nil
}

func handleSummaryOrHistogram(metricSpec UserMetric, summary prometheus.Summary) error {
if metricSpec.Histogram == nil && metricSpec.Summary == nil {
return fmt.Errorf("cannot change metric type of %s, which is a histogram or summary", metricSpec.Name)
}
if metricSpec.Histogram != nil {
summary.Observe(metricSpec.Histogram.Observe)
} else if metricSpec.Summary != nil {
summary.Observe(metricSpec.Summary.Observe)
}
return nil
}

func ensureMetric(r prometheus.Registerer, metricSpec UserMetric) {
if _, ok := userMetrics[metricSpec.Name]; !ok {
// This is the first time we've seen this metric
logf.Log.WithName("metrics").Info("Registering", "metric", metricSpec.Name)
if metricSpec.Counter != nil {
userMetrics[metricSpec.Name] = prometheus.NewCounter(prometheus.CounterOpts{
Name: metricSpec.Name,
Help: metricSpec.Help,
})
}
if metricSpec.Gauge != nil {
userMetrics[metricSpec.Name] = prometheus.NewGauge(prometheus.GaugeOpts{
Name: metricSpec.Name,
Help: metricSpec.Help,
})
}
if metricSpec.Histogram != nil {
userMetrics[metricSpec.Name] = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: metricSpec.Name,
Help: metricSpec.Help,
})
}
if metricSpec.Summary != nil {
userMetrics[metricSpec.Name] = prometheus.NewSummary(prometheus.SummaryOpts{
Name: metricSpec.Name,
Help: metricSpec.Help,
})
}
if err := r.Register(userMetrics[metricSpec.Name]); err != nil {
logf.Log.WithName("metrics").Info("Unable to register %s metric with prometheus.", metricSpec.Name)
}
}
}

func HandleUserMetric(r prometheus.Registerer, metricSpec UserMetric) error {
if err := validateMetricSpec(metricSpec); err != nil {
return err
}
ensureMetric(r, metricSpec)
collector := userMetrics[metricSpec.Name]
switch v := collector.(type) {
// Gauge must be first, because a Counter is a Gauge, but a Gauge is not a Counter.
case prometheus.Gauge:
if err := handleGauge(metricSpec, v); err != nil {
return err
}
case prometheus.Counter:
if err := handleCounter(metricSpec, v); err != nil {
return err
}
// Histogram and Summary interfaces are identical, so we accept either case.
case prometheus.Histogram:
if err := handleSummaryOrHistogram(metricSpec, v); err != nil {
return err
}
}
return nil
}

func ReconcileSucceeded(gvk string) {
defer recoverMetricPanic()
reconcileResults.WithLabelValues(gvk, "succeeded").Inc()
Expand Down
9 changes: 9 additions & 0 deletions internal/cmd/ansible-operator/run/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"

"github.com/operator-framework/operator-sdk/internal/ansible/apiserver"
"github.com/operator-framework/operator-sdk/internal/ansible/controller"
"github.com/operator-framework/operator-sdk/internal/ansible/events"
"github.com/operator-framework/operator-sdk/internal/ansible/flags"
Expand Down Expand Up @@ -262,6 +263,14 @@ func run(cmd *cobra.Command, f *flags.Flags) {
log.Error(err, "Error starting proxy.")
os.Exit(1)
}
// start the ansible-operator api server
go func() {
err = apiserver.Run(apiserver.Options{
Address: "localhost",
Port: 5050,
})
done <- err
}()

// start the operator
go func() {
Expand Down

0 comments on commit 697a426

Please sign in to comment.