Skip to content

Commit

Permalink
interference manager: add metrics provider and implement prometheus p…
Browse files Browse the repository at this point in the history
…rovider (#6)

Signed-off-by: songtao98 <songtao2603060@gmail.com>
  • Loading branch information
songtao98 authored Mar 8, 2023
1 parent 25b35d3 commit 89c8012
Show file tree
Hide file tree
Showing 7 changed files with 363 additions and 1 deletion.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/k8stopologyawareschedwg/noderesourcetopology-api v0.1.1
github.com/koordinator-sh/koordinator v1.1.1-0.20230301120008-b66fbe0f57f0
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/common v0.37.0
go.uber.org/atomic v1.10.0
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
k8s.io/api v0.26.0
Expand Down Expand Up @@ -105,7 +106,6 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/seccomp/libseccomp-golang v0.9.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22
github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9qUBdQ=
github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
Expand Down Expand Up @@ -732,6 +733,7 @@ github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8m
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mvdan/xurls v1.1.0/go.mod h1:tQlNn3BED8bE/15hnSL2HLkDeLWpNPAwtw7wkEq44oU=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/ncw/swift v1.0.47/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM=
Expand Down
82 changes: 82 additions & 0 deletions pkg/interferencemanager/metric-provider/common/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package common

import (
prommodel "github.com/prometheus/common/model"
)

type MetricQueryOptions struct {
MetricName string
FilterLabels map[string]string

PromSumByLabels []string
}

type Metric struct {
Labels map[string]string
Value float64
}

type ProviderType string

const (
PrometheusProvider ProviderType = "prometheus_provider"
)

const (
ContainerID string = "container_id"
ContainerName string = "container_name"
Node string = "node"
PodName string = "pod_name"
PodNamespace string = "pod_namespace"
PodUID string = "pod_uid"

CPIField string = "cpi_field"
)

const (
KoordletContainerCPI string = "koordlet_container_cpi"
KoordletPodCPI string = "koordlet_pod_cpi"

Cycles string = "cycles"
Instructions string = "instructions"
)

type MakeLabelsFunc func(metric prommodel.Metric) (map[string]string, error)

func MakeContainerCPILabels(metric prommodel.Metric) (map[string]string, error) {
labels := map[string]string{
ContainerID: string(metric["container_id"]),
ContainerName: string(metric["container_name"]),
PodUID: string(metric["pod_uid"]),
PodNamespace: string(metric["pod_namespace"]),
PodName: string(metric["pod_name"]),
Node: string(metric["node"]),
}
return labels, nil
}

func MakePodCPILabels(metric prommodel.Metric) (map[string]string, error) {
labels := map[string]string{
PodUID: string(metric["pod_uid"]),
PodNamespace: string(metric["pod_namespace"]),
PodName: string(metric["pod_name"]),
Node: string(metric["node"]),
}
return labels, nil
}
34 changes: 34 additions & 0 deletions pkg/interferencemanager/metric-provider/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package config

import (
"time"

mp "github.com/koordinator-sh/koordetector/pkg/interferencemanager/metric-provider/common"
)

type MetricProviderConfig struct {
ProviderType mp.ProviderType
PromConf PrometheusProviderConfig
}

type PrometheusProviderConfig struct {
// todo: prom-server service cluster IP, port default 9090, use flag
Address string
QueryTimeout time.Duration
}
41 changes: 41 additions & 0 deletions pkg/interferencemanager/metric-provider/metric_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metric_provider

import (
"fmt"

"github.com/koordinator-sh/koordetector/pkg/interferencemanager/metric-provider/common"
"github.com/koordinator-sh/koordetector/pkg/interferencemanager/metric-provider/config"
"github.com/koordinator-sh/koordetector/pkg/interferencemanager/metric-provider/prometheus"
)

type MetricProvider interface {
GetCPI(options common.MetricQueryOptions, labelFunc common.MakeLabelsFunc) ([]*common.Metric, error)
}

func NewMetricsProvider(config config.MetricProviderConfig) (MetricProvider, error) {
switch config.ProviderType {
case common.PrometheusProvider:
provider, err := prometheus.NewPrometheusProvider(config.PromConf)
if err != nil {
return nil, err
}
return provider, nil
}
return nil, fmt.Errorf("metric provider does not support type: %v", config.ProviderType)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package prometheus

import (
"context"
"fmt"
"strings"
"time"

promapi "github.com/prometheus/client_golang/api"
prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1"
prommodel "github.com/prometheus/common/model"

"github.com/koordinator-sh/koordetector/pkg/interferencemanager/metric-provider/common"
"github.com/koordinator-sh/koordetector/pkg/interferencemanager/metric-provider/config"
)

const (
SumBy string = "sum by"
)

type prometheusProvider struct {
prometheusClient prometheusv1.API
config config.PrometheusProviderConfig
queryTimeout time.Duration
}

// NewPrometheusProvider contructs a metric provider that gets data from Prometheus.
func NewPrometheusProvider(config config.PrometheusProviderConfig) (*prometheusProvider, error) {
promClient, err := promapi.NewClient(promapi.Config{
Address: config.Address,
})
if err != nil {
return &prometheusProvider{}, err
}
return &prometheusProvider{
prometheusClient: prometheusv1.NewAPI(promClient),
config: config,
queryTimeout: config.QueryTimeout,
}, nil
}

func (p *prometheusProvider) GetCPI(options common.MetricQueryOptions, labelFunc common.MakeLabelsFunc) ([]*common.Metric, error) {
query, err := MakeQueryCPIString(options)
if err != nil {
return nil, err
}
var result []*common.Metric
promResult, err := p.query(query)
if err != nil {
return nil, err
}
for _, metric := range promResult {
labels, err := labelFunc(metric.Metric)
if err != nil {
return nil, err
}
result = append(result, &common.Metric{
Labels: labels,
Value: float64(metric.Value),
})
}
return result, nil
}

// query delicate prometheus query api.
func (p *prometheusProvider) query(query string) (prommodel.Vector, error) {
ctx, cancel := context.WithTimeout(context.Background(), p.queryTimeout)
defer cancel()

result, _, err := p.prometheusClient.Query(ctx, query, time.Now())
if err != nil {
return nil, fmt.Errorf("cannot get metrics for query %v: %v", query, err)
}

vector, ok := result.(prommodel.Vector)
if !ok {
return nil, fmt.Errorf("expected query to return a vector; got result type %T", result)
}

return vector, nil
}

func NewDefaultCPISumByLabels(metricName string) ([]string, error) {
switch metricName {
case common.KoordletContainerCPI:
return []string{
common.ContainerID,
common.ContainerName,
common.PodUID,
common.PodNamespace,
common.PodName,
common.Node,
}, nil
case common.KoordletPodCPI:
return []string{
common.PodUID,
common.PodNamespace,
common.PodName,
common.Node,
}, nil
}
return nil, fmt.Errorf("metric name %v not supported", metricName)
}

// MakeQueryCPIString constructs PromQL style query string based on @options.
//
// @return
// "sum by (container_id, container_name, pod_uid, pod_namespace, pod_name, node) " +
// "(koordlet_container_cpi{cpi_field=\"cycles\"}) / " +
// "sum by (container_id, container_name, pod_uid, pod_namespace, pod_name, node) " +
// "(koordlet_container_cpi{cpi_field=\"instructions\"})"
//
// where
// (container_id, container_name, pod_uid, pod_namespace, pod_name, node) is the PromSumByLabels slice
// {cpi_field=\"cycles\"} is the FilterLabels map
// koordlet_container_cpi is options.MetricName
func MakeQueryCPIString(options common.MetricQueryOptions) (string, error) {
if options.PromSumByLabels == nil {
labels, err := NewDefaultCPISumByLabels(options.MetricName)
if err != nil {
return "", err
}
options.PromSumByLabels = labels
}
if options.FilterLabels == nil {
options.FilterLabels = map[string]string{}
}
options.FilterLabels[common.CPIField] = common.Cycles
queryCycles := makeSumByString(options.PromSumByLabels) + makeMetricFilterString(common.KoordletContainerCPI, options.FilterLabels)
options.FilterLabels[common.CPIField] = common.Instructions
queryInstructions := makeSumByString(options.PromSumByLabels) + makeMetricFilterString(common.KoordletContainerCPI, options.FilterLabels)
return queryCycles + "/" + queryInstructions, nil
}

func makeSumByString(labels []string) string {
labelsString := strings.Join(labels, ",")
return fmt.Sprintf("%v(%v)", SumBy, labelsString)
}

func makeMetricFilterString(metricName string, filters map[string]string) string {
var filterSlice []string
for label, value := range filters {
filterSlice = append(filterSlice, fmt.Sprintf("%v=\"%v\"", label, value))
}
filterString := strings.Join(filterSlice, ",")
return fmt.Sprintf("(%v{%v})", metricName, filterString)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package prometheus

import (
"fmt"
"testing"

mp "github.com/koordinator-sh/koordetector/pkg/interferencemanager/metric-provider/common"
)

func TestMakeQueryCPIString(t *testing.T) {
sumLabels, err := NewDefaultCPISumByLabels(mp.KoordletContainerCPI)
if err != nil {
t.Errorf("NewPrometheusProvider() = %v", err)
}
opt := mp.MetricQueryOptions{
MetricName: mp.KoordletContainerCPI,
PromSumByLabels: sumLabels,
}
query, err := MakeQueryCPIString(opt)
if err != nil {
t.Errorf("Query() = %v", err)
}
fmt.Printf("result: %v", query)
}

0 comments on commit 89c8012

Please sign in to comment.