-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Decompress option on kinesis-consumer #8891
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤝 ✅ CLA has been signed. Thank you!
!retry-failed |
## also base64 encodes the zip byte data before pushing to the stream. The base64 decoding | ||
## is done automatically by the golang sdk, as data is read from kinesis) | ||
## | ||
# decompress = false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it make sense to allow to specify the compression algorithm in order to be more flexible? What if they decide to use snappy
or similar in the future?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code looks good, but it might be beneficial to allow the user to specify the decompression/compression algorithm (i.e.
"gzip"
for now) to be future proof. What do you think?
Sounds sensible. How's something like the following sound:
decompress: "gzip"
snappy could requested like the following, i.e:
decompress: "snappy"
I can look at adding in the support for snappy at the same time (would be an extra dependency I think).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds exactly like what I had in mind. :-) You don't need to add snappy now but I like to be prepared. :-) So only having "gzip"
or ""
is fine for me. If we hit other compression types we can also add later. But in case you are already aware of a use-case, you can of course add it in one go.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh and please use internal.choice
for testing the valid values... ;-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me know how you feel about this change to decompress_with
. I've added zlib
as an option; but left out the snappy for now (extra dependency required to do the snappy; whereas zlib is standard in golang). I could always look at sending an additional PR for snappy. I added a test for the gzip/zlib/none options.
Happy to make changes. Thanks for the help.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've tried doing a !retry-failed; but it did not seem to make the CircleCI happier. Unfortunately I can't see what the output of the circle ci tests are; would you be able to provide me the details? Locally I'm unable to see failures related to the PR. (The permissions CircleCI is wanting are very permissive; and is wanting access to all of my repos; private and public).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code looks good, but it might be beneficial to allow the user to specify the decompression/compression algorithm (i.e. "gzip"
for now) to be future proof. What do you think?
!retry-failed |
## also base64 encodes the zip byte data before pushing to the stream. The base64 decoding | ||
## is done automatically by the golang sdk, as data is read from kinesis) | ||
## | ||
# decompress_with = "none" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TBH the name is a bit bulky. I think we should either stick with decompress
or compression
.
gzipDecompression = "gzip" | ||
noDecompression = "none" | ||
zlibDecompression = "zlib" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is overdoing, just use the supported
list to track what is possible.
) | ||
|
||
var supportedDecompressionAlgorithms = []string{gzipDecompression, noDecompression, zlibDecompression} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please move this to Init()
instead of declaring a global variable and directly define the list instead of using the constants.
@@ -238,8 +260,47 @@ func (k *KinesisConsumer) Start(ac telegraf.Accumulator) error { | |||
return nil | |||
} | |||
|
|||
func gzipDecompress(data []byte) ([]byte, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only cosmetics, but can you please change the name to decompressGzip()
to match the other names starting with an action...
func (k *KinesisConsumer) Init() error { | ||
err := choice.Check(k.DecompressWith, supportedDecompressionAlgorithms) | ||
if err != nil { | ||
return fmt.Errorf(`cannot verify "decompress_with" setting: %v`, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You also might want to log the valid options for the user.
## also base64 encodes the zip byte data before pushing to the stream. The base64 decoding | ||
## is done automatically by the golang sdk, as data is read from kinesis) | ||
## | ||
# decompress_with = "none" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about making ""
a synonym for none
? This way you do get the default for free. :-)
func (k *KinesisConsumer) getRecordData(r *consumer.Record) ([]byte, error) { | ||
return k.getDecompressionFunc(k.DecompressWith)(r.Data) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about determining the decompression function once in Init()
and store it in KinesisConsumer
, e.g. as decompress
? Then call k.decompress(r.Data)
in onMessage()
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice. I have some minor comments to simplify the code a bit more (hopefullly). :-) Oh and please fix the formatting issues found by CI...
gzipDecompression, noDecompression, zlibDecompression := "gzip", "none", "zlib" | ||
err := choice.Check(k.DecompressionType, []string{gzipDecompression, noDecompression, zlibDecompression}) | ||
if err != nil { | ||
return fmt.Errorf(`cannot verify "decompress" setting: %v`, err) | ||
} | ||
|
||
switch k.DecompressionType { | ||
case gzipDecompression: | ||
k.decompressionFunc = decompressGzip | ||
case zlibDecompression: | ||
k.decompressionFunc = decompressZlib | ||
default: | ||
k.decompressionFunc = decompressNoOp | ||
} | ||
return nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can do:
gzipDecompression, noDecompression, zlibDecompression := "gzip", "none", "zlib" | |
err := choice.Check(k.DecompressionType, []string{gzipDecompression, noDecompression, zlibDecompression}) | |
if err != nil { | |
return fmt.Errorf(`cannot verify "decompress" setting: %v`, err) | |
} | |
switch k.DecompressionType { | |
case gzipDecompression: | |
k.decompressionFunc = decompressGzip | |
case zlibDecompression: | |
k.decompressionFunc = decompressZlib | |
default: | |
k.decompressionFunc = decompressNoOp | |
} | |
return nil | |
switch k.DecompressionType { | |
case "gzip": | |
k.decompressionFunc = decompressGzip | |
case "zlib": | |
k.decompressionFunc = decompressZlib | |
case "none", "": | |
k.decompressionFunc = decompressNoOp | |
default: | |
return fmt.Errorf("unknown decompression %q", k.DecompressionType) | |
} | |
return nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still some small potential for beauty (at least in my view)... ;-) See my comment.
type TestTrackingAccumulator struct { | ||
telegraf.TrackingAccumulator | ||
Metrics *[]telegraf.Metric | ||
} | ||
|
||
func (t TestTrackingAccumulator) AddTrackingMetricGroup(group []telegraf.Metric) telegraf.TrackingID { | ||
*t.Metrics = append(*t.Metrics, group...) | ||
return 1 | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use the tracking accumulator provided by testutil
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use the tracking accumulator provided by `testutil.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me.
!retry-failed |
Is decompression something the SDKs we use should do? It seems like if decoding base64 is handled for us decompression should be too. I noticed that aws-sdk-go has a new v2 release. Is it included there? I like how the new setting lets the user choose a compression algorithm. Many of the existing plugins have a setting just like this but they call it content_encoding. For example, the influxdb_v2 output, amqp_consumer input. Even though this plugin doesn't use http directly, I think it would make sense to call the setting content_encoding here too. |
Hi there,
Yup, makes sense to keep a consistent language across the plugins. I'm happy to update to call it
I do not know if the v2 sdk performs the auto base64 decoding like that of the v1 (https://github.com/aws/aws-sdk-go/blob/master/service/kinesis/api.go#L7022). Also unknown I'm unsure if the v2 sdk would perform auto decompression of compressed data (my gut "guess" would be that it doesn't) However, I'm thinking that that a SDK update for this plugin, isn't for this PR to implement; and would be best placed for a separate isolated upgrade PR (It may take a complete re-implementation). |
Let's go ahead and rename it to content_encoding. I'll merge it when this is complete |
} | ||
}) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tootedom I think the linter is complaining about this empty line. We need CI to pass before we can merge, including the linter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Many thanks for the pointer. I was failing miserably at installing revive locally (still not fathomed it).
Have removed the line, and pushed. Linter looks happy.
!retry-failed |
Description
We are wishing to use the kinesis_consumer input to consume a kinesis stream that is being populated via a cloudwatch logs filter subscription. When AWS pushes cloudwatch logs to the kinesis destination it is zipping the logs and base64 encoding them.
The AWS sdk for go, is under the hood is automatically base64 decoding the data (a blob) that the kinesis consumer obtains from the kinesis record (r.Data). The result is a zip []byte that we cannot subsequently consume/manipulate/use in telegraf.
PR update
Adding of a
decompress
option (default is false for backwards compatibility), to allow the r.Data to be decompressed if the user chooses to.This will then allow for the consuming of cloudwatch log data that is streamed to kinesis.
I used the standard go implementation for the uncompress, unsure if you wanted to use something like (I didn't want to add an dependency, without an ok so have left to using the golang standard lib):