Skip to content
Merged
9 changes: 9 additions & 0 deletions contrib/config/receiver-kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
input:
address: 0.0.0.0
port: 8081
output:
plugins:
kafka:
brokers: 0.0.0.0:9092
clientId: packetstreamer
topic: packetstreamer
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ require (
github.com/aws/aws-sdk-go-v2/service/sso v1.11.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.16.3 // indirect
github.com/aws/smithy-go v1.11.2 // indirect
github.com/confluentinc/confluent-kafka-go v1.8.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/inhies/go-bytesize v0.0.0-20210819104631-275770b98743 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect
golang.org/x/sys v0.0.0-20211210111614-af8b64212486 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.8.2 // indirect
)
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.16.3 h1:cJGRyzCSVwZC7zZZ1xbx9m32UnrK
github.com/aws/aws-sdk-go-v2/service/sts v1.16.3/go.mod h1:bfBj0iVmsUyUg4weDB4NxktD9rDGeKSVWnjTnwbx9b8=
github.com/aws/smithy-go v1.11.2 h1:eG/N+CcUMAvsdffgMvjMKwfyDzIkjM6pfxMJ8Mzc6mE=
github.com/aws/smithy-go v1.11.2/go.mod h1:3xHYmszWVx2c0kIwQeEVf9uSm4fYZt67FBJnwub1bgM=
github.com/confluentinc/confluent-kafka-go v1.8.2 h1:PBdbvYpyOdFLehj8j+9ba7FL4c4Moxn79gy9cYKxG5E=
github.com/confluentinc/confluent-kafka-go v1.8.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8=
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/inhies/go-bytesize v0.0.0-20210819104631-275770b98743 h1:X3Xxno5Ji8idrNiUoFc7QyXpqhSYlDRYQmc7mlpMBzU=
Expand Down Expand Up @@ -80,6 +84,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.8.2 h1:QAgN6OC0o7dwvyz+HML6GYm+0Pk54O91+oxGqJ/5z8I=
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.8.2/go.mod h1:ZdI3yfYmdNSLQPNCpO1y00EHyWaHG5EnQEyL/ntAegY=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
Expand Down
215 changes: 164 additions & 51 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ const (
kilobyte = 1024
)

const (
defaultClientId = "packetstreamer"
defaultTopic = "packetstreamer"
defaultAcks = "all"
)

type InputConfig struct {
Address string
Port *int
Expand All @@ -46,8 +52,18 @@ type S3PluginConfig struct {
CannedACL string `yaml:"cannedACL,omitempty"`
}

type KafkaPluginConfig struct {
Brokers string
ClientId string `yaml:"clientId,omitempty"`
Topic string `yaml:"topic,omitempty"`
MessageSize *bytesize.ByteSize `yaml:"messageSize,omitempty"`
Acks string `yaml:"acks,omitempty"`
FileSize *bytesize.ByteSize `yaml:"fileSize,omitempty"`
}

type PluginsConfig struct {
S3 *S3PluginConfig
S3 *S3PluginConfig
Kafka *KafkaPluginConfig
}

type OutputConfig struct {
Expand All @@ -65,8 +81,18 @@ type S3OutputRawConfig struct {
CannedACL *string `yaml:"cannedACL,omitempty"`
}

type KafkaOutputRawConfig struct {
Brokers string
ClientId *string `yaml:"clientId,omitempty"`
Topic *string `yaml:"topic,omitempty"`
MessageSize *string `yaml:"messageSize,omitempty"`
Acks *string `yaml:"acks,omitempty"`
FileSize *string `yaml:"fileSize,omitempty"`
}

type PluginsRawConfig struct {
S3 *S3OutputRawConfig
S3 *S3OutputRawConfig
Kafka *KafkaOutputRawConfig
}

type OutputRawConfig struct {
Expand All @@ -93,7 +119,7 @@ type SamplingRateConfig struct {

type RawConfig struct {
Input *InputConfig
Output OutputRawConfig
Output *OutputRawConfig
TLS TLSConfig
Auth AuthConfig
CompressBlockSize *int `yaml:"compressBlockSize,omitempty"`
Expand Down Expand Up @@ -135,58 +161,19 @@ func NewConfig(configFileName string) (*Config, error) {
}

var s3Config *S3PluginConfig
if rawConfig.Output.Plugins.S3 != nil {
var (
totalFileSize *bytesize.ByteSize
uploadTimeout time.Duration
uploadChunkSize *bytesize.ByteSize
cannedACL string
)

if rawConfig.Output.Plugins.S3.TotalFileSize != nil {
t, err := bytesize.Parse(*rawConfig.Output.Plugins.S3.TotalFileSize)
if err != nil {
return nil, fmt.Errorf("could not parse the totalFileSize field %s: %w", *rawConfig.Output.Plugins.S3.TotalFileSize, err)
}
totalFileSize = &t
} else {
t := 10 * bytesize.MB
totalFileSize = &t
}
var kafkaConfig *KafkaPluginConfig
if rawConfig.Output != nil && rawConfig.Output.Plugins != nil {

if rawConfig.Output.Plugins.S3.UploadTimeout != nil {
uploadTimeout, err = time.ParseDuration(*rawConfig.Output.Plugins.S3.UploadTimeout)
if err != nil {
return nil, fmt.Errorf("could not parse the uploadTimeout field %s: %w", *rawConfig.Output.Plugins.S3.UploadTimeout, err)
}
} else {
uploadTimeout = time.Minute
}
s3Config, err = populateS3Config(rawConfig)

if rawConfig.Output.Plugins.S3.UploadChunkSize != nil {
u, err := bytesize.Parse(*rawConfig.Output.Plugins.S3.UploadChunkSize)
if err != nil {
return nil, fmt.Errorf("could not partse the uploadChunkSize field %s: %w", *rawConfig.Output.Plugins.S3.UploadChunkSize, err)
}
uploadChunkSize = &u
} else {
u := 5 * bytesize.MB
totalFileSize = &u
if err != nil {
return nil, err
}

if rawConfig.Output.Plugins.S3.CannedACL != nil {
cannedACL = *rawConfig.Output.Plugins.S3.CannedACL
} else {
cannedACL = string(types.ObjectCannedACLBucketOwnerFullControl)
}
kafkaConfig, err = populateKafkaConfig(rawConfig)

s3Config = &S3PluginConfig{
Bucket: rawConfig.Output.Plugins.S3.Bucket,
Region: rawConfig.Output.Plugins.S3.Region,
TotalFileSize: totalFileSize,
UploadChunkSize: uploadChunkSize,
UploadTimeout: uploadTimeout,
CannedACL: cannedACL,
if err != nil {
return nil, err
}
}

Expand Down Expand Up @@ -220,7 +207,8 @@ func NewConfig(configFileName string) (*Config, error) {
File: rawConfig.Output.File,
Server: rawConfig.Output.Server,
Plugins: &PluginsConfig{
S3: s3Config,
S3: s3Config,
Kafka: kafkaConfig,
},
},
TLS: rawConfig.TLS,
Expand All @@ -244,3 +232,128 @@ func NewConfig(configFileName string) (*Config, error) {

return config, nil
}

func populateKafkaConfig(rawConfig RawConfig) (*KafkaPluginConfig, error) {
if rawConfig.Output.Plugins.Kafka == nil {
return nil, nil
}

var (
clientId string
topic string
acks string
messageSize *bytesize.ByteSize
fileSize *bytesize.ByteSize
)

rawKafkaConfig := rawConfig.Output.Plugins.Kafka

if rawKafkaConfig.ClientId != nil {
clientId = *rawKafkaConfig.ClientId
} else {
clientId = defaultClientId
}

if rawKafkaConfig.Topic != nil {
topic = *rawKafkaConfig.Topic
} else {
topic = defaultTopic
}

if rawKafkaConfig.MessageSize != nil {
ms, err := bytesize.Parse(*rawKafkaConfig.MessageSize)
if err != nil {
return nil, fmt.Errorf("could not parse the messageSize field %s: %w", *rawKafkaConfig.MessageSize, err)
}
messageSize = &ms
} else {
ms := 65 * bytesize.KB
messageSize = &ms
}

if rawKafkaConfig.Acks != nil {
acks = *rawKafkaConfig.Acks
} else {
acks = defaultAcks
}

if rawKafkaConfig.FileSize != nil {
fs, err := bytesize.Parse(*rawKafkaConfig.FileSize)
if err != nil {
return nil, fmt.Errorf("could not parse the fileSize field %s: %w", *rawKafkaConfig.FileSize, err)
}
fileSize = &fs
} else {
fs := 1 * bytesize.MB
fileSize = &fs
}

return &KafkaPluginConfig{
Brokers: rawConfig.Output.Plugins.Kafka.Brokers,
ClientId: clientId,
Topic: topic,
MessageSize: messageSize,
Acks: acks,
FileSize: fileSize,
}, nil
}

func populateS3Config(rawConfig RawConfig) (*S3PluginConfig, error) {
if rawConfig.Output.Plugins.S3 == nil {
return nil, nil
}

var (
totalFileSize *bytesize.ByteSize
uploadTimeout time.Duration
uploadChunkSize *bytesize.ByteSize
cannedACL string
)

if rawConfig.Output.Plugins.S3.TotalFileSize != nil {
t, err := bytesize.Parse(*rawConfig.Output.Plugins.S3.TotalFileSize)
if err != nil {
return nil, fmt.Errorf("could not parse the totalFileSize field %s: %w", *rawConfig.Output.Plugins.S3.TotalFileSize, err)
}
totalFileSize = &t
} else {
t := 10 * bytesize.MB
totalFileSize = &t
}

if rawConfig.Output.Plugins.S3.UploadTimeout != nil {
var err error
uploadTimeout, err = time.ParseDuration(*rawConfig.Output.Plugins.S3.UploadTimeout)
if err != nil {
return nil, fmt.Errorf("could not parse the uploadTimeout field %s: %w", *rawConfig.Output.Plugins.S3.UploadTimeout, err)
}
} else {
uploadTimeout = time.Minute
}

if rawConfig.Output.Plugins.S3.UploadChunkSize != nil {
u, err := bytesize.Parse(*rawConfig.Output.Plugins.S3.UploadChunkSize)
if err != nil {
return nil, fmt.Errorf("could not parse the uploadChunkSize field %s: %w", *rawConfig.Output.Plugins.S3.UploadChunkSize, err)
}
uploadChunkSize = &u
} else {
u := 5 * bytesize.MB
totalFileSize = &u
}

if rawConfig.Output.Plugins.S3.CannedACL != nil {
cannedACL = *rawConfig.Output.Plugins.S3.CannedACL
} else {
cannedACL = string(types.ObjectCannedACLBucketOwnerFullControl)
}

return &S3PluginConfig{
Bucket: rawConfig.Output.Plugins.S3.Bucket,
Region: rawConfig.Output.Plugins.S3.Region,
TotalFileSize: totalFileSize,
UploadChunkSize: uploadChunkSize,
UploadTimeout: uploadTimeout,
CannedACL: cannedACL,
}, nil
}
3 changes: 3 additions & 0 deletions pkg/file/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package file

var Header = []byte{0xde, 0xef, 0xec, 0xe0}
Loading