Skip to content
This repository was archived by the owner on Dec 7, 2022. It is now read-only.

compatible with slime metric reconfiguration #12

Merged
merged 3 commits into from
Nov 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 41 additions & 21 deletions controllers/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,17 @@ package controllers

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"slime.io/slime/framework/model"

"k8s.io/apimachinery/pkg/api/errors"

event_source "slime.io/slime/framework/model/source"
"slime.io/slime/framework/model/metric"
lazyloadv1alpha1 "slime.io/slime/modules/lazyload/api/v1alpha1"
"strings"
)

const (
Expand All @@ -27,25 +24,48 @@ const (
CreatedByFenceController = "fence-controller"
)

func (r *ServicefenceReconciler) WatchSource(stop <-chan struct{}) {
go func() {
for {
select {
case <-stop:
func (r *ServicefenceReconciler) WatchMetric() {
log := log.WithField("reporter", "ServicefenceReconciler").WithField("function", "WatchMetric")
log.Infof("start watching metric")

for {
select {
case metric, ok := <-r.watcherMetricChan:
if !ok {
log.Warningf("watcher mertic channel closed, break process loop")
return
}
r.ConsumeMetric(metric)
case metric, ok := <-r.tickerMetricChan:
if !ok {
log.Warningf("ticker metric channel closed, break process loop")
return
case e := <-r.eventChan:
switch e.EventType {
case event_source.Update, event_source.Add:
if _, err := r.Refresh(reconcile.Request{NamespacedName: e.Loc}, e.Info); err != nil {
fmt.Printf("error:%v", err)
}
}
}
r.ConsumeMetric(metric)
}
}()
}

}

func (r *ServicefenceReconciler) Refresh(req reconcile.Request, args map[string]string) (reconcile.Result, error) {
func (r *ServicefenceReconciler) ConsumeMetric(metric metric.Metric) {
for meta, results := range metric {
log.Debugf("got metric for %s", meta)
namespace, name := strings.Split(meta, "/")[0], strings.Split(meta, "/")[1]
nn := types.NamespacedName{Namespace: namespace, Name: name}
if len(results) != 1 {
log.Errorf("wrong metric results length for %s", meta)
continue
}
value := results[0].Value
if _, err := r.Refresh(reconcile.Request{NamespacedName: nn}, value); err != nil {
log.Errorf("refresh error:%v", err)
}
}
}

func (r *ServicefenceReconciler) Refresh(req reconcile.Request, value map[string]string) (reconcile.Result, error) {
log := log.WithField("reporter", "ServicefenceReconciler").WithField("function", "Refresh")

sf := &lazyloadv1alpha1.ServiceFence{}
err := r.Client.Get(context.TODO(), req.NamespacedName, sf)

Expand Down Expand Up @@ -75,7 +95,7 @@ func (r *ServicefenceReconciler) Refresh(req reconcile.Request, args map[string]
}
}

sf.Status.MetricStatus = args
sf.Status.MetricStatus = value
err = r.Client.Status().Update(context.TODO(), sf)
if err != nil {
log.Errorf("can not update ServiceFence %s, %+v", req.NamespacedName.Name, err)
Expand Down
206 changes: 175 additions & 31 deletions controllers/servicefence_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,20 @@ package controllers
import (
"context"
"fmt"
prometheusApi "github.com/prometheus/client_golang/api"
prometheusV1 "github.com/prometheus/client_golang/api/prometheus/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"reflect"
"slime.io/slime/framework/apis/config/v1alpha1"
"slime.io/slime/framework/model/metric"
"slime.io/slime/framework/model/trigger"
"sort"
"strings"
"sync"
"time"

"slime.io/slime/framework/model"

"slime.io/slime/framework/apis/config/v1alpha1"

istio "istio.io/api/networking/v1alpha3"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -43,11 +48,9 @@ import (
"slime.io/slime/framework/apis/networking/v1alpha3"
"slime.io/slime/framework/bootstrap"
"slime.io/slime/framework/controllers"
event_source "slime.io/slime/framework/model/source"
"slime.io/slime/framework/model/source/aggregate"
"slime.io/slime/framework/model/source/k8s"
"slime.io/slime/framework/util"

stderrors "errors"
lazyloadv1alpha1 "slime.io/slime/modules/lazyload/api/v1alpha1"
modmodel "slime.io/slime/modules/lazyload/model"
)
Expand All @@ -57,41 +60,51 @@ type ServicefenceReconciler struct {
client.Client
Scheme *runtime.Scheme
cfg *v1alpha1.Fence
env *bootstrap.Environment
eventChan chan event_source.Event
source event_source.Source
reconcileLock sync.Mutex
env bootstrap.Environment
interestMeta map[string]bool
interestMetaCopy map[string]bool // for outside read
watcherMetricChan <-chan metric.Metric
tickerMetricChan <-chan metric.Metric
reconcileLock sync.RWMutex
staleNamespaces map[string]bool
enabledNamespaces map[string]bool
}

// NewReconciler returns a new reconcile.Reconciler
func NewReconciler(cfg *v1alpha1.Fence, mgr manager.Manager, env *bootstrap.Environment) *ServicefenceReconciler {
func NewReconciler(cfg *v1alpha1.Fence, mgr manager.Manager, env bootstrap.Environment) *ServicefenceReconciler {
log := modmodel.ModuleLog.WithField(model.LogFieldKeyFunction, "NewReconciler")

// generate producer config
pc, err := newProducerConfig(env)
if err != nil {
log.Errorf("%v", err)
return nil
}

r := &ServicefenceReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
cfg: cfg,
env: env,
interestMeta: map[string]bool{},
interestMetaCopy: map[string]bool{},
watcherMetricChan: pc.WatcherProducerConfig.MetricChan,
tickerMetricChan: pc.TickerProducerConfig.MetricChan,
staleNamespaces: map[string]bool{},
enabledNamespaces: map[string]bool{},
}

if env.Config.Metric != nil {
eventChan := make(chan event_source.Event)
src := &aggregate.Source{}
if ms, err := k8s.NewMetricSource(eventChan, env); err != nil {
log.Errorf("failed to create slime-metric, %+v", err)
} else {
src.Sources = append(src.Sources, ms)
r.eventChan = eventChan
r.source = src
// reconciler defines producer metric handler
pc.WatcherProducerConfig.NeedUpdateMetricHandler = r.handleWatcherEvent
pc.TickerProducerConfig.NeedUpdateMetricHandler = r.handleTickerEvent

r.source.Start(env.Stop)
r.WatchSource(env.Stop)
}
// start producer
metric.NewProducer(pc)
log.Infof("producers starts")

if env.Config.Metric != nil {
go r.WatchMetric()
}

return r
}

Expand All @@ -114,8 +127,10 @@ func (r *ServicefenceReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error
if errors.IsNotFound(err) {
// TODO should be recovered? maybe we should call refreshFenceStatusOfService here
log.Info("serviceFence is deleted")
r.source.WatchRemove(req.NamespacedName)
return reconcile.Result{}, nil
//r.interestMeta.Pop(req.NamespacedName.String())
delete(r.interestMeta, req.NamespacedName.String())
r.updateInterestMetaCopy()
return r.refreshFenceStatusOfService(context.TODO(), nil, req.NamespacedName)
} else {
log.Errorf("get serviceFence error,%+v", err)
return reconcile.Result{}, err
Expand All @@ -127,22 +142,148 @@ func (r *ServicefenceReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error
req.NamespacedName, rev, r.env.IstioRev())
return reconcile.Result{}, nil
}
log.Infof("get serviceFence, %+v", req.NamespacedName)
log.Infof("ServicefenceReconciler got serviceFence request, %+v", req.NamespacedName)

// 资源更新
diff := r.updateVisitedHostStatus(instance)
r.recordVisitor(instance, diff)
if instance.Spec.Enable {
if r.source != nil {
r.source.WatchAdd(types.NamespacedName{Name: instance.Name, Namespace: instance.Namespace})
}
r.interestMeta[req.NamespacedName.String()] = true
r.updateInterestMetaCopy()
err = r.refreshSidecar(instance)
}

return ctrl.Result{}, err
}

func (r *ServicefenceReconciler) updateInterestMetaCopy() {
newInterestMeta := make(map[string]bool)
for k, v := range r.interestMeta {
newInterestMeta[k] = v
}
r.interestMetaCopy = newInterestMeta
}

func (r *ServicefenceReconciler) getInterestMeta() map[string]bool {
r.reconcileLock.RLock()
defer r.reconcileLock.RUnlock()
return r.interestMetaCopy
}

// call back function for watcher producer
func (r *ServicefenceReconciler) handleWatcherEvent(event trigger.WatcherEvent) metric.QueryMap {

// check event
gvks := []schema.GroupVersionKind{
{Group: "networking.istio.io", Version: "v1beta1", Kind: "Sidecar"},
}
invalidEvent := false
for _, gvk := range gvks {
if event.GVK == gvk && r.getInterestMeta()[event.NN.String()] {
invalidEvent = true
}
}
if !invalidEvent {
return nil
}

// generate query map for producer
qm := make(map[string][]metric.Handler)
var hs []metric.Handler
for pName, pHandler := range r.env.Config.Metric.Prometheus.Handlers {
hs = append(hs, generateHandler(event.NN.Name, event.NN.Namespace, pName, pHandler))
}
qm[event.NN.String()] = hs
return qm
}

// call back function for ticker producer
func (r *ServicefenceReconciler) handleTickerEvent(event trigger.TickerEvent) metric.QueryMap {

// no need to check time duration

// generate query map for producer
qm := make(map[string][]metric.Handler)
for meta := range r.getInterestMeta() {
namespace, name := strings.Split(meta, "/")[0], strings.Split(meta, "/")[1]
var hs []metric.Handler
for pName, pHandler := range r.env.Config.Metric.Prometheus.Handlers {
hs = append(hs, generateHandler(name, namespace, pName, pHandler))
}
qm[meta] = hs
}

return qm
}

func generateHandler(name, namespace, pName string, pHandler *v1alpha1.Prometheus_Source_Handler) metric.Handler {
query := strings.ReplaceAll(pHandler.Query, "$namespace", namespace)
query = strings.ReplaceAll(query, "$source_app", name)
return metric.Handler{Name: pName, Query: query}
}

func newProducerConfig(env bootstrap.Environment) (*metric.ProducerConfig, error) {

prometheusSourceConfig, err := newPrometheusSourceConfig(env)
if err != nil {
return nil, err
}

return &metric.ProducerConfig{
EnableWatcherProducer: true,
WatcherProducerConfig: metric.WatcherProducerConfig{
Name: "lazyload-watcher",
MetricChan: make(chan metric.Metric),
WatcherTriggerConfig: trigger.WatcherTriggerConfig{
Kinds: []schema.GroupVersionKind{
{
Group: "networking.istio.io",
Version: "v1beta1",
Kind: "Sidecar",
},
},
EventChan: make(chan trigger.WatcherEvent),
DynamicClient: env.DynamicClient,
},
PrometheusSourceConfig: prometheusSourceConfig,
},
EnableTickerProducer: true,
TickerProducerConfig: metric.TickerProducerConfig{
Name: "lazyload-ticker",
MetricChan: make(chan metric.Metric),
TickerTriggerConfig: trigger.TickerTriggerConfig{
Durations: []time.Duration{
30 * time.Second,
},
EventChan: make(chan trigger.TickerEvent),
},
PrometheusSourceConfig: prometheusSourceConfig,
},
StopChan: env.Stop,
}, nil

}

func newPrometheusSourceConfig(env bootstrap.Environment) (metric.PrometheusSourceConfig, error) {
ps := env.Config.Metric.Prometheus
if ps == nil {
return metric.PrometheusSourceConfig{}, stderrors.New("failure create prometheus client, empty prometheus config")
}
promClient, err := prometheusApi.NewClient(prometheusApi.Config{
Address: ps.Address,
RoundTripper: nil,
})
if err != nil {
return metric.PrometheusSourceConfig{}, err
}

return metric.PrometheusSourceConfig{
Api: prometheusV1.NewAPI(promClient),
}, nil
}

func (r *ServicefenceReconciler) refreshSidecar(instance *lazyloadv1alpha1.ServiceFence) error {
log := log.WithField("reporter", "ServicefenceReconciler").WithField("function", "refreshSidecar")
sidecar, err := newSidecar(instance, r.env)
if err != nil {
log.Errorf("servicefence generate sidecar failed, %+v", err)
Expand Down Expand Up @@ -184,7 +325,7 @@ func (r *ServicefenceReconciler) refreshSidecar(instance *lazyloadv1alpha1.Servi
nsName, rev, sfRev)
} else {
if !reflect.DeepEqual(found.Spec, sidecar.Spec) {
log.Infof("Update a Sidecarin %s:%s", sidecar.Namespace, sidecar.Name)
log.Infof("Update a Sidecar in %s:%s", sidecar.Namespace, sidecar.Name)
sidecar.ResourceVersion = found.ResourceVersion
err = r.Client.Update(context.TODO(), sidecar)
if err != nil {
Expand Down Expand Up @@ -373,7 +514,7 @@ func (r *ServicefenceReconciler) updateVisitedHostStatus(host *lazyloadv1alpha1.
return delta
}

func newSidecar(vhost *lazyloadv1alpha1.ServiceFence, env *bootstrap.Environment) (*v1alpha3.Sidecar, error) {
func newSidecar(vhost *lazyloadv1alpha1.ServiceFence, env bootstrap.Environment) (*v1alpha3.Sidecar, error) {
host := make([]string, 0)
if !vhost.Spec.Enable {
return nil, nil
Expand All @@ -385,6 +526,9 @@ func newSidecar(vhost *lazyloadv1alpha1.ServiceFence, env *bootstrap.Environment
}
}
}
// sort host to avoid map range random feature resulting in sidecar constant updates
sort.Strings(host)

// 需要加入一条根namespace的策略
host = append(host, env.Config.Global.IstioNamespace+"/*")
host = append(host, env.Config.Global.SlimeNamespace+"/*")
Expand Down
Loading