Skip to content
This repository has been archived by the owner on Sep 2, 2021. It is now read-only.

Commit

Permalink
Better validation of output for call to DescribeStream (for checking …
Browse files Browse the repository at this point in the history
…SSE)
  • Loading branch information
wtait1 committed Mar 27, 2020
1 parent f1cfe41 commit ca32ff7
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 4 deletions.
9 changes: 7 additions & 2 deletions kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ func buildKinesisClient(streamName, streamRole string, logFunc func(string, ...i

sseEnabled, err := streamHasSSE(streamName, kinesisHandle)
if err != nil {
logFunc("Could not determine if SSE is enabled for stream %s: %s", streamName, err)
} else if !sseEnabled {
logFunc(err.Error())
}
if !sseEnabled {
logWarn(fmt.Sprintf("Kinesis stream %s does NOT have Server-Side Encryption (SSE) enabled", streamName))
}
return &kinesisWrapper{
Expand All @@ -86,6 +87,10 @@ func streamHasSSE(streamName string, client kinesisiface.KinesisAPI) (bool, erro
streamInfo, err := client.DescribeStream(&kinesis.DescribeStreamInput{
StreamName: aws.String(streamName),
})
if streamInfo == nil || streamInfo.StreamDescription == nil {
return false, fmt.Errorf("Could not determine if SSE is enabled for stream %s: %w", streamName, err)
}

return *streamInfo.StreamDescription.EncryptionType == kinesis.EncryptionTypeKms, err
}

Expand Down
20 changes: 20 additions & 0 deletions kinesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,26 @@ func TestGetRegionOverride(t *testing.T) {
}
}

func TestStreamHasSSEError(t *testing.T) {
mockKinesis := &mockKinesisClient{}

hasSSE, err := streamHasSSE("simulate_empty_response", mockKinesis)
if err == nil {
t.Errorf("Expected error, but got <nil>")
}
if hasSSE {
t.Errorf("Expected 'hasSSE' == FALSE, but was TRUE")
}

hasSSE, err = streamHasSSE("success", mockKinesis)
if err != nil {
t.Errorf("Expected no error, but got %s", err)
}
if !hasSSE {
t.Errorf("Expected 'hasSSE' == TRUE, but was FALSE")
}
}

func TestBuildMessages(t *testing.T) {
expected := EventChunk{
ChunkNumber: 0,
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func logWarn(msg string, v ...interface{}) {

func logDebug(msg string, v ...interface{}) {
if flags.debug {
log.Printf("[WARN] "+msg+"\n", v)
log.Printf("[DEBUG] "+msg+"\n", v)
}
}

Expand Down
16 changes: 15 additions & 1 deletion testUtils.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,26 @@ type mockKinesisClient struct {
kinesisiface.KinesisAPI
}

func (m *mockKinesisClient) DescribeStream(inp *kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) {
m.timesCalled++
if *inp.StreamName == "simulate_empty_response" {
return &kinesis.DescribeStreamOutput{}, nil
} else {
return &kinesis.DescribeStreamOutput{
StreamDescription: &kinesis.StreamDescription{
StreamName: aws.String("test-stream"),
EncryptionType: aws.String(kinesis.EncryptionTypeKms),
},
}, nil
}
}

func (m *mockKinesisClient) PutRecord(inp *kinesis.PutRecordInput) (*kinesis.PutRecordOutput, error) {
m.timesCalled++
// Used to deterministically simulate an error in the AWS SDK
// See `TestSendToStreamKinesisError` for usage
if *inp.StreamName == "simulate_error" {
return &kinesis.PutRecordOutput{}, fmt.Errorf("simulated error")
return &kinesis.PutRecordOutput{}, fmt.Errorf("simulated service error")
}
return &kinesis.PutRecordOutput{
SequenceNumber: aws.String("a"),
Expand Down

0 comments on commit ca32ff7

Please sign in to comment.