Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decompress option on kinesis-consumer #8891

Merged
merged 11 commits into from
Mar 18, 2021

Conversation

tootedom
Copy link
Contributor

Description

We are wishing to use the kinesis_consumer input to consume a kinesis stream that is being populated via a cloudwatch logs filter subscription. When AWS pushes cloudwatch logs to the kinesis destination it is zipping the logs and base64 encoding them.

The AWS sdk for go, is under the hood is automatically base64 decoding the data (a blob) that the kinesis consumer obtains from the kinesis record (r.Data). The result is a zip []byte that we cannot subsequently consume/manipulate/use in telegraf.

PR update

Adding of a decompress option (default is false for backwards compatibility), to allow the r.Data to be decompressed if the user chooses to.

This will then allow for the consuming of cloudwatch log data that is streamed to kinesis.

I used the standard go implementation for the uncompress, unsure if you wanted to use something like (I didn't want to add an dependency, without an ok so have left to using the golang standard lib):

@telegraf-tiger telegraf-tiger bot added the feat Improvement on an existing feature such as adding a new setting/mode to an existing plugin label Feb 20, 2021
Copy link
Contributor

@telegraf-tiger telegraf-tiger bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤝 ✅ CLA has been signed. Thank you!

@tootedom tootedom changed the title Decompress kinesis Decompress Options on kinesis-consumer Feb 20, 2021
@tootedom tootedom changed the title Decompress Options on kinesis-consumer Decompress option on kinesis-consumer Feb 20, 2021
@srebhan
Copy link
Member

srebhan commented Feb 22, 2021

!retry-failed

## also base64 encodes the zip byte data before pushing to the stream. The base64 decoding
## is done automatically by the golang sdk, as data is read from kinesis)
##
# decompress = false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to allow to specify the compression algorithm in order to be more flexible? What if they decide to use snappy or similar in the future?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code looks good, but it might be beneficial to allow the user to specify the decompression/compression algorithm (i.e. "gzip" for now) to be future proof. What do you think?

Sounds sensible. How's something like the following sound:

decompress: "gzip"

snappy could requested like the following, i.e:

decompress: "snappy"

I can look at adding in the support for snappy at the same time (would be an extra dependency I think).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds exactly like what I had in mind. :-) You don't need to add snappy now but I like to be prepared. :-) So only having "gzip" or "" is fine for me. If we hit other compression types we can also add later. But in case you are already aware of a use-case, you can of course add it in one go.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh and please use internal.choice for testing the valid values... ;-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know how you feel about this change to decompress_with. I've added zlib as an option; but left out the snappy for now (extra dependency required to do the snappy; whereas zlib is standard in golang). I could always look at sending an additional PR for snappy. I added a test for the gzip/zlib/none options.

Happy to make changes. Thanks for the help.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tried doing a !retry-failed; but it did not seem to make the CircleCI happier. Unfortunately I can't see what the output of the circle ci tests are; would you be able to provide me the details? Locally I'm unable to see failures related to the PR. (The permissions CircleCI is wanting are very permissive; and is wanting access to all of my repos; private and public).

Copy link
Member

@srebhan srebhan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code looks good, but it might be beneficial to allow the user to specify the decompression/compression algorithm (i.e. "gzip" for now) to be future proof. What do you think?

@srebhan srebhan self-assigned this Feb 22, 2021
@tootedom
Copy link
Contributor Author

!retry-failed

## also base64 encodes the zip byte data before pushing to the stream. The base64 decoding
## is done automatically by the golang sdk, as data is read from kinesis)
##
# decompress_with = "none"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH the name is a bit bulky. I think we should either stick with decompress or compression.

Comment on lines 75 to 77
gzipDecompression = "gzip"
noDecompression = "none"
zlibDecompression = "zlib"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is overdoing, just use the supported list to track what is possible.

)

var supportedDecompressionAlgorithms = []string{gzipDecompression, noDecompression, zlibDecompression}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move this to Init() instead of declaring a global variable and directly define the list instead of using the constants.

@@ -238,8 +260,47 @@ func (k *KinesisConsumer) Start(ac telegraf.Accumulator) error {
return nil
}

func gzipDecompress(data []byte) ([]byte, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only cosmetics, but can you please change the name to decompressGzip() to match the other names starting with an action...

func (k *KinesisConsumer) Init() error {
err := choice.Check(k.DecompressWith, supportedDecompressionAlgorithms)
if err != nil {
return fmt.Errorf(`cannot verify "decompress_with" setting: %v`, err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You also might want to log the valid options for the user.

## also base64 encodes the zip byte data before pushing to the stream. The base64 decoding
## is done automatically by the golang sdk, as data is read from kinesis)
##
# decompress_with = "none"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about making "" a synonym for none? This way you do get the default for free. :-)

Comment on lines 294 to 296
func (k *KinesisConsumer) getRecordData(r *consumer.Record) ([]byte, error) {
return k.getDecompressionFunc(k.DecompressWith)(r.Data)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about determining the decompression function once in Init() and store it in KinesisConsumer, e.g. as decompress? Then call k.decompress(r.Data) in onMessage()...

Copy link
Member

@srebhan srebhan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice. I have some minor comments to simplify the code a bit more (hopefullly). :-) Oh and please fix the formatting issues found by CI...

Comment on lines 383 to 397
gzipDecompression, noDecompression, zlibDecompression := "gzip", "none", "zlib"
err := choice.Check(k.DecompressionType, []string{gzipDecompression, noDecompression, zlibDecompression})
if err != nil {
return fmt.Errorf(`cannot verify "decompress" setting: %v`, err)
}

switch k.DecompressionType {
case gzipDecompression:
k.decompressionFunc = decompressGzip
case zlibDecompression:
k.decompressionFunc = decompressZlib
default:
k.decompressionFunc = decompressNoOp
}
return nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do:

Suggested change
gzipDecompression, noDecompression, zlibDecompression := "gzip", "none", "zlib"
err := choice.Check(k.DecompressionType, []string{gzipDecompression, noDecompression, zlibDecompression})
if err != nil {
return fmt.Errorf(`cannot verify "decompress" setting: %v`, err)
}
switch k.DecompressionType {
case gzipDecompression:
k.decompressionFunc = decompressGzip
case zlibDecompression:
k.decompressionFunc = decompressZlib
default:
k.decompressionFunc = decompressNoOp
}
return nil
switch k.DecompressionType {
case "gzip":
k.decompressionFunc = decompressGzip
case "zlib":
k.decompressionFunc = decompressZlib
case "none", "":
k.decompressionFunc = decompressNoOp
default:
return fmt.Errorf("unknown decompression %q", k.DecompressionType)
}
return nil

Copy link
Member

@srebhan srebhan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still some small potential for beauty (at least in my view)... ;-) See my comment.

Comment on lines 14 to 22
type TestTrackingAccumulator struct {
telegraf.TrackingAccumulator
Metrics *[]telegraf.Metric
}

func (t TestTrackingAccumulator) AddTrackingMetricGroup(group []telegraf.Metric) telegraf.TrackingID {
*t.Metrics = append(*t.Metrics, group...)
return 1
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the tracking accumulator provided by testutil.

Copy link
Member

@srebhan srebhan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the tracking accumulator provided by `testutil.

Copy link
Member

@srebhan srebhan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me.

@srebhan srebhan added the ready for final review This pull request has been reviewed and/or tested by multiple users and is ready for a final review. label Mar 3, 2021
@srebhan
Copy link
Member

srebhan commented Mar 3, 2021

!retry-failed

@reimda
Copy link
Contributor

reimda commented Mar 11, 2021

Is decompression something the SDKs we use should do? It seems like if decoding base64 is handled for us decompression should be too. I noticed that aws-sdk-go has a new v2 release. Is it included there?

I like how the new setting lets the user choose a compression algorithm. Many of the existing plugins have a setting just like this but they call it content_encoding. For example, the influxdb_v2 output, amqp_consumer input. Even though this plugin doesn't use http directly, I think it would make sense to call the setting content_encoding here too.

@tootedom
Copy link
Contributor Author

Hi there,

Many of the existing plugins have a setting just like this but they call it content_encoding. For example, the influxdb_v2 output, amqp_consumer input. Even though this plugin doesn't use http directly, I think it would make sense to call the setting content_encoding here too.

Yup, makes sense to keep a consistent language across the plugins. I'm happy to update to call it content_encoding. Let me know if you all agree, and I'll make that change.


Is decompression something the SDKs we use should do? It seems like if decoding base64 is handled for us decompression should be too. I noticed that aws-sdk-go has a new v2 release. Is it included there?

I do not know if the v2 sdk performs the auto base64 decoding like that of the v1 (https://github.com/aws/aws-sdk-go/blob/master/service/kinesis/api.go#L7022).

Also unknown I'm unsure if the v2 sdk would perform auto decompression of compressed data (my gut "guess" would be that it doesn't)

However, I'm thinking that that a SDK update for this plugin, isn't for this PR to implement; and would be best placed for a separate isolated upgrade PR (It may take a complete re-implementation).

@reimda
Copy link
Contributor

reimda commented Mar 16, 2021

I'm happy to update to call it content_encoding. Let me know if you all agree, and I'll make that change.

Let's go ahead and rename it to content_encoding. I'll merge it when this is complete

}
})
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tootedom I think the linter is complaining about this empty line. We need CI to pass before we can merge, including the linter.

Suggested change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Many thanks for the pointer. I was failing miserably at installing revive locally (still not fathomed it).
Have removed the line, and pushed. Linter looks happy.

@tootedom
Copy link
Contributor Author

!retry-failed

@reimda reimda merged commit 30830c2 into influxdata:master Mar 18, 2021
jblesener pushed a commit to jblesener/telegraf that referenced this pull request Apr 18, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feat Improvement on an existing feature such as adding a new setting/mode to an existing plugin ready for final review This pull request has been reviewed and/or tested by multiple users and is ready for a final review.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants