reuse compression type for supporting multiple types
pepperkick committed Mar 7, 2024
1 parent b359613 commit 7506b9d
Showing 10 changed files with 106 additions and 23 deletions.
2 changes: 1 addition & 1 deletion .chloggen/add-compression-option.yaml
@@ -7,7 +7,7 @@ change_type: enhancement
component: awss3exporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "add `compression` option to enable gzip compression"
note: "add `compression` option to enable file compression on S3"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [ 27872 ]
8 changes: 6 additions & 2 deletions exporter/awss3exporter/README.md
@@ -34,7 +34,7 @@ The following exporter configuration parameters are supported.
| `endpoint` | overrides the endpoint used by the exporter instead of constructing it from `region` and `s3_bucket` | |
| `s3_force_path_style` | [set this to `true` to force the request to use path-style addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html) | false |
| `disable_ssl` | set this to `true` to disable SSL when sending requests | false |
| `compression` | should the file be compressed using gzip | false |
| `compression` | compression type to apply to files before upload (`none` or `gzip`) | none |

### Marshaler

@@ -43,10 +43,14 @@ Marshaler determines the format of data sent to AWS S3. Currently, the following
- `otlp_json` (default): the [OpenTelemetry Protocol format](https://github.com/open-telemetry/opentelemetry-proto), represented as json.
- `otlp_proto`: the [OpenTelemetry Protocol format](https://github.com/open-telemetry/opentelemetry-proto), represented as Protocol Buffers. A single protobuf message is written into each object.
- `sumo_ic`: the [Sumo Logic Installed Collector Archive format](https://help.sumologic.com/docs/manage/data-archiving/archive/).
**This format is supported only for logs and does not support `compression` option as it is compressed by default**
**This format is supported only for logs.**
- `body`: export the log body as string.
**This format is supported only for logs.**

### Compression
- `none` (default): No compression will be applied
- `gzip`: Files will be compressed with gzip. **This is not supported by the `sumo_ic` marshaler.**

# Example Configuration

The following example configuration stores the output in the 'eu-central' region in a bucket named 'databucket'.
33 changes: 21 additions & 12 deletions exporter/awss3exporter/config.go
@@ -7,21 +7,23 @@ import (
"errors"

"go.uber.org/multierr"

"go.opentelemetry.io/collector/config/configcompression"
)

// S3UploaderConfig contains aws s3 uploader related config to control things
// like bucket, prefix, batching, connections, retries, etc.
type S3UploaderConfig struct {
Region string `mapstructure:"region"`
S3Bucket string `mapstructure:"s3_bucket"`
S3Prefix string `mapstructure:"s3_prefix"`
S3Partition string `mapstructure:"s3_partition"`
FilePrefix string `mapstructure:"file_prefix"`
Endpoint string `mapstructure:"endpoint"`
RoleArn string `mapstructure:"role_arn"`
S3ForcePathStyle bool `mapstructure:"s3_force_path_style"`
DisableSSL bool `mapstructure:"disable_ssl"`
Compression bool `mapstructure:"compression"`
Region string `mapstructure:"region"`
S3Bucket string `mapstructure:"s3_bucket"`
S3Prefix string `mapstructure:"s3_prefix"`
S3Partition string `mapstructure:"s3_partition"`
FilePrefix string `mapstructure:"file_prefix"`
Endpoint string `mapstructure:"endpoint"`
RoleArn string `mapstructure:"role_arn"`
S3ForcePathStyle bool `mapstructure:"s3_force_path_style"`
DisableSSL bool `mapstructure:"disable_ssl"`
Compression configcompression.Type `mapstructure:"compression"`
}

type MarshalerType string
@@ -49,8 +51,15 @@ func (c *Config) Validate() error {
if c.S3Uploader.S3Bucket == "" {
errs = multierr.Append(errs, errors.New("bucket is required"))
}
if c.S3Uploader.Compression && c.MarshalerName == SumoIC {
errs = multierr.Append(errs, errors.New("marshaler does not support compression"))
compression := c.S3Uploader.Compression
if compression.IsCompressed() {
if compression != configcompression.TypeGzip {
errs = multierr.Append(errs, errors.New("unknown compression type"))
}

if c.MarshalerName == SumoIC {
errs = multierr.Append(errs, errors.New("marshaler does not support compression"))
}
}
return errs
}
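
As a quick illustration (a sketch, not part of the commit; field values are made up), the new validation accepts `gzip`, rejects unknown compression types, and still refuses compression for the `sumo_ic` marshaler:

```go
// Sketch only: exercising the validation rules added above from within the
// awss3exporter package. Values are illustrative, not taken from the diff.
func compressionValidationSketch() {
	cfg := &Config{
		S3Uploader: S3UploaderConfig{
			S3Bucket:    "foo",
			Compression: configcompression.TypeGzip,
		},
		MarshalerName: SumoIC,
	}
	fmt.Println(cfg.Validate()) // reports "marshaler does not support compression"

	cfg.MarshalerName = "otlp_json"
	fmt.Println(cfg.Validate()) // gzip is a known type, so no compression error is reported

	cfg.S3Uploader.Compression = "zstd"
	fmt.Println(cfg.Validate()) // reports "unknown compression type"
}
```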
42 changes: 42 additions & 0 deletions exporter/awss3exporter/config_test.go
@@ -196,3 +196,45 @@ func TestMarshallerName(t *testing.T) {
)

}

func TestCompressionName(t *testing.T) {
factories, err := otelcoltest.NopFactories()
assert.NoError(t, err)

factory := NewFactory()
factories.Exporters[factory.Type()] = factory
cfg, err := otelcoltest.LoadConfigAndValidate(
filepath.Join("testdata", "compression.yaml"), factories)

require.NoError(t, err)
require.NotNil(t, cfg)

e := cfg.Exporters[component.MustNewID("awss3")].(*Config)

assert.Equal(t, e,
&Config{
S3Uploader: S3UploaderConfig{
Region: "us-east-1",
S3Bucket: "foo",
S3Partition: "minute",
Compression: "gzip",
},
MarshalerName: "otlp_json",
},
)

e = cfg.Exporters[component.MustNewIDWithName("awss3", "proto")].(*Config)

assert.Equal(t, e,
&Config{
S3Uploader: S3UploaderConfig{
Region: "us-east-1",
S3Bucket: "bar",
S3Partition: "minute",
Compression: "none",
},
MarshalerName: "otlp_proto",
},
)

}
1 change: 0 additions & 1 deletion exporter/awss3exporter/factory.go
@@ -6,7 +6,6 @@ package awss3exporter // import "github.com/open-telemetry/opentelemetry-collect
import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
1 change: 1 addition & 0 deletions exporter/awss3exporter/go.mod
@@ -6,6 +6,7 @@ require (
github.com/aws/aws-sdk-go v1.50.27
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.96.1-0.20240305232712-5a68058e0e3a
go.opentelemetry.io/collector/config/configcompression v0.96.0
go.opentelemetry.io/collector/confmap v0.96.1-0.20240305232712-5a68058e0e3a
go.opentelemetry.io/collector/consumer v0.96.1-0.20240305232712-5a68058e0e3a
go.opentelemetry.io/collector/exporter v0.96.1-0.20240305232712-5a68058e0e3a
2 changes: 2 additions & 0 deletions exporter/awss3exporter/go.sum

Some generated files are not rendered by default.

10 changes: 5 additions & 5 deletions exporter/awss3exporter/s3_writer.go
@@ -8,6 +8,7 @@ import (
"compress/gzip"
"context"
"fmt"
"go.opentelemetry.io/collector/config/configcompression"
"math/rand"
"strconv"
"time"
@@ -39,14 +40,14 @@ func randomInRange(low, hi int) int {
return low + rand.Intn(hi-low)
}

func getS3Key(time time.Time, keyPrefix string, partition string, filePrefix string, metadata string, fileformat string, compression bool) string {
func getS3Key(time time.Time, keyPrefix string, partition string, filePrefix string, metadata string, fileformat string, compression configcompression.Type) string {
timeKey := getTimeKey(time, partition)
randomID := randomInRange(100000000, 999999999)

s3Key := keyPrefix + "/" + timeKey + "/" + filePrefix + metadata + "_" + strconv.Itoa(randomID) + "." + fileformat

// add ".gz" extension to files if compression is enabled
if compression {
if compression == configcompression.TypeGzip {
s3Key += ".gz"
}
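
For orientation, a hypothetical call (values made up, not from the diff) showing the effect of the gzip branch above on the generated key; the layout matches what the updated tests assert:

```go
// Hypothetical call: with gzip the key gains a ".gz" suffix, e.g.
// keyprefix/year=2022/month=06/day=05/hour=00/minute=00/fileprefixlogs_123456789.json.gz
key := getS3Key(time.Now(), "keyprefix", "minute", "fileprefix", "logs", "json", configcompression.TypeGzip)
_ = key
```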

@@ -80,15 +81,14 @@ func getSession(config *Config, sessionConfig *aws.Config) (*session.Session, error) {
}

func (s3writer *s3Writer) writeBuffer(_ context.Context, buf []byte, config *Config, metadata string, format string) error {
compressionEnabled := config.S3Uploader.Compression
now := time.Now()
key := getS3Key(now,
config.S3Uploader.S3Prefix, config.S3Uploader.S3Partition,
config.S3Uploader.FilePrefix, metadata, format, compressionEnabled)
config.S3Uploader.FilePrefix, metadata, format, config.S3Uploader.Compression)

encoding := ""
var reader *bytes.Reader
if compressionEnabled {
if config.S3Uploader.Compression == configcompression.TypeGzip {
// set s3 uploader content encoding to "gzip"
encoding = "gzip"
var gzipContents bytes.Buffer
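
The rest of writeBuffer is collapsed in this view; as a rough sketch of the likely shape (an assumption, not the committed code verbatim), the gzip branch would fill gzipContents and point reader at it:

```go
// Sketch only - the actual continuation is collapsed above. Assumes the
// standard library gzip writer compresses buf into gzipContents.
gzipWriter := gzip.NewWriter(&gzipContents)
if _, err := gzipWriter.Write(buf); err != nil {
	return err
}
if err := gzipWriter.Close(); err != nil {
	return err
}
reader = bytes.NewReader(gzipContents.Bytes())
```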
4 changes: 2 additions & 2 deletions exporter/awss3exporter/s3_writer_test.go
@@ -36,7 +36,7 @@ func TestS3Key(t *testing.T) {
require.NotNil(t, tm)

re := regexp.MustCompile(`keyprefix/year=2022/month=06/day=05/hour=00/minute=00/fileprefixlogs_([0-9]+).json`)
s3Key := getS3Key(tm, "keyprefix", "minute", "fileprefix", "logs", "json", false)
s3Key := getS3Key(tm, "keyprefix", "minute", "fileprefix", "logs", "json", "")
matched := re.MatchString(s3Key)
assert.Equal(t, true, matched)
}
@@ -50,7 +50,7 @@ func TestS3KeyOfCompressedFile(t *testing.T) {
require.NotNil(t, tm)

re := regexp.MustCompile(`keyprefix/year=2022/month=06/day=05/hour=00/minute=00/fileprefixlogs_([0-9]+).json.gz`)
s3Key := getS3Key(tm, "keyprefix", "minute", "fileprefix", "logs", "json", true)
s3Key := getS3Key(tm, "keyprefix", "minute", "fileprefix", "logs", "json", "gzip")
matched := re.MatchString(s3Key)
assert.Equal(t, true, matched)
}
26 changes: 26 additions & 0 deletions exporter/awss3exporter/testdata/compression.yaml
@@ -0,0 +1,26 @@
receivers:
  nop:

exporters:
  awss3:
    s3uploader:
      s3_bucket: "foo"
      compression: "gzip"
    marshaler: otlp_json

  awss3/proto:
    s3uploader:
      s3_bucket: "bar"
      compression: "none"
    marshaler: otlp_proto


processors:
  nop:

service:
  pipelines:
    traces:
      receivers: [nop]
      processors: [nop]
      exporters: [awss3, awss3/proto]
