Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moving fluentextension to contrib repo. #2794

Merged
merged 7 commits into from
Mar 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions extension/fluentbitextension/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
90 changes: 90 additions & 0 deletions extension/fluentbitextension/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# FluentBit Subprocess Extension

**This extension is experimental and may receive breaking changes or be removed
at any time.**

The `fluentbit` extension facilitates running a FluentBit subprocess of the
collector. You are responsible for providing a configuration to FluentBit via the `config`
config option. This will be provided to FluentBit subprocess, along with a few other
config options to enhance the integration with the collector.

There are 2 typical ways to use Fluent Bit and Collector together: side-by-side
and chained.

## Side-by-side

With this approach Collector is responsible for traces and metrics and
Fluent Bit is responsible for logs. `fluentbit` extension is used to start/stop and
provide configuration to Fluent Bit:

![Side by side](images/sidebyside.png)

### Chained

In this approach we use `fluentbit` extension in conjunction with the `fluentforward`
receiver such that the FluentBit process will be configured to send to the
TCP socket opened by the `fluentforward` receiver:

![Side by side](images/chained.png)

`fluentbit` extension does not actually listen for the logs from FluentBit,
it just starts a FluentBit subprocess that will generally send to a `fluentforward`
receiver, which must be configured separately.

The downside of the chained approach is that log data is serialized and deserialized
when going from Fluent Bit to Collector, which has performance implications. The benefit
is that all logs are passed through the Collector which allows log data to be processed
inside the Collector uniformly with traces and metrics (e.g. the same attributes added
to all 3 signals if needed) and also allows to export logs in the formats supported
by the Collector, which Fluent Bit does not necessarily support.

**As of now, this extension is only targeted for Linux environments. It does not
work on Windows or MacOS.**

Note: if you are only collecting logs and not traces or metrics it is likely simpler
to use Fluent Bit alone without Collector.

## Example Config

```yaml
extensions:
health_check:
fluentbit:
executable_path: /usr/src/fluent-bit/build/bin/fluent-bit
tcp_endpoint: 127.0.0.1:8006
config: |
[SERVICE]
parsers_file /usr/src/fluent-bit/conf/parsers.conf
[INPUT]
name tail
path /var/log/mylog
parser apache
receivers:
fluentforward:
endpoint: 0.0.0.0:8006
prometheus:
config:
scrape_configs:
- job_name: 'otel-collector'
scrape_interval: 1s
static_configs:
- targets: ['127.0.0.1:8888']
# This will connect to the Fluent Bit subprocess's built-in HTTP
# monitoring server to grab Promtheus metrics.
- job_name: 'fluentbit'
scrape_interval: 1s
metrics_path: '/api/v1/metrics/prometheus'
static_configs:
- targets: ['127.0.0.1:2020']
service:
pipelines:
logs:
receivers: [fluentforward]
processors: []
exporters: [mylogsexporter]
metrics:
receivers: [prometheus]
processors: [batch]
exporters: [mymetricsexporter]
extensions: [health_check, zpages, fluentbit, pprof]
```
50 changes: 50 additions & 0 deletions extension/fluentbitextension/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fluentbitextension

import (
"go.opentelemetry.io/collector/config/configmodels"
)

// Config has the configuration for the fluentbit extension.
type Config struct {
configmodels.ExtensionSettings `mapstructure:",squash"`

// The TCP `host:port` to which the subprocess should send log entries.
// This is required unless you are overridding `args` and providing the
// output configuration yourself either in `args` or `config`.
TCPEndpoint string `mapstructure:"tcp_endpoint"`

// The path to the executable for FluentBit. Ideally should be an absolute
// path since the CWD of the collector is not guaranteed to be stable.
ExecutablePath string `mapstructure:"executable_path"`

// Exec arguments to the FluentBit process. If you provide this, none of
// the standard args will be set, and only these provided args will be
// passed to FluentBit. The standard args will set the flush interval to 1
// second, configure the forward output with the given `tcp_endpoint`
// option, enable the HTTP monitoring server in FluentBit, and set the
// config file to stdin. The only required arg is `--config=/dev/stdin`,
// since this extension passes the provided config to FluentBit via stdin.
// If you set args manually, you will be responsible for setting the
// forward output to the right port for the fluentforward receiver. See
// `process.go#constructArgs` of this extension source to see the current
// default args.
Args []string `mapstructure:"args"`

// A configuration for FluentBit. This is the text content of the config
// itself, not a path to a config file.
Config string `mapstructure:"config"`
}
55 changes: 55 additions & 0 deletions extension/fluentbitextension/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fluentbitextension

import (
"path"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configtest"
)

func TestLoadConfig(t *testing.T) {
factories, err := componenttest.NopFactories()
assert.NoError(t, err)

factory := NewFactory()
factories.Extensions[typeStr] = factory
cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)

require.Nil(t, err)
require.NotNil(t, cfg)

ext0 := cfg.Extensions["fluentbit"]
assert.Equal(t, factory.CreateDefaultConfig(), ext0)

ext1 := cfg.Extensions["fluentbit/1"]
assert.Equal(t,
&Config{
ExtensionSettings: configmodels.ExtensionSettings{
TypeVal: "fluentbit",
NameVal: "fluentbit/1",
},
ExecutablePath: "/usr/local/bin/fluent-bit",
},
ext1)

assert.Equal(t, 1, len(cfg.Service.Extensions))
assert.Equal(t, "fluentbit/1", cfg.Service.Extensions[0])
}
50 changes: 50 additions & 0 deletions extension/fluentbitextension/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fluentbitextension

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/extension/extensionhelper"
)

const (
// The value of extension "type" in configuration.
typeStr = "fluentbit"
)

// NewFactory creates a factory for FluentBit extension.
func NewFactory() component.ExtensionFactory {
return extensionhelper.NewFactory(
typeStr,
createDefaultConfig,
createExtension)
}

func createDefaultConfig() configmodels.Extension {
return &Config{
ExtensionSettings: configmodels.ExtensionSettings{
TypeVal: typeStr,
NameVal: typeStr,
},
}
}

func createExtension(_ context.Context, params component.ExtensionCreateParams, cfg configmodels.Extension) (component.Extension, error) {
config := cfg.(*Config)
return newProcessManager(config, params.Logger), nil
}
50 changes: 50 additions & 0 deletions extension/fluentbitextension/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fluentbitextension

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configmodels"
"go.uber.org/zap"
)

func TestFactory_CreateDefaultConfig(t *testing.T) {
cfg := createDefaultConfig()
assert.Equal(t, &Config{
ExtensionSettings: configmodels.ExtensionSettings{
NameVal: typeStr,
TypeVal: typeStr,
},
},
cfg)

assert.NoError(t, configcheck.ValidateConfig(cfg))
ext, err := createExtension(context.Background(), component.ExtensionCreateParams{Logger: zap.NewNop()}, cfg)
require.NoError(t, err)
require.NotNil(t, ext)
}

func TestFactory_CreateExtension(t *testing.T) {
cfg := createDefaultConfig().(*Config)
ext, err := createExtension(context.Background(), component.ExtensionCreateParams{Logger: zap.NewNop()}, cfg)
require.NoError(t, err)
require.NotNil(t, ext)
}
11 changes: 11 additions & 0 deletions extension/fluentbitextension/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module github.com/open-telemetry/opentelemetry-collector-contrib/extension/fluentbitextension

go 1.14

require (
github.com/davecgh/go-spew v1.1.1
github.com/shirou/gopsutil v3.21.2+incompatible
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/collector v0.22.1-0.20210310184601-062748d23a02
go.uber.org/zap v1.16.0
)
Loading