Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 60 additions & 6 deletions apis/fluentbit/v1alpha2/clusterfluentbitconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type FluentBitConfigSpec struct {
OutputSelector metav1.LabelSelector `json:"outputSelector,omitempty"`
// Select parser plugins
ParserSelector metav1.LabelSelector `json:"parserSelector,omitempty"`
// Select multiline parser plugins
MultilineParserSelector metav1.LabelSelector `json:"multilineParserSelector,omitempty"`
// If namespace is defined, then the configmap and secret for fluent-bit is in this namespace.
// If it is not defined, it is in the namespace of the fluentd-operator
Namespace *string `json:"namespace,omitempty"`
Expand Down Expand Up @@ -101,10 +103,12 @@ type Service struct {
LogLevel string `json:"logLevel,omitempty"`
// Optional 'parsers' config file (can be multiple)
ParsersFile string `json:"parsersFile,omitempty"`
// backward compatible
ParsersFiles []string `json:"parsersFiles,omitempty"`
// Configure a global environment for the storage layer in Service. It is recommended to configure the volume and volumeMount separately for this storage. The hostPath type should be used for that Volume in Fluentbit daemon set.
Storage *Storage `json:"storage,omitempty"`
// Per-namespace re-emitter configuration
EmitterName string `json:"emitterName,omitempty"`
EmitterName string `json:"emitterName,omitempty"`
EmitterMemBufLimit string `json:"emitterMemBufLimit,omitempty"`
EmitterStorageType string `json:"emitterStorageType,omitempty"`
}
Expand Down Expand Up @@ -184,6 +188,11 @@ func (s *Service) Params() *params.KVs {
m.Insert("Parsers_File", s.ParsersFile)
}
}
if len(s.ParsersFiles) != 0 {
for _, parserFile := range s.ParsersFiles {
m.Insert("Parsers_File", parserFile)
}
}
if s.Storage != nil {
if s.Storage.Path != "" {
m.Insert("storage.path", s.Storage.Path)
Expand All @@ -210,8 +219,10 @@ func (s *Service) Params() *params.KVs {
return m
}

func (cfg ClusterFluentBitConfig) RenderMainConfig(sl plugins.SecretLoader, inputs ClusterInputList, filters ClusterFilterList,
outputs ClusterOutputList, nsFilterLists []FilterList, nsOutputLists []OutputList, rewriteTagConfigs []string) (string, error) {
func (cfg ClusterFluentBitConfig) RenderMainConfig(
sl plugins.SecretLoader, inputs ClusterInputList, filters ClusterFilterList,
outputs ClusterOutputList, nsFilterLists []FilterList, nsOutputLists []OutputList, rewriteTagConfigs []string,
) (string, error) {
var buf bytes.Buffer

// The Service defines the global behaviour of the Fluent Bit engine.
Expand Down Expand Up @@ -289,8 +300,10 @@ func (cfg ClusterFluentBitConfig) RenderMainConfig(sl plugins.SecretLoader, inpu
return buf.String(), nil
}

func (cfg ClusterFluentBitConfig) RenderParserConfig(sl plugins.SecretLoader, parsers ClusterParserList, nsParserLists []ParserList,
nsClusterParserLists []ClusterParserList) (string, error) {
func (cfg ClusterFluentBitConfig) RenderParserConfig(
sl plugins.SecretLoader, parsers ClusterParserList, nsParserLists []ParserList,
nsClusterParserLists []ClusterParserList,
) (string, error) {
var buf bytes.Buffer
existingParsers := make(map[string]bool)
parserSections, err := parsers.Load(sl, existingParsers)
Expand Down Expand Up @@ -326,6 +339,45 @@ func (cfg ClusterFluentBitConfig) RenderParserConfig(sl plugins.SecretLoader, pa
return buf.String(), nil
}

func (cfg ClusterFluentBitConfig) RenderMultilineParserConfig(
sl plugins.SecretLoader, multilineParsers ClusterMultilineParserList, nsMultilineParserLists []MultilineParserList,
nsClusterMultilineParserLists []ClusterMultilineParserList,
) (string, error) {
var buf bytes.Buffer

multilineParserSection, err := multilineParsers.Load(sl)
if err != nil {
return "", err
}

buf.WriteString(multilineParserSection)

for _, nsmp := range nsMultilineParserLists {
if len(nsmp.Items) == 0 {
continue
}
if nsmp.Items != nil {
ns := nsmp.Items[0].Namespace
namespacedSl := plugins.NewSecretLoader(sl.Client, ns)
nsmpSection, err := nsmp.Load(namespacedSl)
if err != nil {
return "", err
}
buf.WriteString(nsmpSection)
}
}

for _, nscmp := range nsClusterMultilineParserLists {
nscmpSection, err := nscmp.Load(sl)
if err != nil {
return "", err
}
buf.WriteString(nscmpSection)
}

return buf.String(), nil
}

// +kubebuilder:object:generate:=false

type Script struct {
Expand All @@ -342,7 +394,9 @@ func (a ByName) Len() int { return len(a) }
func (a ByName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByName) Less(i, j int) bool { return a[i].Name < a[j].Name }

func (cfg ClusterFluentBitConfig) RenderLuaScript(cl plugins.ConfigMapLoader, filters ClusterFilterList, namespace string) ([]Script, error) {
func (cfg ClusterFluentBitConfig) RenderLuaScript(
cl plugins.ConfigMapLoader, filters ClusterFilterList, namespace string,
) ([]Script, error) {

scripts := make([]Script, 0)
for _, f := range filters.Items {
Expand Down
Loading