diff --git a/exporter/fileexporter/error_component.go b/exporter/fileexporter/error_component.go new file mode 100644 index 000000000000..16d4675f6ed5 --- /dev/null +++ b/exporter/fileexporter/error_component.go @@ -0,0 +1,26 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package fileexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter" + +import ( + "context" + + "go.opentelemetry.io/collector/component" +) + +// errorComponent is used to return error from a factory method. SharedComponent does +// not handle errors, so wrapping the error into a component is necessary. +type errorComponent struct { + err error +} + +// Start will return the cached error. +func (e *errorComponent) Start(context.Context, component.Host) error { + return e.err +} + +// Shutdown will return the cached error. +func (e *errorComponent) Shutdown(context.Context) error { + return e.err +} diff --git a/exporter/fileexporter/factory.go b/exporter/fileexporter/factory.go index 246a61f5f00f..5f2f29e29ab6 100644 --- a/exporter/fileexporter/factory.go +++ b/exporter/fileexporter/factory.go @@ -7,11 +7,15 @@ import ( "context" "io" "os" + "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" "gopkg.in/natefinch/lumberjack.v2" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter/internal/metadata" @@ -30,6 +34,13 @@ const ( compressionZSTD = "zstd" ) +type FileExporter interface { + component.Component + consumeTraces(_ context.Context, td ptrace.Traces) error + consumeMetrics(_ context.Context, md pmetric.Metrics) error + consumeLogs(_ context.Context, ld plog.Logs) error +} + // NewFactory creates a factory for OTLP exporter. func NewFactory() exporter.Factory { return exporter.NewFactory( @@ -52,19 +63,15 @@ func createTracesExporter( set exporter.CreateSettings, cfg component.Config, ) (exporter.Traces, error) { - conf := cfg.(*Config) - writer, err := buildFileWriter(conf) + fe, err := getOrCreateFileExporter(cfg) if err != nil { return nil, err } - fe := exporters.GetOrAdd(cfg, func() component.Component { - return newFileExporter(conf, writer) - }) return exporterhelper.NewTracesExporter( ctx, set, cfg, - fe.Unwrap().(*fileExporter).consumeTraces, + fe.consumeTraces, exporterhelper.WithStart(fe.Start), exporterhelper.WithShutdown(fe.Shutdown), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), @@ -76,19 +83,15 @@ func createMetricsExporter( set exporter.CreateSettings, cfg component.Config, ) (exporter.Metrics, error) { - conf := cfg.(*Config) - writer, err := buildFileWriter(conf) + fe, err := getOrCreateFileExporter(cfg) if err != nil { return nil, err } - fe := exporters.GetOrAdd(cfg, func() component.Component { - return newFileExporter(conf, writer) - }) return exporterhelper.NewMetricsExporter( ctx, set, cfg, - fe.Unwrap().(*fileExporter).consumeMetrics, + fe.consumeMetrics, exporterhelper.WithStart(fe.Start), exporterhelper.WithShutdown(fe.Shutdown), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), @@ -100,59 +103,94 @@ func createLogsExporter( set exporter.CreateSettings, cfg component.Config, ) (exporter.Logs, error) { - conf := cfg.(*Config) - writer, err := buildFileWriter(conf) + fe, err := getOrCreateFileExporter(cfg) if err != nil { return nil, err } - fe := exporters.GetOrAdd(cfg, func() component.Component { - return newFileExporter(conf, writer) - }) return exporterhelper.NewLogsExporter( ctx, set, cfg, - fe.Unwrap().(*fileExporter).consumeLogs, + fe.consumeLogs, exporterhelper.WithStart(fe.Start), exporterhelper.WithShutdown(fe.Shutdown), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), ) } -func newFileExporter(conf *Config, writer io.WriteCloser) *fileExporter { - return &fileExporter{ - path: conf.Path, +// getOrCreateFileExporter creates a FileExporter and caches it for a particular configuration, +// or returns the already cached one. Caching is required because the factory is asked trace and +// metric receivers separately when it gets CreateTracesReceiver() and CreateMetricsReceiver() +// but they must not create separate objects, they must use one Exporter object per configuration. +func getOrCreateFileExporter(cfg component.Config) (FileExporter, error) { + conf := cfg.(*Config) + fe := exporters.GetOrAdd(cfg, func() component.Component { + e, err := newFileExporter(conf) + if err != nil { + return &errorComponent{err: err} + } + + return e + }) + + component := fe.Unwrap() + if errComponent, ok := component.(*errorComponent); ok { + return nil, errComponent.err + } + + return component.(FileExporter), nil +} + +func newFileExporter(conf *Config) (FileExporter, error) { + marshaller := &marshaller{ formatType: conf.FormatType, - file: writer, tracesMarshaler: tracesMarshalers[conf.FormatType], metricsMarshaler: metricsMarshalers[conf.FormatType], logsMarshaler: logsMarshalers[conf.FormatType], - exporter: buildExportFunc(conf), compression: conf.Compression, compressor: buildCompressor(conf.Compression), - flushInterval: conf.FlushInterval, } + export := buildExportFunc(conf) + + writer, err := newFileWriter(conf.Path, conf.Rotation, conf.FlushInterval, export) + if err != nil { + return nil, err + } + + return &fileExporter{ + marshaller: marshaller, + writer: writer, + }, nil } -func buildFileWriter(cfg *Config) (io.WriteCloser, error) { - if cfg.Rotation == nil { - f, err := os.OpenFile(cfg.Path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) +func newFileWriter(path string, rotation *Rotation, flushInterval time.Duration, export exportFunc) (*fileWriter, error) { + var wc io.WriteCloser + if rotation == nil { + f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) if err != nil { return nil, err } - return newBufferedWriteCloser(f), nil + wc = newBufferedWriteCloser(f) + } else { + wc = &lumberjack.Logger{ + Filename: path, + MaxSize: rotation.MaxMegabytes, + MaxAge: rotation.MaxDays, + MaxBackups: rotation.MaxBackups, + LocalTime: rotation.LocalTime, + } } - return &lumberjack.Logger{ - Filename: cfg.Path, - MaxSize: cfg.Rotation.MaxMegabytes, - MaxAge: cfg.Rotation.MaxDays, - MaxBackups: cfg.Rotation.MaxBackups, - LocalTime: cfg.Rotation.LocalTime, + + return &fileWriter{ + path: path, + file: wc, + exporter: export, + flushInterval: flushInterval, }, nil } // This is the map of already created File exporters for particular configurations. // We maintain this map because the Factory is asked trace and metric receivers separately // when it gets CreateTracesReceiver() and CreateMetricsReceiver() but they must not -// create separate objects, they must use one Receiver object per configuration. +// create separate objects, they must use one Exporter object per configuration. var exporters = sharedcomponent.NewSharedComponents() diff --git a/exporter/fileexporter/factory_test.go b/exporter/fileexporter/factory_test.go index 0b7a36c89e49..9d155e653d34 100644 --- a/exporter/fileexporter/factory_test.go +++ b/exporter/fileexporter/factory_test.go @@ -7,6 +7,7 @@ import ( "context" "io" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -96,7 +97,7 @@ func TestCreateLogsExporterError(t *testing.T) { assert.Error(t, err) } -func TestBuildFileWriter(t *testing.T) { +func TestNewFileWriter(t *testing.T) { type args struct { cfg *Config } @@ -104,17 +105,19 @@ func TestBuildFileWriter(t *testing.T) { name string args args want io.WriteCloser - validate func(*testing.T, io.WriteCloser) + validate func(*testing.T, *fileWriter) }{ { name: "single file", args: args{ cfg: &Config{ - Path: tempFileName(t), + Path: tempFileName(t), + FlushInterval: 5 * time.Second, }, }, - validate: func(t *testing.T, closer io.WriteCloser) { - _, ok := closer.(*bufferedWriteCloser) + validate: func(t *testing.T, writer *fileWriter) { + assert.Equal(t, 5*time.Second, writer.flushInterval) + _, ok := writer.file.(*bufferedWriteCloser) assert.Equal(t, true, ok) }, }, @@ -128,10 +131,10 @@ func TestBuildFileWriter(t *testing.T) { }, }, }, - validate: func(t *testing.T, closer io.WriteCloser) { - writer, ok := closer.(*lumberjack.Logger) + validate: func(t *testing.T, writer *fileWriter) { + logger, ok := writer.file.(*lumberjack.Logger) assert.Equal(t, true, ok) - assert.Equal(t, defaultMaxBackups, writer.MaxBackups) + assert.Equal(t, defaultMaxBackups, logger.MaxBackups) }, }, { @@ -147,21 +150,21 @@ func TestBuildFileWriter(t *testing.T) { }, }, }, - validate: func(t *testing.T, closer io.WriteCloser) { - writer, ok := closer.(*lumberjack.Logger) + validate: func(t *testing.T, writer *fileWriter) { + logger, ok := writer.file.(*lumberjack.Logger) assert.Equal(t, true, ok) - assert.Equal(t, 3, writer.MaxBackups) - assert.Equal(t, 30, writer.MaxSize) - assert.Equal(t, 100, writer.MaxAge) - assert.Equal(t, true, writer.LocalTime) + assert.Equal(t, 3, logger.MaxBackups) + assert.Equal(t, 30, logger.MaxSize) + assert.Equal(t, 100, logger.MaxAge) + assert.Equal(t, true, logger.LocalTime) }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := buildFileWriter(tt.args.cfg) + got, err := newFileWriter(tt.args.cfg.Path, tt.args.cfg.Rotation, tt.args.cfg.FlushInterval, nil) defer func() { - assert.NoError(t, got.Close()) + assert.NoError(t, got.file.Close()) }() assert.NoError(t, err) tt.validate(t, got) diff --git a/exporter/fileexporter/file_exporter.go b/exporter/fileexporter/file_exporter.go index 2765fb3787dd..c610c9fa6445 100644 --- a/exporter/fileexporter/file_exporter.go +++ b/exporter/fileexporter/file_exporter.go @@ -5,10 +5,6 @@ package fileexporter // import "github.com/open-telemetry/opentelemetry-collecto import ( "context" - "encoding/binary" - "io" - "sync" - "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/plog" @@ -16,154 +12,43 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" ) -// Marshaler configuration used for marhsaling Protobuf -var tracesMarshalers = map[string]ptrace.Marshaler{ - formatTypeJSON: &ptrace.JSONMarshaler{}, - formatTypeProto: &ptrace.ProtoMarshaler{}, -} -var metricsMarshalers = map[string]pmetric.Marshaler{ - formatTypeJSON: &pmetric.JSONMarshaler{}, - formatTypeProto: &pmetric.ProtoMarshaler{}, -} -var logsMarshalers = map[string]plog.Marshaler{ - formatTypeJSON: &plog.JSONMarshaler{}, - formatTypeProto: &plog.ProtoMarshaler{}, -} - -// exportFunc defines how to export encoded telemetry data. -type exportFunc func(e *fileExporter, buf []byte) error - // fileExporter is the implementation of file exporter that writes telemetry data to a file type fileExporter struct { - path string - file io.WriteCloser - mutex sync.Mutex - - tracesMarshaler ptrace.Marshaler - metricsMarshaler pmetric.Marshaler - logsMarshaler plog.Marshaler - - compression string - compressor compressFunc - - formatType string - exporter exportFunc - - flushInterval time.Duration - flushTicker *time.Ticker - stopTicker chan struct{} + marshaller *marshaller + writer *fileWriter } func (e *fileExporter) consumeTraces(_ context.Context, td ptrace.Traces) error { - buf, err := e.tracesMarshaler.MarshalTraces(td) + buf, err := e.marshaller.marshalTraces(td) if err != nil { return err } - buf = e.compressor(buf) - return e.exporter(e, buf) + return e.writer.export(buf) } func (e *fileExporter) consumeMetrics(_ context.Context, md pmetric.Metrics) error { - buf, err := e.metricsMarshaler.MarshalMetrics(md) + buf, err := e.marshaller.marshalMetrics(md) if err != nil { return err } - buf = e.compressor(buf) - return e.exporter(e, buf) + return e.writer.export(buf) } func (e *fileExporter) consumeLogs(_ context.Context, ld plog.Logs) error { - buf, err := e.logsMarshaler.MarshalLogs(ld) + buf, err := e.marshaller.marshalLogs(ld) if err != nil { return err } - buf = e.compressor(buf) - return e.exporter(e, buf) -} - -func exportMessageAsLine(e *fileExporter, buf []byte) error { - // Ensure only one write operation happens at a time. - e.mutex.Lock() - defer e.mutex.Unlock() - if _, err := e.file.Write(buf); err != nil { - return err - } - if _, err := io.WriteString(e.file, "\n"); err != nil { - return err - } - return nil -} - -func exportMessageAsBuffer(e *fileExporter, buf []byte) error { - // Ensure only one write operation happens at a time. - e.mutex.Lock() - defer e.mutex.Unlock() - // write the size of each message before writing the message itself. https://developers.google.com/protocol-buffers/docs/techniques - // each encoded object is preceded by 4 bytes (an unsigned 32 bit integer) - data := make([]byte, 4, 4+len(buf)) - binary.BigEndian.PutUint32(data, uint32(len(buf))) - - return binary.Write(e.file, binary.BigEndian, append(data, buf...)) -} - -// startFlusher starts the flusher. -// It does not check the flushInterval -func (e *fileExporter) startFlusher() { - e.mutex.Lock() - defer e.mutex.Unlock() - ff, ok := e.file.(interface{ flush() error }) - if !ok { - // Just in case. - return - } - - // Create the stop channel. - e.stopTicker = make(chan struct{}) - // Start the ticker. - e.flushTicker = time.NewTicker(e.flushInterval) - go func() { - for { - select { - case <-e.flushTicker.C: - e.mutex.Lock() - ff.flush() - e.mutex.Unlock() - case <-e.stopTicker: - return - } - } - }() + return e.writer.export(buf) } // Start starts the flush timer if set. -func (e *fileExporter) Start(context.Context, component.Host) error { - if e.flushInterval > 0 { - e.startFlusher() - } - return nil +func (e *fileExporter) Start(ctx context.Context, _ component.Host) error { + return e.writer.start(ctx) } // Shutdown stops the exporter and is invoked during shutdown. // It stops the flush ticker if set. func (e *fileExporter) Shutdown(context.Context) error { - e.mutex.Lock() - defer e.mutex.Unlock() - // Stop the flush ticker. - if e.flushTicker != nil { - e.flushTicker.Stop() - // Stop the go routine. - close(e.stopTicker) - } - return e.file.Close() -} - -func buildExportFunc(cfg *Config) func(e *fileExporter, buf []byte) error { - if cfg.FormatType == formatTypeProto { - return exportMessageAsBuffer - } - // if the data format is JSON and needs to be compressed, telemetry data can't be written to file in JSON format. - if cfg.FormatType == formatTypeJSON && cfg.Compression != "" { - return exportMessageAsBuffer - } - return exportMessageAsLine + return e.writer.shutdown() } diff --git a/exporter/fileexporter/file_exporter_test.go b/exporter/fileexporter/file_exporter_test.go index e813b7ef1598..7b1747df7a0b 100644 --- a/exporter/fileexporter/file_exporter_test.go +++ b/exporter/fileexporter/file_exporter_test.go @@ -126,19 +126,10 @@ func TestFileTracesExporter(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { conf := tt.args.conf - writer, err := buildFileWriter(conf) + feI, err := newFileExporter(conf) assert.NoError(t, err) - fe := &fileExporter{ - path: conf.Path, - formatType: conf.FormatType, - file: writer, - tracesMarshaler: tracesMarshalers[conf.FormatType], - exporter: buildExportFunc(conf), - compression: conf.Compression, - compressor: buildCompressor(conf.Compression), - flushInterval: conf.FlushInterval, - } - require.NotNil(t, fe) + require.IsType(t, &fileExporter{}, feI) + fe := feI.(*fileExporter) td := testdata.GenerateTracesTwoSpansSameResource() assert.NoError(t, fe.Start(context.Background(), componenttest.NewNopHost())) @@ -146,13 +137,13 @@ func TestFileTracesExporter(t *testing.T) { assert.NoError(t, fe.consumeTraces(context.Background(), td)) assert.NoError(t, fe.Shutdown(context.Background())) - fi, err := os.Open(fe.path) + fi, err := os.Open(fe.writer.path) assert.NoError(t, err) defer fi.Close() br := bufio.NewReader(fi) for { buf, isEnd, err := func() ([]byte, bool, error) { - if fe.formatType == formatTypeJSON && fe.compression == "" { + if fe.marshaller.formatType == formatTypeJSON && fe.marshaller.compression == "" { return readJSONMessage(br) } return readMessageFromStream(br) @@ -161,7 +152,7 @@ func TestFileTracesExporter(t *testing.T) { if isEnd { break } - decoder := buildUnCompressor(fe.compression) + decoder := buildUnCompressor(fe.marshaller.compression) buf, err = decoder(buf) assert.NoError(t, err) got, err := tt.args.unmarshaler.UnmarshalTraces(buf) @@ -175,11 +166,15 @@ func TestFileTracesExporter(t *testing.T) { func TestFileTracesExporterError(t *testing.T) { mf := &errorWriter{} fe := &fileExporter{ - file: mf, - formatType: formatTypeJSON, - exporter: exportMessageAsLine, - tracesMarshaler: tracesMarshalers[formatTypeJSON], - compressor: noneCompress, + marshaller: &marshaller{ + formatType: formatTypeJSON, + tracesMarshaler: tracesMarshalers[formatTypeJSON], + compressor: noneCompress, + }, + writer: &fileWriter{ + file: mf, + exporter: exportMessageAsLine, + }, } require.NotNil(t, fe) @@ -264,14 +259,13 @@ func TestFileMetricsExporter(t *testing.T) { writer, err := buildFileWriter(conf) assert.NoError(t, err) fe := &fileExporter{ - path: conf.Path, - formatType: conf.FormatType, - file: writer, - metricsMarshaler: metricsMarshalers[conf.FormatType], - exporter: buildExportFunc(conf), - compression: conf.Compression, - compressor: buildCompressor(conf.Compression), - flushInterval: conf.FlushInterval, + marshaller: &marshaller{ + formatType: conf.FormatType, + metricsMarshaler: metricsMarshalers[conf.FormatType], + compression: conf.Compression, + compressor: buildCompressor(conf.Compression), + }, + writer: writer, } require.NotNil(t, fe) @@ -281,14 +275,14 @@ func TestFileMetricsExporter(t *testing.T) { assert.NoError(t, fe.consumeMetrics(context.Background(), md)) assert.NoError(t, fe.Shutdown(context.Background())) - fi, err := os.Open(fe.path) + fi, err := os.Open(fe.writer.path) assert.NoError(t, err) defer fi.Close() br := bufio.NewReader(fi) for { buf, isEnd, err := func() ([]byte, bool, error) { - if fe.formatType == formatTypeJSON && - fe.compression == "" { + if fe.marshaller.formatType == formatTypeJSON && + fe.marshaller.compression == "" { return readJSONMessage(br) } return readMessageFromStream(br) @@ -297,7 +291,7 @@ func TestFileMetricsExporter(t *testing.T) { if isEnd { break } - decoder := buildUnCompressor(fe.compression) + decoder := buildUnCompressor(fe.marshaller.compression) buf, err = decoder(buf) assert.NoError(t, err) got, err := tt.args.unmarshaler.UnmarshalMetrics(buf) @@ -312,11 +306,15 @@ func TestFileMetricsExporter(t *testing.T) { func TestFileMetricsExporterError(t *testing.T) { mf := &errorWriter{} fe := &fileExporter{ - file: mf, - formatType: formatTypeJSON, - exporter: exportMessageAsLine, - metricsMarshaler: metricsMarshalers[formatTypeJSON], - compressor: noneCompress, + marshaller: &marshaller{ + formatType: formatTypeJSON, + metricsMarshaler: metricsMarshalers[formatTypeJSON], + compressor: noneCompress, + }, + writer: &fileWriter{ + file: mf, + exporter: exportMessageAsLine, + }, } require.NotNil(t, fe) @@ -401,14 +399,13 @@ func TestFileLogsExporter(t *testing.T) { writer, err := buildFileWriter(conf) assert.NoError(t, err) fe := &fileExporter{ - path: conf.Path, - formatType: conf.FormatType, - file: writer, - logsMarshaler: logsMarshalers[conf.FormatType], - exporter: buildExportFunc(conf), - compression: conf.Compression, - compressor: buildCompressor(conf.Compression), - flushInterval: conf.FlushInterval, + marshaller: &marshaller{ + formatType: conf.FormatType, + logsMarshaler: logsMarshalers[conf.FormatType], + compression: conf.Compression, + compressor: buildCompressor(conf.Compression), + }, + writer: writer, } require.NotNil(t, fe) @@ -418,13 +415,13 @@ func TestFileLogsExporter(t *testing.T) { assert.NoError(t, fe.consumeLogs(context.Background(), ld)) assert.NoError(t, fe.Shutdown(context.Background())) - fi, err := os.Open(fe.path) + fi, err := os.Open(fe.writer.path) assert.NoError(t, err) defer fi.Close() br := bufio.NewReader(fi) for { buf, isEnd, err := func() ([]byte, bool, error) { - if fe.formatType == formatTypeJSON && fe.compression == "" { + if fe.marshaller.formatType == formatTypeJSON && fe.marshaller.compression == "" { return readJSONMessage(br) } return readMessageFromStream(br) @@ -433,7 +430,7 @@ func TestFileLogsExporter(t *testing.T) { if isEnd { break } - decoder := buildUnCompressor(fe.compression) + decoder := buildUnCompressor(fe.marshaller.compression) buf, err = decoder(buf) assert.NoError(t, err) got, err := tt.args.unmarshaler.UnmarshalLogs(buf) @@ -447,11 +444,15 @@ func TestFileLogsExporter(t *testing.T) { func TestFileLogsExporterErrors(t *testing.T) { mf := &errorWriter{} fe := &fileExporter{ - file: mf, - formatType: formatTypeJSON, - exporter: exportMessageAsLine, - logsMarshaler: logsMarshalers[formatTypeJSON], - compressor: noneCompress, + marshaller: &marshaller{ + formatType: formatTypeJSON, + logsMarshaler: logsMarshalers[formatTypeJSON], + compressor: noneCompress, + }, + writer: &fileWriter{ + file: mf, + exporter: exportMessageAsLine, + }, } require.NotNil(t, fe) @@ -464,14 +465,18 @@ func TestFileLogsExporterErrors(t *testing.T) { func TestExportMessageAsBuffer(t *testing.T) { path := tempFileName(t) fe := &fileExporter{ - path: path, - formatType: formatTypeProto, - file: &lumberjack.Logger{ - Filename: path, - MaxSize: 1, + marshaller: &marshaller{ + formatType: formatTypeProto, + logsMarshaler: logsMarshalers[formatTypeProto], + }, + writer: &fileWriter{ + path: path, + file: &lumberjack.Logger{ + Filename: path, + MaxSize: 1, + }, + exporter: exportMessageAsBuffer, }, - logsMarshaler: logsMarshalers[formatTypeProto], - exporter: exportMessageAsBuffer, } require.NotNil(t, fe) // @@ -479,12 +484,12 @@ func TestExportMessageAsBuffer(t *testing.T) { marshaler := &plog.ProtoMarshaler{} buf, err := marshaler.MarshalLogs(ld) assert.NoError(t, err) - assert.Error(t, exportMessageAsBuffer(fe, buf)) + assert.Error(t, exportMessageAsBuffer(fe.writer, buf)) assert.NoError(t, fe.Shutdown(context.Background())) } // tempFileName provides a temporary file name for testing. -func tempFileName(t *testing.T) string { +func tempFileName(t testing.TB) string { return filepath.Join(t.TempDir(), "fileexporter_test.tmp") } @@ -621,14 +626,19 @@ func (b *tsBuffer) Bytes() []byte { } func safeFileExporterWrite(e *fileExporter, d []byte) (int, error) { - e.mutex.Lock() - defer e.mutex.Unlock() - return e.file.Write(d) + e.writer.mutex.Lock() + defer e.writer.mutex.Unlock() + return e.writer.file.Write(d) +} + +func buildFileWriter(conf *Config) (*fileWriter, error) { + export := buildExportFunc(conf) + return newFileWriter(conf.Path, conf.Rotation, conf.FlushInterval, export) } func TestFlushing(t *testing.T) { cfg := &Config{ - Path: "", + Path: tempFileName(t), FlushInterval: time.Second, } @@ -638,7 +648,12 @@ func TestFlushing(t *testing.T) { // Wrap the buffer with the buffered writer closer that implements flush() method. bwc := newBufferedWriteCloser(buf) // Create a file exporter with flushing enabled. - fe := newFileExporter(cfg, bwc) + feI, err := newFileExporter(cfg) + assert.NoError(t, err) + assert.IsType(t, &fileExporter{}, feI) + fe := feI.(*fileExporter) + fe.writer.file.Close() + fe.writer.file = bwc // Start the flusher. ctx := context.Background() diff --git a/exporter/fileexporter/file_writer.go b/exporter/fileexporter/file_writer.go new file mode 100644 index 000000000000..46dde6d82ed6 --- /dev/null +++ b/exporter/fileexporter/file_writer.go @@ -0,0 +1,118 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package fileexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter" + +import ( + "context" + "encoding/binary" + "io" + "sync" + "time" +) + +// exportFunc defines how to export encoded telemetry data. +type exportFunc func(e *fileWriter, buf []byte) error + +type fileWriter struct { + path string + file io.WriteCloser + mutex sync.Mutex + + exporter exportFunc + + flushInterval time.Duration + flushTicker *time.Ticker + stopTicker chan struct{} +} + +func exportMessageAsLine(w *fileWriter, buf []byte) error { + // Ensure only one write operation happens at a time. + w.mutex.Lock() + defer w.mutex.Unlock() + if _, err := w.file.Write(buf); err != nil { + return err + } + if _, err := io.WriteString(w.file, "\n"); err != nil { + return err + } + return nil +} + +func exportMessageAsBuffer(w *fileWriter, buf []byte) error { + // Ensure only one write operation happens at a time. + w.mutex.Lock() + defer w.mutex.Unlock() + // write the size of each message before writing the message itself. https://developers.google.com/protocol-buffers/docs/techniques + // each encoded object is preceded by 4 bytes (an unsigned 32 bit integer) + data := make([]byte, 4, 4+len(buf)) + binary.BigEndian.PutUint32(data, uint32(len(buf))) + + return binary.Write(w.file, binary.BigEndian, append(data, buf...)) +} + +func (w *fileWriter) export(buf []byte) error { + return w.exporter(w, buf) +} + +// startFlusher starts the flusher. +// It does not check the flushInterval +func (w *fileWriter) startFlusher() { + w.mutex.Lock() + defer w.mutex.Unlock() + ff, ok := w.file.(interface{ flush() error }) + if !ok { + // Just in case. + return + } + + // Create the stop channel. + w.stopTicker = make(chan struct{}) + // Start the ticker. + w.flushTicker = time.NewTicker(w.flushInterval) + go func() { + for { + select { + case <-w.flushTicker.C: + w.mutex.Lock() + ff.flush() + w.mutex.Unlock() + case <-w.stopTicker: + return + } + } + }() +} + +// Start starts the flush timer if set. +func (w *fileWriter) start(context.Context) error { + if w.flushInterval > 0 { + w.startFlusher() + } + return nil +} + +// Shutdown stops the exporter and is invoked during shutdown. +// It stops the flush ticker if set. +func (w *fileWriter) shutdown() error { + w.mutex.Lock() + defer w.mutex.Unlock() + // Stop the flush ticker. + if w.flushTicker != nil { + w.flushTicker.Stop() + // Stop the go routine. + close(w.stopTicker) + } + return w.file.Close() +} + +func buildExportFunc(cfg *Config) func(w *fileWriter, buf []byte) error { + if cfg.FormatType == formatTypeProto { + return exportMessageAsBuffer + } + // if the data format is JSON and needs to be compressed, telemetry data can't be written to file in JSON format. + if cfg.FormatType == formatTypeJSON && cfg.Compression != "" { + return exportMessageAsBuffer + } + return exportMessageAsLine +} diff --git a/exporter/fileexporter/marshaller.go b/exporter/fileexporter/marshaller.go new file mode 100644 index 000000000000..1eabb80369e4 --- /dev/null +++ b/exporter/fileexporter/marshaller.go @@ -0,0 +1,62 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package fileexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter" + +import ( + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +// Marshaler configuration used for marhsaling Protobuf +var tracesMarshalers = map[string]ptrace.Marshaler{ + formatTypeJSON: &ptrace.JSONMarshaler{}, + formatTypeProto: &ptrace.ProtoMarshaler{}, +} +var metricsMarshalers = map[string]pmetric.Marshaler{ + formatTypeJSON: &pmetric.JSONMarshaler{}, + formatTypeProto: &pmetric.ProtoMarshaler{}, +} +var logsMarshalers = map[string]plog.Marshaler{ + formatTypeJSON: &plog.JSONMarshaler{}, + formatTypeProto: &plog.ProtoMarshaler{}, +} + +type marshaller struct { + tracesMarshaler ptrace.Marshaler + metricsMarshaler pmetric.Marshaler + logsMarshaler plog.Marshaler + + compression string + compressor compressFunc + + formatType string +} + +func (m *marshaller) marshalTraces(td ptrace.Traces) ([]byte, error) { + buf, err := m.tracesMarshaler.MarshalTraces(td) + if err != nil { + return nil, err + } + buf = m.compressor(buf) + return buf, nil +} + +func (m *marshaller) marshalMetrics(md pmetric.Metrics) ([]byte, error) { + buf, err := m.metricsMarshaler.MarshalMetrics(md) + if err != nil { + return nil, err + } + buf = m.compressor(buf) + return buf, nil +} + +func (m *marshaller) marshalLogs(ld plog.Logs) ([]byte, error) { + buf, err := m.logsMarshaler.MarshalLogs(ld) + if err != nil { + return nil, err + } + buf = m.compressor(buf) + return buf, nil +}