Skip to content

Commit

Permalink
awskinesisexporter: extending to allow for dynamic export types (open…
Browse files Browse the repository at this point in the history
…-telemetry#5440)

* Updating the work to add in dynamic encoding and compression for
kinesis exporter

* Fixing typo and updating default

* Rename local var to make it clear it is to be used for exporting

* Moved NewEncoder method in to a more fiting, restricting to single name

* Fixing test

* Fixing importing issues

* Adding code comment as to why export exists

* removing Permanent error

* Adding updates to mod files

* Updating mods
  • Loading branch information
MovieStoreGuy authored Oct 11, 2021
1 parent 9f28b9f commit dd44390
Show file tree
Hide file tree
Showing 26 changed files with 1,026 additions and 115 deletions.
5 changes: 4 additions & 1 deletion exporter/awskinesisexporter/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Kinesis Exporter


The kinesis exporter currently exports Jaeger traces (dynamic encodings coming) to the configured kinesis stream.
The kinesis exporter currently exports dynamic encodings to the configured kinesis stream.
The exporter relies heavily on the kinesis.PutRecords api to reduce network I/O and and reduces records into smallest atomic representation
to avoid hitting the hard limits placed on Records (No greater than 1Mb).
This producer will block until the operation is done to allow for retryable and queued data to help during high loads.
Expand All @@ -15,6 +15,9 @@ The following settings can be optionally configured:
- `kinesis_endpoint` (no default)
- `region` (default = us-west-2): the region that the kinesis stream is deployed in
- `role` (no default): The role to be used in order to send data to the kinesis stream
- `encoding`
- `name` (default = otlp): defines the export type to be used to send to kinesis (available is `otlp-proto`, `otlp-json`, `zipkin-proto`, `zipkin-json`, `jaeger`)
- `compression` (default = none): allows to set the compression type (defaults BestSpeed for all) before forwarding to kinesis (available is `flate`, `gzip`, `zlib` or `none`)
- `max_records_per_batch` (default = 500, PutRecords limit): The number of records that can be batched together then sent to kinesis.
- `max_record_size` (default = 1Mb, PutRecord(s) limit on record size): The max allowed size that can be exported to kinesis
- `timeout` (default = 5s): Is the timeout for every attempt to send data to the backend.
Expand Down
6 changes: 6 additions & 0 deletions exporter/awskinesisexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,19 @@ type AWSConfig struct {
Role string `mapstructure:"role"`
}

type Encoding struct {
Name string `mapstructure:"name"`
Compression string `mapstructure:"compression"`
}

// Config contains the main configuration options for the awskinesis exporter
type Config struct {
config.ExporterSettings `mapstructure:",squash"`
exporterhelper.TimeoutSettings `mapstructure:",squash"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`

Encoding `mapstructure:"encoding"`
AWS AWSConfig `mapstructure:"aws"`
MaxRecordsPerBatch int `mapstructure:"max_records_per_batch"`
MaxRecordSize int `mapstructure:"max_record_size"`
Expand Down
8 changes: 8 additions & 0 deletions exporter/awskinesisexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func TestDefaultConfig(t *testing.T) {
QueueSettings: exporterhelper.DefaultQueueSettings(),
RetrySettings: exporterhelper.DefaultRetrySettings(),
TimeoutSettings: exporterhelper.DefaultTimeoutSettings(),
Encoding: Encoding{
Name: "otlp",
Compression: "none",
},
AWS: AWSConfig{
Region: "us-west-2",
},
Expand Down Expand Up @@ -80,6 +84,10 @@ func TestConfig(t *testing.T) {
},
TimeoutSettings: exporterhelper.DefaultTimeoutSettings(),
QueueSettings: exporterhelper.DefaultQueueSettings(),
Encoding: Encoding{
Name: "otlp-proto",
Compression: "none",
},
AWS: AWSConfig{
StreamName: "test-stream",
KinesisEndpoint: "awskinesis.mars-1.aws.galactic",
Expand Down
24 changes: 22 additions & 2 deletions exporter/awskinesisexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/batch"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/compress"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/producer"
)

Expand Down Expand Up @@ -65,11 +66,30 @@ func createExporter(c config.Exporter, log *zap.Logger) (*Exporter, error) {
producer, err := producer.NewBatcher(kinesis.New(sess, cfgs...), conf.AWS.StreamName,
producer.WithLogger(log),
)
if err != nil {
return nil, err
}

compressor, err := compress.NewCompressor(conf.Encoding.Compression)
if err != nil {
return nil, err
}

encoder, err := batch.NewEncoder(
conf.Encoding.Name,
batch.WithMaxRecordSize(conf.MaxRecordSize),
batch.WithMaxRecordsPerBatch(conf.MaxRecordsPerBatch),
batch.WithCompression(compressor),
)

if err != nil {
return nil, err
}

return &Exporter{
producer: producer,
batcher: batch.NewJaeger(conf.MaxRecordsPerBatch, conf.MaxRecordSize),
}, err
batcher: encoder,
}, nil
}

// Start tells the exporter to start. The exporter may prepare for exporting
Expand Down
7 changes: 7 additions & 0 deletions exporter/awskinesisexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
const (
// The value of "type" key in configuration.
typeStr = "awskinesis"

defaultEncoding = "otlp"
defaultCompression = "none"
)

// NewFactory creates a factory for Kinesis exporter.
Expand All @@ -46,6 +49,10 @@ func createDefaultConfig() config.Exporter {
TimeoutSettings: exporterhelper.DefaultTimeoutSettings(),
RetrySettings: exporterhelper.DefaultRetrySettings(),
QueueSettings: exporterhelper.DefaultQueueSettings(),
Encoding: Encoding{
Name: defaultEncoding,
Compression: defaultCompression,
},
AWS: AWSConfig{
Region: "us-west-2",
},
Expand Down
21 changes: 13 additions & 8 deletions exporter/awskinesisexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,28 @@ go 1.17

require (
github.com/aws/aws-sdk-go v1.40.56
github.com/golang/protobuf v1.5.2
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.36.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.36.0
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/collector v0.36.1-0.20211004155959-190f8fbb2b9a
go.opentelemetry.io/collector/model v0.36.1-0.20211004155959-190f8fbb2b9a
go.uber.org/zap v1.19.1
google.golang.org/protobuf v1.27.1
)

require (
github.com/gogo/protobuf v1.3.2
github.com/google/uuid v1.3.0
github.com/jaegertracing/jaeger v1.26.0
go.uber.org/multierr v1.7.0
google.golang.org/genproto v0.0.0-20210909211513-a8c4777a87af // indirect
google.golang.org/genproto v0.0.0-20210927142257-433400c27d05 // indirect
)

require (
github.com/apache/thrift v0.15.0 // indirect
github.com/cenkalti/backoff/v4 v4.1.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/jaegertracing/jaeger v1.26.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/knadh/koanf v1.2.4 // indirect
github.com/magiconair/properties v1.8.5 // indirect
Expand All @@ -33,21 +34,25 @@ require (
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.36.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/openzipkin/zipkin-go v0.2.5 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/cast v1.4.1 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/otel v1.0.1 // indirect
go.opentelemetry.io/otel/metric v0.24.0 // indirect
go.opentelemetry.io/otel/trace v1.0.1 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
golang.org/x/text v0.3.6 // indirect
golang.org/x/net v0.0.0-20210927181540-4e4d966f7476 // indirect
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6 // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/grpc v1.41.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger => ../../pkg/translator/jaeger

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin => ../../pkg/translator/zipkin

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal
16 changes: 11 additions & 5 deletions exporter/awskinesisexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 22 additions & 28 deletions exporter/awskinesisexporter/internal/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import (
"errors"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
protov1 "github.com/golang/protobuf/proto" //nolint:staticcheck // Some encoding types uses legacy prototype version
"github.com/aws/aws-sdk-go/service/kinesis" //nolint:staticcheck // Some encoding types uses legacy prototype version
"go.opentelemetry.io/collector/consumer/consumererror"
protov2 "google.golang.org/protobuf/proto"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/compress"
)

const (
Expand All @@ -40,6 +40,8 @@ type Batch struct {
maxBatchSize int
maxRecordSize int

compression compress.Compressor

records []*kinesis.PutRecordsRequestEntry
}

Expand All @@ -63,10 +65,19 @@ func WithMaxRecordSize(size int) Option {
}
}

func WithCompression(compressor compress.Compressor) Option {
return func(bt *Batch) {
if compressor != nil {
bt.compression = compressor
}
}
}

func New(opts ...Option) *Batch {
bt := &Batch{
maxBatchSize: MaxBatchedRecords,
maxRecordSize: MaxRecordSize,
compression: compress.NewNoopCompressor(),
records: make([]*kinesis.PutRecordsRequestEntry, 0, MaxRecordSize),
}

Expand All @@ -77,41 +88,24 @@ func New(opts ...Option) *Batch {
return bt
}

func (b *Batch) addRaw(raw []byte, key string) error {
func (b *Batch) AddRecord(raw []byte, key string) error {
record, err := b.compression.Do(raw)
if err != nil {
return err
}

if l := len(key); l == 0 || l > 256 {
return ErrPartitionKeyLength
}

if len(raw) > b.maxRecordSize {
if l := len(record); l == 0 || l > b.maxRecordSize {
return ErrRecordLength
}

b.records = append(b.records, &kinesis.PutRecordsRequestEntry{Data: raw, PartitionKey: aws.String(key)})
b.records = append(b.records, &kinesis.PutRecordsRequestEntry{Data: record, PartitionKey: aws.String(key)})
return nil
}

// AddProtobufV1 allows for deprecated protobuf generated types to be exported
// and marshaled into records to be exported to kinesis.
// The protobuf v1 message is considered deprecated so where possible to use
// batch.AddProtobufV2.
func (b *Batch) AddProtobufV1(message protov1.Message, key string) error {
data, err := protov1.Marshal(message)
if err != nil {
return err
}
return b.addRaw(data, key)
}

// AddProtobufV2 accepts the protobuf message and marshal it into a record
// ready to submit to kinesis.
func (b *Batch) AddProtobufV2(message protov2.Message, key string) error {
data, err := protov2.Marshal(message)
if err != nil {
return err
}
return b.addRaw(data, key)
}

// Chunk breaks up the iternal queue into blocks that can be used
// to be written to he kinesis.PutRecords endpoint
func (b *Batch) Chunk() (chunks [][]*kinesis.PutRecordsRequestEntry) {
Expand Down
Loading

0 comments on commit dd44390

Please sign in to comment.