Skip to content
This repository was archived by the owner on Nov 1, 2022. It is now read-only.

Make Kubernetes resources exclusion configurable #2749

Merged
merged 4 commits into from
Jan 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/fluxd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ func main() {
k8sNamespaceWhitelist = fs.StringSlice("k8s-namespace-whitelist", []string{}, "restrict the view of the cluster to the namespaces listed. All namespaces are included if this is not set")
k8sAllowNamespace = fs.StringSlice("k8s-allow-namespace", []string{}, "restrict all operations to the provided namespaces")
k8sDefaultNamespace = fs.String("k8s-default-namespace", "", "the namespace to use for resources where a namespace is not specified")

k8sVerbosity = fs.Int("k8s-verbosity", 0, "klog verbosity level")
k8sExcludeResource = fs.StringSlice("k8s-exclude-resource", []string{"*metrics.k8s.io/*", "webhook.certmanager.k8s.io/*", "v1/Event"}, "do not attempt to obtain cluster resources whose group/version/kind matches these glob expressions")
k8sVerbosity = fs.Int("k8s-verbosity", 0, "klog verbosity level")

// SSH key generation
sshKeyBits = optionalVar(fs, &ssh.KeyBitsValue{}, "ssh-keygen-bits", "-b argument to ssh-keygen (default unspecified)")
Expand Down Expand Up @@ -505,7 +505,7 @@ func main() {
for _, n := range append(*k8sNamespaceWhitelist, *k8sAllowNamespace...) {
allowedNamespaces[n] = struct{}{}
}
k8sInst := kubernetes.NewCluster(client, kubectlApplier, sshKeyRing, logger, allowedNamespaces, *registryExcludeImage)
k8sInst := kubernetes.NewCluster(client, kubectlApplier, sshKeyRing, logger, allowedNamespaces, *registryExcludeImage, *k8sExcludeResource)
k8sInst.GC = *syncGC
k8sInst.DryGC = *dryGC

Expand Down
1 change: 1 addition & 0 deletions docs/references/daemon.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ Version controlling of cluster manifests provides reproducibility and a historic
| **k8s configuration**
| --k8s-allow-namespace | | restrict all operations to the provided namespaces
| --k8s-default-namespace | | the namespace to use for resources where a namespace is not specified
| --k8s-exclude-resource | `["*metrics.k8s.io/*", "webhook.certmanager.k8s.io/*", "v1/Event"]` | do not attempt to obtain cluster resources whose group/version/kind matches these glob expressions, e.g. `coordination.k8s.io/v1beta1/Lease`, `coordination.k8s.io/*/Lease` or `coordination.k8s.io/*`
| **upstream service**
| --connect | | connect to an upstream service e.g., Weave Cloud, at this base address
| --token | | authentication token for upstream service
Expand Down
22 changes: 12 additions & 10 deletions pkg/cluster/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,22 @@ type Cluster struct {
allowedNamespaces map[string]struct{}
loggedAllowedNS map[string]bool // to keep track of whether we've logged a problem with seeing an allowed namespace

imageExcludeList []string
mu sync.Mutex
imageExcludeList []string
resourceExcludeList []string
mu sync.Mutex
}

// NewCluster returns a usable cluster.
func NewCluster(client ExtendedClient, applier Applier, sshKeyRing ssh.KeyRing, logger log.Logger, allowedNamespaces map[string]struct{}, imageExcludeList []string) *Cluster {
func NewCluster(client ExtendedClient, applier Applier, sshKeyRing ssh.KeyRing, logger log.Logger, allowedNamespaces map[string]struct{}, imageExcludeList []string, resourceExcludeList []string) *Cluster {
c := &Cluster{
client: client,
applier: applier,
logger: logger,
sshKeyRing: sshKeyRing,
allowedNamespaces: allowedNamespaces,
loggedAllowedNS: map[string]bool{},
imageExcludeList: imageExcludeList,
client: client,
applier: applier,
logger: logger,
sshKeyRing: sshKeyRing,
allowedNamespaces: allowedNamespaces,
loggedAllowedNS: map[string]bool{},
imageExcludeList: imageExcludeList,
resourceExcludeList: resourceExcludeList,
}

return c
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/kubernetes/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func testGetAllowedNamespaces(t *testing.T, namespace []string, expected []strin
for _, n := range namespace {
allowedNamespaces[n] = struct{}{}
}
c := NewCluster(client, nil, nil, log.NewNopLogger(), allowedNamespaces, []string{})
c := NewCluster(client, nil, nil, log.NewNopLogger(), allowedNamespaces, []string{}, []string{})

namespaces, err := c.getAllowedAndExistingNamespaces(context.Background())
if err != nil {
Expand Down
45 changes: 41 additions & 4 deletions pkg/cluster/kubernetes/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/base64"
"encoding/hex"
"fmt"
"github.com/ryanuber/go-glob"
"io"
"os/exec"
"sort"
Expand Down Expand Up @@ -201,6 +202,29 @@ func (r *kuberesource) GetGCMark() string {
return r.obj.GetLabels()[gcMarkLabel]
}

func (c *Cluster) filterResources(resources *meta_v1.APIResourceList) *meta_v1.APIResourceList {
list := []meta_v1.APIResource{}
for _, apiResource := range resources.APIResources {
fullName := fmt.Sprintf("%s/%s", resources.GroupVersion, apiResource.Kind)
excluded := false
for _, exp := range c.resourceExcludeList {
if glob.Glob(exp, fullName) {
excluded = true
break
}
}
if !excluded {
list = append(list, apiResource)
}
}

return &meta_v1.APIResourceList{
TypeMeta: resources.TypeMeta,
GroupVersion: resources.GroupVersion,
APIResources: list,
}
}

func (c *Cluster) getAllowedResourcesBySelector(selector string) (map[string]*kuberesource, error) {
listOptions := meta_v1.ListOptions{}
if selector != "" {
Expand All @@ -215,11 +239,19 @@ func (c *Cluster) getAllowedResourcesBySelector(selector string) (map[string]*ku
resources := []*meta_v1.APIResourceList{}
for i := range sgs.Groups {
gv := sgs.Groups[i].PreferredVersion.GroupVersion
// exclude the *.metrics.k8s.io resources to avoid querying the cluster metrics
if sgs.Groups[i].Name != "metrics.k8s.io" && !strings.HasSuffix(sgs.Groups[i].Name, ".metrics.k8s.io") {

excluded := false
for _, exp := range c.resourceExcludeList {
if glob.Glob(exp, fmt.Sprintf("%s/", gv)) {
excluded = true
break
}
}

if !excluded {
if r, err := c.client.discoveryClient.ServerResourcesForGroupVersion(gv); err == nil {
if r != nil {
resources = append(resources, r)
resources = append(resources, c.filterResources(r))
}
} else {
// ignore errors for resources with empty group version instead of failing to sync
Expand Down Expand Up @@ -271,7 +303,12 @@ func (c *Cluster) getAllowedResourcesBySelector(selector string) (map[string]*ku
if itemDesc == "v1:ComponentStatus" || itemDesc == "v1:Endpoints" {
continue
}
// TODO(michael) also exclude anything that has an ownerReference (that isn't "standard"?)

// exclude anything that has an ownerReference
owners := item.GetOwnerReferences()
if owners != nil && len(owners) > 0 {
continue
}

res := &kuberesource{obj: &list[i], namespaced: apiResource.Namespaced}
result[res.ResourceID().String()] = res
Expand Down
12 changes: 9 additions & 3 deletions pkg/cluster/kubernetes/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,10 @@ func setup(t *testing.T) (*Cluster, *fakeApplier, func()) {
clients, cancel := fakeClients()
applier := &fakeApplier{dynamicClient: clients.dynamicClient, coreClient: clients.coreClient, defaultNS: defaultTestNamespace}
kube := &Cluster{
applier: applier,
client: clients,
logger: log.NewLogfmtLogger(os.Stdout),
applier: applier,
client: clients,
logger: log.NewLogfmtLogger(os.Stdout),
resourceExcludeList: []string{"*metrics.k8s.io/*", "webhook.certmanager.k8s.io/v1beta1/*"},
}
return kube, applier, cancel
}
Expand Down Expand Up @@ -307,6 +308,11 @@ func TestSyncTolerateMetricsErrors(t *testing.T) {
fakeClient.Resources = []*metav1.APIResourceList{{GroupVersion: "custom.metrics.k8s.io/v1"}}
err = kube.Sync(cluster.SyncSet{})
assert.NoError(t, err)

kube.client.discoveryClient.(*cachedDiscovery).CachedDiscoveryInterface.Invalidate()
fakeClient.Resources = []*metav1.APIResourceList{{GroupVersion: "webhook.certmanager.k8s.io/v1beta1"}}
err = kube.Sync(cluster.SyncSet{})
assert.NoError(t, err)
}

func TestSync(t *testing.T) {
Expand Down