Skip to content

Commit

Permalink
feat: s3 target factory support for kms
Browse files Browse the repository at this point in the history
Signed-off-by: Roko Romic <rokoromic@gmail.com>
  • Loading branch information
rromic committed Mar 25, 2023
1 parent bcb28ad commit b451582
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 22 deletions.
25 changes: 14 additions & 11 deletions pkg/config/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,20 @@ var testConfig = &config.Config{
}},
},
S3: config.S3{
AccessKeyID: "AccessKey",
SecretAccessKey: "SecretAccessKey",
Bucket: "test",
SkipExisting: true,
MinimumPriority: "debug",
Endpoint: "https://storage.yandexcloud.net",
PathStyle: true,
Region: "ru-central1",
Prefix: "prefix",
CustomFields: map[string]string{"field": "value"},
Channels: []config.S3{{}},
AccessKeyID: "AccessKey",
SecretAccessKey: "SecretAccessKey",
Bucket: "test",
BucketKeyEnabled: false,
SseKmsKeyId: "",
ServerSideEncryption: "",
SkipExisting: true,
MinimumPriority: "debug",
Endpoint: "https://storage.yandexcloud.net",
PathStyle: true,
Region: "ru-central1",
Prefix: "prefix",
CustomFields: map[string]string{"field": "value"},
Channels: []config.S3{{}},
},
Kinesis: config.Kinesis{
AccessKeyID: "AccessKey",
Expand Down
38 changes: 27 additions & 11 deletions pkg/config/target_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (f *TargetFactory) KinesisClients(config Kinesis) []target.Client {
return clients
}

// S3Clients resolver method
// GCSClients resolver method
func (f *TargetFactory) GCSClients(config GCS) []target.Client {
clients := make([]target.Client, 0)
if config.Name == "" {
Expand Down Expand Up @@ -283,7 +283,7 @@ func (f *TargetFactory) createSlackClient(config Slack, parent Slack) target.Cli
Name: config.Name,
SkipExistingOnStartup: config.SkipExisting,
ResultFilter: createResultFilter(config.Filter, config.MinimumPriority, config.Sources),
ReportFilter: createReprotFilter(config.Filter),
ReportFilter: createReportFilter(config.Filter),
},
Webhook: config.Webhook,
CustomFields: config.CustomFields,
Expand Down Expand Up @@ -329,7 +329,7 @@ func (f *TargetFactory) createLokiClient(config Loki, parent Loki) target.Client
Name: config.Name,
SkipExistingOnStartup: config.SkipExisting,
ResultFilter: createResultFilter(config.Filter, config.MinimumPriority, config.Sources),
ReportFilter: createReprotFilter(config.Filter),
ReportFilter: createReportFilter(config.Filter),
},
Host: config.Host + config.Path,
CustomLabels: config.CustomLabels,
Expand Down Expand Up @@ -391,7 +391,7 @@ func (f *TargetFactory) createElasticsearchClient(config Elasticsearch, parent E
Name: config.Name,
SkipExistingOnStartup: config.SkipExisting,
ResultFilter: createResultFilter(config.Filter, config.MinimumPriority, config.Sources),
ReportFilter: createReprotFilter(config.Filter),
ReportFilter: createReportFilter(config.Filter),
},
Host: config.Host,
Username: config.Username,
Expand Down Expand Up @@ -427,7 +427,7 @@ func (f *TargetFactory) createDiscordClient(config Discord, parent Discord) targ
Name: config.Name,
SkipExistingOnStartup: config.SkipExisting,
ResultFilter: createResultFilter(config.Filter, config.MinimumPriority, config.Sources),
ReportFilter: createReprotFilter(config.Filter),
ReportFilter: createReportFilter(config.Filter),
},
Webhook: config.Webhook,
CustomFields: config.CustomFields,
Expand Down Expand Up @@ -471,7 +471,7 @@ func (f *TargetFactory) createTeamsClient(config Teams, parent Teams) target.Cli
Name: config.Name,
SkipExistingOnStartup: config.SkipExisting,
ResultFilter: createResultFilter(config.Filter, config.MinimumPriority, config.Sources),
ReportFilter: createReprotFilter(config.Filter),
ReportFilter: createReportFilter(config.Filter),
},
Webhook: config.Webhook,
CustomFields: config.CustomFields,
Expand Down Expand Up @@ -523,7 +523,7 @@ func (f *TargetFactory) createWebhookClient(config Webhook, parent Webhook) targ
Name: config.Name,
SkipExistingOnStartup: config.SkipExisting,
ResultFilter: createResultFilter(config.Filter, config.MinimumPriority, config.Sources),
ReportFilter: createReprotFilter(config.Filter),
ReportFilter: createReportFilter(config.Filter),
},
Host: config.Host,
Headers: config.Headers,
Expand Down Expand Up @@ -587,13 +587,26 @@ func (f *TargetFactory) createS3Client(config S3, parent S3) target.Client {
config.SkipExisting = parent.SkipExisting
}

if !config.BucketKeyEnabled {
config.BucketKeyEnabled = parent.BucketKeyEnabled
}

if config.SseKmsKeyId == "" {
config.SseKmsKeyId = parent.SseKmsKeyId
}

if config.ServerSideEncryption == "" {
config.ServerSideEncryption = parent.ServerSideEncryption
}

s3Client := helper.NewS3Client(
config.AccessKeyID,
config.SecretAccessKey,
config.Region,
config.Endpoint,
config.Bucket,
config.PathStyle,
helper.WithKMS(&config.BucketKeyEnabled, &config.SseKmsKeyId, &config.ServerSideEncryption),
)

sugar.Infof("%s configured", config.Name)
Expand All @@ -603,7 +616,7 @@ func (f *TargetFactory) createS3Client(config S3, parent S3) target.Client {
Name: config.Name,
SkipExistingOnStartup: config.SkipExisting,
ResultFilter: createResultFilter(config.Filter, config.MinimumPriority, config.Sources),
ReportFilter: createReprotFilter(config.Filter),
ReportFilter: createReportFilter(config.Filter),
},
S3: s3Client,
CustomFields: config.CustomFields,
Expand Down Expand Up @@ -675,7 +688,7 @@ func (f *TargetFactory) createKinesisClient(config Kinesis, parent Kinesis) targ
Name: config.Name,
SkipExistingOnStartup: config.SkipExisting,
ResultFilter: createResultFilter(config.Filter, config.MinimumPriority, config.Sources),
ReportFilter: createReprotFilter(config.Filter),
ReportFilter: createReportFilter(config.Filter),
},
CustomFields: config.CustomFields,
Kinesis: kinesisClient,
Expand Down Expand Up @@ -732,7 +745,7 @@ func (f *TargetFactory) createGCSClient(config GCS, parent GCS) target.Client {
Name: config.Name,
SkipExistingOnStartup: config.SkipExisting,
ResultFilter: createResultFilter(config.Filter, config.MinimumPriority, config.Sources),
ReportFilter: createReprotFilter(config.Filter),
ReportFilter: createReportFilter(config.Filter),
},
Client: gcsClient,
CustomFields: config.CustomFields,
Expand Down Expand Up @@ -786,6 +799,9 @@ func (f *TargetFactory) mapSecretValues(config any, ref string) {
if values.SecretAccessKey != "" {
c.SecretAccessKey = values.SecretAccessKey
}
if values.KmsKeyId != "" {
c.SseKmsKeyId = values.KmsKeyId
}

case *Kinesis:
if values.AccessKeyID != "" {
Expand Down Expand Up @@ -824,7 +840,7 @@ func createResultFilter(filter TargetFilter, minimumPriority string, sources []s
)
}

func createReprotFilter(filter TargetFilter) *report.ReportFilter {
func createReportFilter(filter TargetFilter) *report.ReportFilter {
return target.NewReportFilter(
ToRuleSet(filter.ReportLabels),
)
Expand Down
28 changes: 28 additions & 0 deletions pkg/config/target_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func newFakeClient() v1.SecretInterface {
"webhook": []byte("http://localhost:9200/webhook"),
"accessKeyID": []byte("accessKeyID"),
"secretAccessKey": []byte("secretAccessKey"),
"kmsKeyId": []byte("kmsKeyId"),
"token": []byte("token"),
"credentials": []byte("credentials"),
},
Expand Down Expand Up @@ -154,6 +155,26 @@ func Test_ResolveTargetWithoutHost(t *testing.T) {
t.Error("Expected Client to be nil if no bucket is configured")
}
})
t.Run("S3.SSE-S3", func(t *testing.T) {
if len(factory.S3Clients(config.S3{Endpoint: "https://storage.yandexcloud.net", AccessKeyID: "access", SecretAccessKey: "secret", Region: "ru-central1", ServerSideEncryption: "AES256"})) != 0 {
t.Error("Expected Client to be nil if server side encryption is not configured")
}
})
t.Run("S3.SSE-KMS", func(t *testing.T) {
if len(factory.S3Clients(config.S3{Endpoint: "https://storage.yandexcloud.net", AccessKeyID: "access", SecretAccessKey: "secret", Region: "ru-central1", ServerSideEncryption: "aws:kms"})) != 0 {
t.Error("Expected Client to be nil if server side encryption is not configured")
}
})
t.Run("S3.SSE-KMS-S3-KEY", func(t *testing.T) {
if len(factory.S3Clients(config.S3{Endpoint: "https://storage.yandexcloud.net", AccessKeyID: "access", SecretAccessKey: "secret", Region: "ru-central1", BucketKeyEnabled: true, ServerSideEncryption: "aws:kms"})) != 0 {
t.Error("Expected Client to be nil if server side encryption is not configured")
}
})
t.Run("S3.SSE-KMS-KEY-ID", func(t *testing.T) {
if len(factory.S3Clients(config.S3{Endpoint: "https://storage.yandexcloud.net", AccessKeyID: "access", SecretAccessKey: "secret", Region: "ru-central1", ServerSideEncryption: "aws:kms", SseKmsKeyId: "SseKmsKeyId"})) != 0 {
t.Error("Expected Client to be nil if server side encryption is not configured")
}
})
t.Run("Kinesis.Endoint", func(t *testing.T) {
if len(factory.KinesisClients(config.Kinesis{})) != 0 {
t.Error("Expected Client to be nil if no endpoint is configured")
Expand Down Expand Up @@ -293,6 +314,13 @@ func Test_GetValuesFromSecret(t *testing.T) {
}
})

t.Run("Get S3 values from Secret with KMS", func(t *testing.T) {
clients := factory.S3Clients(config.S3{SecretRef: secretName, Endpoint: "endoint", Bucket: "bucket", Region: "region", BucketKeyEnabled: true, ServerSideEncryption: "aws:kms"})
if len(clients) != 1 {
t.Error("Expected one client created")
}
})

t.Run("Get Kinesis values from Secret", func(t *testing.T) {
clients := factory.KinesisClients(config.Kinesis{SecretRef: secretName, Endpoint: "endpoint", StreamName: "stream", Region: "region"})
if len(clients) != 1 {
Expand Down

0 comments on commit b451582

Please sign in to comment.