Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/doc-gen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import (
const (
firstParagraph = `# API Docs
This Document documents the types introduced by the %s Operator.
> Note this document is generated from code comments. When contributing a change to this document please do so by changing the code comments.`
> Note this document is generated from code comments.
> When contributing a change to this document please do so by changing the code comments.`

fluentbitPluginPath = "apis/fluentbit/v1alpha2/plugins/"
fluentdPluginPath = "apis/fluentd/v1alpha1/plugins/"
Expand Down
8 changes: 7 additions & 1 deletion controllers/fluent_controller_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,13 @@ func (r *FluentdReconciler) delete(ctx context.Context, fd *fluentdv1alpha1.Flue
func (r *FluentdReconciler) mutate(obj client.Object, fd *fluentdv1alpha1.Fluentd) controllerutil.MutateFn {
switch o := obj.(type) {
case *rbacv1.ClusterRole:
expected, _, _ := operator.MakeRBACObjects(fd.Name, fd.Namespace, "fluentd", fd.Spec.RBACRules, fd.Spec.ServiceAccountAnnotations)
expected, _, _ := operator.MakeRBACObjects(
fd.Name,
fd.Namespace,
"fluentd",
fd.Spec.RBACRules,
fd.Spec.ServiceAccountAnnotations,
)

return func() error {
o.Rules = expected.Rules
Expand Down
27 changes: 23 additions & 4 deletions controllers/fluentbitconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,8 @@ func listNamespacedResources[T client.ObjectList](
if err != nil {
return err
}
if err := cli.List(ctx, list, client.InNamespace(namespace), client.MatchingLabelsSelector{Selector: sel}); err != nil {
matchingLabelSelector := client.MatchingLabelsSelector{Selector: sel}
if err := cli.List(ctx, list, client.InNamespace(namespace), matchingLabelSelector); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -323,15 +324,33 @@ func (r *FluentBitConfigReconciler) ListNamespacedResources(
return filters, outputs, parsers, clusterParsers, multipleParsers, clusterMultipleParsers, err
}

if err := listNamespacedResources(ctx, r.Client, &clusterParsers, cfg.Namespace, &cfg.Spec.ClusterParserSelector); err != nil {
if err := listNamespacedResources(
ctx,
r.Client,
&clusterParsers,
cfg.Namespace,
&cfg.Spec.ClusterParserSelector,
); err != nil {
return filters, outputs, parsers, clusterParsers, multipleParsers, clusterMultipleParsers, err
}

if err := listNamespacedResources(ctx, r.Client, &multipleParsers, cfg.Namespace, &cfg.Spec.MultilineParserSelector); err != nil {
if err := listNamespacedResources(
ctx,
r.Client,
&multipleParsers,
cfg.Namespace,
&cfg.Spec.MultilineParserSelector,
); err != nil {
return filters, outputs, parsers, clusterParsers, multipleParsers, clusterMultipleParsers, err
}

if err := listNamespacedResources(ctx, r.Client, &clusterMultipleParsers, cfg.Namespace, &cfg.Spec.ClusterMultilineParserSelector); err != nil {
if err := listNamespacedResources(
ctx,
r.Client,
&clusterMultipleParsers,
cfg.Namespace,
&cfg.Spec.ClusterMultilineParserSelector,
); err != nil {
return filters, outputs, parsers, clusterParsers, multipleParsers, clusterMultipleParsers, err
}

Expand Down
8 changes: 7 additions & 1 deletion controllers/fluentd_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,13 @@ func (r *FluentdReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}

// Install RBAC resources for the filter plugin kubernetes
cr, sa, crb := operator.MakeRBACObjects(fd.Name, fd.Namespace, fluentdLowercase, fd.Spec.RBACRules, fd.Spec.ServiceAccountAnnotations)
cr, sa, crb := operator.MakeRBACObjects(
fd.Name,
fd.Namespace,
fluentdLowercase,
fd.Spec.RBACRules,
fd.Spec.ServiceAccountAnnotations,
)
// Deploy Fluentd ClusterRole
if _, err := controllerutil.CreateOrPatch(ctx, r.Client, cr, r.mutate(cr, &fd)); err != nil {
return ctrl.Result{}, err
Expand Down
100 changes: 84 additions & 16 deletions controllers/fluentdconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,28 +131,47 @@ func (r *FluentdConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reques
// gpr acts as a global resource to store the related plugin resources
gpr := fluentdv1alpha1.NewGlobalPluginResources("main")

// Each cluster/namespace scope fluentd configs will generate their own filters/outputs plugins with their own cfgId/cfgLabel,
// Each cluster/namespace scope fluentd configs will generate their own filters/outputs plugins
// with their own cfgId/cfgLabel,
// and they will finally be combined into one fluentd config file
gpr.CombineGlobalInputsPlugins(sl, fd.Spec.GlobalInputs)

// Default Output and filter
// list all namespaced CRs
inputs, filters, outputs, err := r.ListNamespacedLevelResources(ctx, fd.Namespace, fd.Spec.DefaultInputSelector, fd.Spec.DefaultFilterSelector, fd.Spec.DefaultOutputSelector)
inputs, filters, outputs, err := r.ListNamespacedLevelResources(
ctx,
fd.Namespace,
fd.Spec.DefaultInputSelector,
fd.Spec.DefaultFilterSelector,
fd.Spec.DefaultOutputSelector,
)
if err != nil {
r.Log.Info("List namespace level resources failed", "config", "default", "err", err.Error())
return ctrl.Result{}, err
}
if len(inputs) > 0 || len(filters) > 0 || len(outputs) > 0 {
// Combine the namespaced filter/output pluginstores in this fluentd config
cfgResouces, errs := gpr.PatchAndFilterNamespacedLevelResources(sl, fmt.Sprintf("%s-%s-%s", fd.Kind, fd.Namespace, fd.Name), inputs, filters, outputs)
cfgResouces, errs := gpr.PatchAndFilterNamespacedLevelResources(
sl,
fmt.Sprintf("%s-%s-%s", fd.Kind, fd.Namespace, fd.Name),
inputs,
filters,
outputs,
)
if len(errs) > 0 {
r.Log.Info("Patch and filter namespace level resources failed", "config", "default", "err", strings.Join(errs, ","))
return ctrl.Result{}, errors.New(strings.Join(errs, ","))
}

err = gpr.IdentifyCopyAndPatchOutput(cfgResouces)
if err != nil {
r.Log.Info("IdentifyCopy and PatchOutput namespace level resources failed", "config", "default", "err", strings.Join(errs, ","))
r.Log.Info(
"IdentifyCopy and PatchOutput namespace level resources failed",
"config",
"default",
"err",
strings.Join(errs, ","),
)
return ctrl.Result{}, errors.New(strings.Join(errs, ","))
}

Expand Down Expand Up @@ -246,7 +265,15 @@ func (r *FluentdConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, err
}

r.Log.Info("Fluentd main configuration has updated", "logging-control-plane", fd.Namespace, "fd", fd.Name, "secret", secName)
r.Log.Info(
"Fluentd main configuration has updated",
"logging-control-plane",
fd.Namespace,
"fd",
fd.Name,
"secret",
secName,
)
}

return ctrl.Result{}, nil
Expand Down Expand Up @@ -287,7 +314,12 @@ func (r *FluentdConfigReconciler) ClusterCfgsForFluentd(
}

// list all cluster CRs
clusterInputs, clusterfilters, clusteroutputs, err := r.ListClusterLevelResources(ctx, cfg.Spec.ClusterInputSelector, cfg.Spec.ClusterFilterSelector, cfg.Spec.ClusterOutputSelector)
clusterInputs, clusterfilters, clusteroutputs, err := r.ListClusterLevelResources(
ctx,
cfg.Spec.ClusterInputSelector,
cfg.Spec.ClusterFilterSelector,
cfg.Spec.ClusterOutputSelector,
)
if err != nil {
r.Log.Info("List cluster level resources failed", "config", cfg.Name, "err", err.Error())
if err = r.PatchObjects(ctx, &cfg, fluentdv1alpha1.InvalidState, err.Error()); err != nil {
Expand All @@ -298,7 +330,13 @@ func (r *FluentdConfigReconciler) ClusterCfgsForFluentd(
}

// Combine the filter/output pluginstores in this fluentd config
cfgResouces, errs := gpr.PatchAndFilterClusterLevelResources(sl, cfg.GetCfgId(), clusterInputs, clusterfilters, clusteroutputs)
cfgResouces, errs := gpr.PatchAndFilterClusterLevelResources(
sl,
cfg.GetCfgId(),
clusterInputs,
clusterfilters,
clusteroutputs,
)
if len(errs) > 0 {
r.Log.Info("Patch and filter cluster level resources failed", "config", cfg.Name, "err", strings.Join(errs, ","))
if err = r.PatchObjects(ctx, &cfg, fluentdv1alpha1.InvalidState, strings.Join(errs, ", ")); err != nil {
Expand Down Expand Up @@ -336,8 +374,13 @@ func (r *FluentdConfigReconciler) ClusterCfgsForFluentd(
}

// CfgsForFluentd combines all namespaced cfgs selected by this fd
func (r *FluentdConfigReconciler) CfgsForFluentd(ctx context.Context, cfgs fluentdv1alpha1.FluentdConfigList, sl plugins.SecretLoader,
gpr *fluentdv1alpha1.PluginResources, globalCfgLabels map[string]bool) error {
func (r *FluentdConfigReconciler) CfgsForFluentd(
ctx context.Context,
cfgs fluentdv1alpha1.FluentdConfigList,
sl plugins.SecretLoader,
gpr *fluentdv1alpha1.PluginResources,
globalCfgLabels map[string]bool,
) error {

for _, cfg := range cfgs.Items {
// Build the inner router for this cfg and append it to the MainRouter
Expand All @@ -363,7 +406,12 @@ func (r *FluentdConfigReconciler) CfgsForFluentd(ctx context.Context, cfgs fluen
}

// list all cluster CRs
clusterInputs, clusterfilters, clusteroutputs, err := r.ListClusterLevelResources(ctx, cfg.Spec.ClusterInputSelector, cfg.Spec.ClusterFilterSelector, cfg.Spec.ClusterOutputSelector)
clusterInputs, clusterfilters, clusteroutputs, err := r.ListClusterLevelResources(
ctx,
cfg.Spec.ClusterInputSelector,
cfg.Spec.ClusterFilterSelector,
cfg.Spec.ClusterOutputSelector,
)
if err != nil {
r.Log.Info("List cluster level resources failed", "config", cfg.Name, "err", err.Error())
if err = r.PatchObjects(ctx, &cfg, fluentdv1alpha1.InvalidState, err.Error()); err != nil {
Expand All @@ -374,7 +422,13 @@ func (r *FluentdConfigReconciler) CfgsForFluentd(ctx context.Context, cfgs fluen
}

// list all namespaced CRs
inputs, filters, outputs, err := r.ListNamespacedLevelResources(ctx, cfg.Namespace, cfg.Spec.InputSelector, cfg.Spec.FilterSelector, cfg.Spec.OutputSelector)
inputs, filters, outputs, err := r.ListNamespacedLevelResources(
ctx,
cfg.Namespace,
cfg.Spec.InputSelector,
cfg.Spec.FilterSelector,
cfg.Spec.OutputSelector,
)
if err != nil {
r.Log.Info("List namespace level resources failed", "config", cfg.Name, "err", err.Error())
if err = r.PatchObjects(ctx, &cfg, fluentdv1alpha1.InvalidState, err.Error()); err != nil {
Expand All @@ -385,7 +439,13 @@ func (r *FluentdConfigReconciler) CfgsForFluentd(ctx context.Context, cfgs fluen
}

// Combine the cluster input/filter/output pluginstores in this fluentd config
clustercfgResouces, errs := gpr.PatchAndFilterClusterLevelResources(sl, cfg.GetCfgId(), clusterInputs, clusterfilters, clusteroutputs)
clustercfgResouces, errs := gpr.PatchAndFilterClusterLevelResources(
sl,
cfg.GetCfgId(),
clusterInputs,
clusterfilters,
clusteroutputs,
)
if len(errs) > 0 {
r.Log.Info("Patch and filter cluster level resources failed", "config", cfg.Name, "err", strings.Join(errs, ","))
if err = r.PatchObjects(ctx, &cfg, fluentdv1alpha1.InvalidState, strings.Join(errs, ", ")); err != nil {
Expand Down Expand Up @@ -533,7 +593,8 @@ func (r *FluentdConfigReconciler) ListNamespacedLevelResources(
if err != nil {
return nil, nil, nil, err
}
if err = r.List(ctx, &inputs, client.InNamespace(namespace), client.MatchingLabelsSelector{Selector: selector}); err != nil {
matchingLabelSelector := client.MatchingLabelsSelector{Selector: selector}
if err = r.List(ctx, &inputs, client.InNamespace(namespace), matchingLabelSelector); err != nil {
return nil, nil, nil, err
}
}
Expand All @@ -545,7 +606,8 @@ func (r *FluentdConfigReconciler) ListNamespacedLevelResources(
if err != nil {
return nil, nil, nil, err
}
if err = r.List(ctx, &filters, client.InNamespace(namespace), client.MatchingLabelsSelector{Selector: selector}); err != nil {
matchingLabelSelector := client.MatchingLabelsSelector{Selector: selector}
if err = r.List(ctx, &inputs, client.InNamespace(namespace), matchingLabelSelector); err != nil {
return nil, nil, nil, err
}
}
Expand All @@ -557,7 +619,8 @@ func (r *FluentdConfigReconciler) ListNamespacedLevelResources(
if err != nil {
return nil, nil, nil, err
}
if err = r.List(ctx, &outputs, client.InNamespace(namespace), client.MatchingLabelsSelector{Selector: selector}); err != nil {
matchingLabelSelector := client.MatchingLabelsSelector{Selector: selector}
if err = r.List(ctx, &inputs, client.InNamespace(namespace), matchingLabelSelector); err != nil {
return nil, nil, nil, err
}
}
Expand All @@ -566,7 +629,12 @@ func (r *FluentdConfigReconciler) ListNamespacedLevelResources(
}

// PatchObjects patches the errors to the obj
func (r *FluentdConfigReconciler) PatchObjects(ctx context.Context, obj client.Object, state fluentdv1alpha1.StatusState, msg string) error {
func (r *FluentdConfigReconciler) PatchObjects(
ctx context.Context,
obj client.Object,
state fluentdv1alpha1.StatusState,
msg string,
) error {
switch o := obj.(type) {
case *fluentdv1alpha1.ClusterFluentdConfig:
o.Status.State = state
Expand Down
3 changes: 2 additions & 1 deletion pkg/filenotify/filenotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
// These are wrapped up in a common interface so that either can be used interchangeably in your code.
//
// This package is adapted from https://github.com/moby/moby/tree/master/pkg/filenotify, Apache-2.0 License.
// Hopefully this can be replaced with an external package sometime in the future, see https://github.com/fsnotify/fsnotify/issues/9
// Hopefully this can be replaced with an external package sometime in the future.
// See https://github.com/fsnotify/fsnotify/issues/9
package filenotify

import (
Expand Down
3 changes: 2 additions & 1 deletion pkg/filenotify/fsnotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
// Copyright Hugo Authors

// Package filenotify is adapted from https://github.com/moby/moby/tree/master/pkg/filenotify, Apache-2.0 License.
// Hopefully this can be replaced with an external package sometime in the future, see https://github.com/fsnotify/fsnotify/issues/9
// Hopefully this can be replaced with an external package sometime in the future.
// See https://github.com/fsnotify/fsnotify/issues/9
package filenotify

import "github.com/fsnotify/fsnotify"
Expand Down
3 changes: 2 additions & 1 deletion pkg/filenotify/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
// Copyright Hugo Authors

// Package filenotify is adapted from https://github.com/moby/moby/tree/master/pkg/filenotify, Apache-2.0 License.
// Hopefully this can be replaced with an external package sometime in the future, see https://github.com/fsnotify/fsnotify/issues/9
// Hopefully this can be replaced with an external package sometime in the future.
// See https://github.com/fsnotify/fsnotify/issues/9
package filenotify

import (
Expand Down
Loading