Skip to content

Commit

Permalink
Reload Configuration only when a pod matching the label set in mounte…
Browse files Browse the repository at this point in the history
…d-file is created or deleted (#347)

* Reload configuration only if changed pod with mounted labels (#289)
* add test for util/Match function (#289)

Signed-off-by: huskykurt <rkmwiaim1@gmail.com>
  • Loading branch information
huskykurt authored Dec 15, 2022
1 parent f6ff431 commit b021e3a
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 142 deletions.
124 changes: 104 additions & 20 deletions config-reloader/datasource/kube_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package datasource
import (
"context"
"fmt"
"github.com/vmware/kube-fluentd-operator/config-reloader/fluentd"
"github.com/vmware/kube-fluentd-operator/config-reloader/util"
"os"
"sort"
"strings"
"time"

"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -25,14 +28,16 @@ import (
)

type kubeInformerConnection struct {
client kubernetes.Interface
hashes map[string]string
cfg *config.Config
kubeds kubedatasource.KubeDS
nslist listerv1.NamespaceLister
podlist listerv1.PodLister
cmlist listerv1.ConfigMapLister
fdlist kfoListersV1beta1.FluentdConfigLister
client kubernetes.Interface
confHashes map[string]string
mountedLabels map[string][]map[string]string
cfg *config.Config
kubeds kubedatasource.KubeDS
nslist listerv1.NamespaceLister
podlist listerv1.PodLister
cmlist listerv1.ConfigMapLister
fdlist kfoListersV1beta1.FluentdConfigLister
updateChan chan time.Time
}

// NewKubernetesInformerDatasource builds a new Datasource from the provided config.
Expand Down Expand Up @@ -101,16 +106,31 @@ func NewKubernetesInformerDatasource(ctx context.Context, cfg *config.Config, up
}
logrus.Infof("Synced local informer with upstream Kubernetes API")

return &kubeInformerConnection{
client: client,
hashes: make(map[string]string),
cfg: cfg,
kubeds: kubeds,
nslist: namespaceLister,
podlist: podLister,
cmlist: cmLister,
fdlist: fluentdconfigDSLister.Fdlist,
}, nil
kubeInfoCx := &kubeInformerConnection{
client: client,
confHashes: make(map[string]string),
mountedLabels: make(map[string][]map[string]string),
cfg: cfg,
kubeds: kubeds,
nslist: namespaceLister,
podlist: podLister,
cmlist: cmLister,
fdlist: fluentdconfigDSLister.Fdlist,
updateChan: updateChan,
}

factory.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
kubeInfoCx.handlePodChange(ctx, obj)
},
UpdateFunc: func(old, obj interface{}) {
},
DeleteFunc: func(obj interface{}) {
kubeInfoCx.handlePodChange(ctx, obj)
},
})

return kubeInfoCx, nil
}

// GetNamespaces queries the configured Kubernetes API to generate a list of NamespaceConfig objects.
Expand All @@ -136,6 +156,26 @@ func (d *kubeInformerConnection) GetNamespaces(ctx context.Context) ([]*Namespac
return nil, err
}

fragment, err := fluentd.ParseString(configdata)
if err != nil {
return nil, err
}

var mountedLabels []map[string]string
for _, frag := range fragment {
if frag.Name == "source" && frag.Type() == "mounted-file" {
paramLabels := frag.Param("labels")
paramLabels = util.TrimTrailingComment(paramLabels)
currLabels, err := util.ParseTagToLabels(fmt.Sprintf("$labels(%s)", paramLabels))
if err != nil {
return nil, err
}
mountedLabels = append(mountedLabels, currLabels)
}
}

d.updateMountedLabels(ns, mountedLabels)

// Create a compact representation of the pods running in the namespace
// under consideration
pods, err := d.podlist.Pods(ns).List(labels.NewSelector())
Expand All @@ -155,7 +195,7 @@ func (d *kubeInformerConnection) GetNamespaces(ctx context.Context) ([]*Namespac
nsconfigs = append(nsconfigs, &NamespaceConfig{
Name: ns,
FluentdConfig: configdata,
PreviousConfigHash: d.hashes[ns],
PreviousConfigHash: d.confHashes[ns],
Labels: nsobj.Labels,
MiniContainers: minis,
})
Expand All @@ -166,7 +206,11 @@ func (d *kubeInformerConnection) GetNamespaces(ctx context.Context) ([]*Namespac

// WriteCurrentConfigHash is a setter for the hashtable maintained by this Datasource
func (d *kubeInformerConnection) WriteCurrentConfigHash(namespace string, hash string) {
d.hashes[namespace] = hash
d.confHashes[namespace] = hash
}

func (d *kubeInformerConnection) updateMountedLabels(namespace string, labels []map[string]string) {
d.mountedLabels[namespace] = labels
}

// UpdateStatus updates a namespace's status annotation with the latest result
Expand Down Expand Up @@ -239,6 +283,13 @@ func (d *kubeInformerConnection) discoverNamespaces(ctx context.Context) ([]stri
for _, cfmap := range confMapsList {
if cfmap.ObjectMeta.Name == d.cfg.DefaultConfigmapName {
namespaces = append(namespaces, cfmap.ObjectMeta.Namespace)
} else {
// We need to find configmaps that honor the global annotation for configmaps:
configMapNamespace, _ := d.nslist.Get(cfmap.ObjectMeta.Namespace)
configMapName := configMapNamespace.Annotations[d.cfg.AnnotConfigmapName]
if configMapName != "" {
namespaces = append(namespaces, cfmap.ObjectMeta.Namespace)
}
}
}
if d.cfg.CRDMigrationMode {
Expand Down Expand Up @@ -275,6 +326,39 @@ func (d *kubeInformerConnection) discoverNamespaces(ctx context.Context) ([]stri
return nsList, nil
}

func (d *kubeInformerConnection) handlePodChange(ctx context.Context, obj interface{}) {
mObj := obj.(*core.Pod)
logrus.Infof("Detected pod change %s in namespace: %s", mObj.GetName(), mObj.GetNamespace())
configdata, err := d.kubeds.GetFluentdConfig(ctx, mObj.GetNamespace())
nsConfigStr := fmt.Sprintf("%#v", configdata)

if err == nil {
if strings.Contains(nsConfigStr, "mounted-file") {
podLabels := mObj.GetLabels()
mountedLabel := d.mountedLabels[mObj.GetNamespace()]
for _, container := range mObj.Spec.Containers {
if matchAny(podLabels, mountedLabel, container.Name) {
logrus.Infof("Detected mounted-file pod change %s in namespace: %s", mObj.GetName(), mObj.GetNamespace())
select {
case d.updateChan <- time.Now():
default:
}
}
}
}
}
}

func matchAny(contLabels map[string]string, mountedLabelsInNs []map[string]string, name string) bool {
for _, mountedLabels := range mountedLabelsInNs {
if util.Match(mountedLabels, contLabels, name) {
return true
}
}

return false
}

func (d *kubeInformerConnection) discoverFluentdConfigNamespaces() ([]string, error) {
if d.fdlist == nil {
return nil, fmt.Errorf("Failed to initialize the fluentdconfig crd client, d.fclient = nil")
Expand Down
69 changes: 7 additions & 62 deletions config-reloader/processors/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package processors

import (
"bytes"
"errors"
"fmt"
"reflect"
"regexp"
Expand All @@ -16,11 +15,6 @@ import (
"github.com/vmware/kube-fluentd-operator/config-reloader/util"
)

const (
macroLabels = "$labels"
containerLabel = "_container"
)

type expandLabelsMacroState struct {
BaseProcessorState
}
Expand All @@ -33,9 +27,6 @@ var reSafe = regexp.MustCompile(`[.-]|^$`)
// an alphanumeric character (e.g. 'MyValue', or 'my_value', or '12345', regex used for validation is
// '(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?'

var reValidLabelName = regexp.MustCompile(`^([A-Za-z0-9][-A-Za-z0-9\/_.]*)?[A-Za-z0-9]$`)
var reValidLabelValue = regexp.MustCompile(`^(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?$`)

var fns = template.FuncMap{
"last": func(x int, a interface{}) bool {
return x == reflect.ValueOf(a).Len()-1
Expand Down Expand Up @@ -67,56 +58,10 @@ var retagTemplate = template.Must(template.New("retagTemplate").Funcs(fns).Parse
</filter>
`))

func parseTagToLabels(tag string) (map[string]string, error) {
if !strings.HasPrefix(tag, macroLabels+"(") &&
!strings.HasSuffix(tag, ")") {
return nil, fmt.Errorf("bad $labels macro use: %s", tag)
}

labelsOnly := tag[len(macroLabels)+1 : len(tag)-1]

result := map[string]string{}

records := strings.Split(labelsOnly, ",")
for _, rec := range records {
if rec == "" {
// be generous
continue
}
kv := strings.Split(rec, "=")
if len(kv) != 2 {
return nil, fmt.Errorf("bad label definition: %s", kv)
}

k := util.Trim(kv[0])
if k != containerLabel {
if !reValidLabelName.MatchString(k) {
return nil, fmt.Errorf("bad label name: %s", k)
}
}

v := util.Trim(kv[1])
if !reValidLabelValue.MatchString(v) {
return nil, fmt.Errorf("bad label value: %s", v)
}
if k == containerLabel && v == "" {
return nil, fmt.Errorf("value for %s cannot be empty string", containerLabel)
}

result[k] = v
}

if len(result) == 0 {
return nil, errors.New("at least one label must be given")
}

return result, nil
}

func makeTagFromFilter(ns string, sortedLabelNames []string, labelNames map[string]string) string {
buf := &bytes.Buffer{}

if cont, ok := labelNames[containerLabel]; ok {
if cont, ok := labelNames[util.ContainerLabel]; ok {
// if the special label _container is used then its name goes to the
// part of the tag that denotes the container
buf.WriteString(fmt.Sprintf("kube.%s.*.%s._labels.", ns, cont))
Expand All @@ -125,7 +70,7 @@ func makeTagFromFilter(ns string, sortedLabelNames []string, labelNames map[stri
}

for i, lb := range sortedLabelNames {
if lb == containerLabel {
if lb == util.ContainerLabel {
continue
}

Expand Down Expand Up @@ -157,11 +102,11 @@ func (p *expandLabelsMacroState) Process(input fluentd.Fragment) (fluentd.Fragme
return nil
}

if !strings.HasPrefix(d.Tag, macroLabels) {
if !strings.HasPrefix(d.Tag, util.MacroLabels) {
return nil
}

labelNames, err := parseTagToLabels(d.Tag)
labelNames, err := util.ParseTagToLabels(d.Tag)
if err != nil {
return err
}
Expand All @@ -180,19 +125,19 @@ func (p *expandLabelsMacroState) Process(input fluentd.Fragment) (fluentd.Fragme
return input, nil
}

delete(allReferencedLabels, containerLabel)
delete(allReferencedLabels, util.ContainerLabel)
sortedLabelNames := util.SortedKeys(allReferencedLabels)

replaceLabels := func(d *fluentd.Directive, ctx *ProcessorContext) error {
if d.Name != "filter" && d.Name != "match" {
return nil
}

if !strings.HasPrefix(d.Tag, macroLabels) {
if !strings.HasPrefix(d.Tag, util.MacroLabels) {
return nil
}

labelNames, err := parseTagToLabels(d.Tag)
labelNames, err := util.ParseTagToLabels(d.Tag)
if err != nil {
// should never happen as the error should be caught beforehand
return nil
Expand Down
42 changes: 0 additions & 42 deletions config-reloader/processors/labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,6 @@ import (
"github.com/stretchr/testify/assert"
)

func TestLabelsParseOk(t *testing.T) {
inputs := map[string]map[string]string{
"$labels(a=b,,,)": {"a": "b"},
"$labels(a=1, b=2)": {"a": "1", "b": "2"},
"$labels(x=y,b=1)": {"b": "1", "x": "y"},
"$labels(x=1, b = 1)": {"b": "1", "x": "1"},
"$labels(x=1, a=)": {"a": "", "x": "1"},
"$labels(hello/world=ok, a=value)": {"hello/world": "ok", "a": "value"},
"$labels(x=1, _container=main)": {"_container": "main", "x": "1"},
}

for tag, result := range inputs {
processed, err := parseTagToLabels(tag)
assert.Nil(t, err, "Got an error instead: %+v", err)
assert.Equal(t, result, processed)
}
}

func TestSafeLabel(t *testing.T) {
// empty string is a valid label value
assert.Equal(t, "_", safeLabelValue(""))
Expand All @@ -41,30 +23,6 @@ func TestSafeLabel(t *testing.T) {
assert.Equal(t, "app_kubernetes_io/name=nginx_ingress", safeLabelValue("app.kubernetes.io/name=nginx-ingress"))
}

func TestLabelsParseNotOk(t *testing.T) {
inputs := []string{
"$labels",
"$labels()",
"$labels(=)",
"$labels(=f)",
"$labels(.=*)",
"$labels(a=.)",
"$labels(a==1)",
"$labels(-a=sfd)",
"$labels(a=-sfd)",
"$labels(a*=hello)",
"$labels(a=*)",
"$labels(a=1, =2)",
"$labels(_container=)", // empty container name
"$labels(app.kubernetes.io/name=*)",
}

for _, tag := range inputs {
res, err := parseTagToLabels(tag)
assert.NotNil(t, err, "Got this instead for %s: %+v", tag, res)
}
}

func TestLabelNoLabels(t *testing.T) {
s := `
<filter **>
Expand Down
Loading

0 comments on commit b021e3a

Please sign in to comment.