Skip to content

Commit

Permalink
fix: react to pod events to regen preprocess conf
Browse files Browse the repository at this point in the history
 - add kube pod informer for pods add/rm in namespace we monitor, rerun preprocess configs
 - fixes vmware#289
 - fixes vmware#253
 - fixes issue with respecting `c.cfg.AnnotConfigmapName` on configmap namespaces discovery
 - introduces `hashstructure` package for smarter hashing of changing objects
 - make smarter hashing for `allConfigsHash` metahashing (without `reflect.DeepEqual()`)

Signed-off-by: Anton Ouzounov <aouzounov@vmware.com>
  • Loading branch information
Anton Ouzounov committed Nov 24, 2021
1 parent cce2e01 commit 8bec2a3
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 34 deletions.
24 changes: 15 additions & 9 deletions config-reloader/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@ import (
"github.com/vmware/kube-fluentd-operator/config-reloader/datasource"
"github.com/vmware/kube-fluentd-operator/config-reloader/fluentd"
"github.com/vmware/kube-fluentd-operator/config-reloader/generator"
"github.com/vmware/kube-fluentd-operator/config-reloader/util"

"github.com/sirupsen/logrus"
)

type Controller struct {
Updater Updater
OutputDir string
Reloader *fluentd.Reloader
Datasource datasource.Datasource
Generator *generator.Generator
NumTotalConfigNS int
Updater Updater
OutputDir string
Reloader *fluentd.Reloader
Datasource datasource.Datasource
Generator *generator.Generator
AllConfigsHash uint64
}

func (c *Controller) Run(ctx context.Context, stop <-chan struct{}) {
Expand Down Expand Up @@ -104,15 +105,20 @@ func (c *Controller) RunOnce(ctx context.Context) error {

if newHash != nsConfig.PreviousConfigHash {
needsReload = true
logrus.Debugf("Previous Config hash for ns %s is %v", nsConfig.Name, nsConfig.PreviousConfigHash)
logrus.Debugf("New Config hash for ns %s is %v", nsConfig.Name, newHash)
c.Datasource.WriteCurrentConfigHash(nsConfig.Name, newHash)
}
}

// lastly, if number of configs has changed, then need to reload configurations obviously!
// lastly, if number of all configs has changed, then need to reload configurations obviously!
// this means a crd was deleted or reapplied, and GetNamespaces does not return it anymore
if c.NumTotalConfigNS != len(allConfigNamespaces) {
// metahashing, hashing the object of hashes :)
allConfigsHash, _ := util.MakeStructureHash(configHashes)
if c.AllConfigsHash != allConfigsHash {
needsReload = true
c.NumTotalConfigNS = len(allConfigNamespaces)
c.AllConfigsHash = allConfigsHash
logrus.Debugf("All Configs hash for all KFO is %v", c.AllConfigsHash)
}

if needsReload {
Expand Down
6 changes: 3 additions & 3 deletions config-reloader/datasource/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var template = `
`

type fakeDatasource struct {
hashes map[string]string
confHashes map[string]string
}

func makeFakeConfig(namespace string) string {
Expand Down Expand Up @@ -59,7 +59,7 @@ func (d *fakeDatasource) GetNamespaces(ctx context.Context) ([]*NamespaceConfig,
}

func (d *fakeDatasource) WriteCurrentConfigHash(namespace string, hash string) {
d.hashes[namespace] = hash
d.confHashes[namespace] = hash
}

func (d *fakeDatasource) UpdateStatus(ctx context.Context, namespace string, status string) {
Expand All @@ -69,6 +69,6 @@ func (d *fakeDatasource) UpdateStatus(ctx context.Context, namespace string, sta
// NewFakeDatasource returns a predefined set of namespaces + configs
func NewFakeDatasource(ctx context.Context) Datasource {
return &fakeDatasource{
hashes: make(map[string]string),
confHashes: make(map[string]string),
}
}
8 changes: 4 additions & 4 deletions config-reloader/datasource/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

type fsDatasource struct {
hashes map[string]string
confHashes map[string]string
rootDir string
statusOutputDir string
}
Expand All @@ -41,7 +41,7 @@ func (d *fsDatasource) GetNamespaces(ctx context.Context) ([]*NamespaceConfig, e
cfg := &NamespaceConfig{
Name: ns,
FluentdConfig: string(contents),
PreviousConfigHash: d.hashes[ns],
PreviousConfigHash: d.confHashes[ns],
}

logrus.Infof("Loading namespace %s from file %s", ns, f)
Expand All @@ -52,7 +52,7 @@ func (d *fsDatasource) GetNamespaces(ctx context.Context) ([]*NamespaceConfig, e
}

func (d *fsDatasource) WriteCurrentConfigHash(namespace string, hash string) {
d.hashes[namespace] = hash
d.confHashes[namespace] = hash
}

func (d *fsDatasource) UpdateStatus(ctx context.Context, namespace string, status string) {
Expand All @@ -67,7 +67,7 @@ func (d *fsDatasource) UpdateStatus(ctx context.Context, namespace string, statu
// NewFileSystemDatasource turns all files matching *.conf patter in the given dir into namespace configs
func NewFileSystemDatasource(ctx context.Context, rootDir string, statusOutputDir string) Datasource {
return &fsDatasource{
hashes: make(map[string]string),
confHashes: make(map[string]string),
rootDir: rootDir,
statusOutputDir: statusOutputDir,
}
Expand Down
60 changes: 42 additions & 18 deletions config-reloader/datasource/kube_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ import (
)

type kubeInformerConnection struct {
client kubernetes.Interface
hashes map[string]string
cfg *config.Config
kubeds kubedatasource.KubeDS
nslist listerv1.NamespaceLister
podlist listerv1.PodLister
cmlist listerv1.ConfigMapLister
fdlist kfoListersV1beta1.FluentdConfigLister
client kubernetes.Interface
confHashes map[string]string
cfg *config.Config
kubeds kubedatasource.KubeDS
nslist listerv1.NamespaceLister
podlist listerv1.PodLister
cmlist listerv1.ConfigMapLister
fdlist kfoListersV1beta1.FluentdConfigLister
}

// GetNamespaces queries the configured Kubernetes API to generate a list of NamespaceConfig objects.
Expand Down Expand Up @@ -77,7 +77,7 @@ func (d *kubeInformerConnection) GetNamespaces(ctx context.Context) ([]*Namespac
nsconfigs = append(nsconfigs, &NamespaceConfig{
Name: ns,
FluentdConfig: configdata,
PreviousConfigHash: d.hashes[ns],
PreviousConfigHash: d.confHashes[ns],
Labels: nsobj.Labels,
MiniContainers: minis,
})
Expand All @@ -88,7 +88,7 @@ func (d *kubeInformerConnection) GetNamespaces(ctx context.Context) ([]*Namespac

// WriteCurrentConfigHash is a setter for the hashtable maintained by this Datasource
func (d *kubeInformerConnection) WriteCurrentConfigHash(namespace string, hash string) {
d.hashes[namespace] = hash
d.confHashes[namespace] = hash
}

// UpdateStatus updates a namespace's status annotation with the latest result
Expand Down Expand Up @@ -168,6 +168,13 @@ func (d *kubeInformerConnection) discoverNamespaces(ctx context.Context) ([]stri
for _, cfmap := range confMapsList {
if cfmap.ObjectMeta.Name == d.cfg.DefaultConfigmapName {
namespaces = append(namespaces, cfmap.ObjectMeta.Namespace)
} else {
// We need to find configmaps that honor the global annotation for configmaps:
configMapNamespace, _ := d.nslist.Get(cfmap.ObjectMeta.Namespace)
configMapName := configMapNamespace.Annotations[d.cfg.AnnotConfigmapName]
if configMapName != "" {
namespaces = append(namespaces, cfmap.ObjectMeta.Namespace)
}
}
}
} else {
Expand Down Expand Up @@ -253,6 +260,23 @@ func NewKubernetesInformerDatasource(ctx context.Context, cfg *config.Config, up
}
}

factory.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(new interface{}) {
select {
case updateChan <- time.Now():
default:
}
},
UpdateFunc: func(old, new interface{}) {
},
DeleteFunc: func(new interface{}) {
select {
case updateChan <- time.Now():
default:
}
},
})

factory.Start(nil)
if !cache.WaitForCacheSync(nil,
factory.Core().V1().Namespaces().Informer().HasSynced,
Expand All @@ -264,13 +288,13 @@ func NewKubernetesInformerDatasource(ctx context.Context, cfg *config.Config, up
logrus.Infof("Synced local informer with upstream Kubernetes API")

return &kubeInformerConnection{
client: client,
hashes: make(map[string]string),
cfg: cfg,
kubeds: kubeds,
nslist: namespaceLister,
podlist: podLister,
cmlist: cmLister,
fdlist: fluentdconfigDSLister.Fdlist,
client: client,
confHashes: make(map[string]string),
cfg: cfg,
kubeds: kubeds,
nslist: namespaceLister,
podlist: podLister,
cmlist: cmLister,
fdlist: fluentdconfigDSLister.Fdlist,
}, nil
}
1 change: 1 addition & 0 deletions config-reloader/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/prometheus/client_golang v1.11.0
github.com/sirupsen/logrus v1.7.0
github.com/stretchr/testify v1.6.1
github.com/mitchellh/hashstructure/v2 v2.0.2
k8s.io/api v0.21.4
k8s.io/apiextensions-apiserver v0.21.4
k8s.io/apimachinery v0.21.4
Expand Down
2 changes: 2 additions & 0 deletions config-reloader/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrk
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4=
github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE=
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
Expand Down
21 changes: 21 additions & 0 deletions config-reloader/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"crypto/sha256"
"encoding/hex"
"fmt"
"github.com/mitchellh/hashstructure/v2"
"io/ioutil"
"os/exec"
"sort"
Expand Down Expand Up @@ -123,3 +124,23 @@ func TrimTrailingComment(line string) string {

return line
}

func MakeStructureHash(v interface{}) (uint64, error) {
hashV, err := hashstructure.Hash(v, hashstructure.FormatV2, nil)
if err != nil {
return hashV, err
}

return hashV, nil
}

func AreStructureHashEqual(v interface{}, f interface{}) bool {
hashV, _ := hashstructure.Hash(v, hashstructure.FormatV2, nil)
hashF, _ := hashstructure.Hash(f, hashstructure.FormatV2, nil)

if hashV != 0 && hashF != 0 {
return hashV == hashF
}

return false
}
72 changes: 72 additions & 0 deletions config-reloader/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,32 @@ import (
"github.com/stretchr/testify/assert"
)

type Mount struct {
Path string
VolumeName string
SubPath string
}

// MiniContainer container subset with the parent pod's metadata
type MiniContainer struct {
// the pod id
PodID string
PodName string

Image string
ContainerID string

// pod labels
Labels map[string]string

// container name
Name string
// only the emptyDir mounts, never empty, sorted by len(Path), descending
HostMounts []*Mount

NodeName string
}

func TestMakeFluentdSafeName(t *testing.T) {
assert.Equal(t, "a", MakeFluentdSafeName("a"))
assert.Equal(t, "123", MakeFluentdSafeName("123"))
Expand Down Expand Up @@ -41,3 +67,49 @@ func TestTrimTrailingComment(t *testing.T) {
assert.Equal(t, "a", TrimTrailingComment("a"))
assert.Equal(t, "a", TrimTrailingComment("a#########"))
}

func TestMakeStructureHash(t *testing.T) {
mini1 := &MiniContainer{
PodID: "4b519aaf-67f1-4588-8164-f679b2298e25",
PodName: "kfo-log-router-nwxtj",
Name: "config-reloader",
NodeName: "vdp-dev-control-plane",
Image: "testing/kfo:delete-problems-3",
ContainerID: "containerd://37dce75ed2f01c5f858b4c4cc96b23ebacaba6569af93ed64b3904be9a676cb1",
}

hashMini1, err := MakeStructureHash(mini1)
assert.Nil(t, err)
assert.Equal(t, uint64(0xa92a93a3863f8fd6), hashMini1)
}

func TestAreStructureHashEqual(t *testing.T) {
mini1 := &MiniContainer{
PodID: "4b519aaf-67f1-4588-8164-f679b2298e25",
PodName: "kfo-log-router-nwxtj",
Name: "config-reloader",
NodeName: "vdp-dev-control-plane",
Image: "testing/kfo:delete-problems-3",
ContainerID: "containerd://37dce75ed2f01c5f858b4c4cc96b23ebacaba6569af93ed64b3904be9a676cb1",
}
mini2 := &MiniContainer{
PodID: "4b519aaf-67f1-4588-8164-f679b2298e25",
PodName: "kfo-log-router-nwxtj",
Name: "config-reloader",
NodeName: "vdp-dev-control-plane",
Image: "testing/kfo:delete-problems-3",
ContainerID: "containerd://37dce75ed2f01c5f858b4c4cc96b23ebacaba6569af93ed64b3904be9a676cb1",
}
mini3 := &MiniContainer{
PodID: "4b519aaf-67f1-4588-8164-f679b2298e25",
PodName: "kfo-log-router-next",
Name: "config-reloader",
NodeName: "vdp-dev-control-plane",
Image: "testing/kfo:delete-problems-3",
ContainerID: "containerd://37dce75ed2f01c5f858b4c4cc96b23ebacaba6569af93ed64b3904be9a676cb1",
}

assert.Equal(t, true, AreStructureHashEqual(mini1, mini2))
assert.NotEqual(t, true, AreStructureHashEqual(mini1, mini3))
assert.Equal(t, false, AreStructureHashEqual(mini1, mini3))
}

0 comments on commit 8bec2a3

Please sign in to comment.