lambda-promtail: Add ability to ingest logs from S3 #5065
Conversation
Am I supposed to manually edit the changelog? The PR template is a little ambiguous to me.
I can't speak to the CloudFormation (not my jam), but we should add the variables (bucket name, prefixes), lambda permissions, and the S3 event notification to the Terraform to support this.
@AndreZiviani yes, please manually add a changelog entry to this PR. |
This is looking good. I don't know as much about S3 and its metadata, so I will double-check the docs there before reading the S3-specific Go code here.
Force-pushed c8cd46f to 9c9f4a9
Force-pushed 9c9f4a9 to f560050
Force-pushed f167211 to 74f88ac
Sorry for those force pushes, I made a mistake when merging/rebasing...
@AndreZiviani let me know if you need help with any of the outstanding comments :)
Thanks @cstyan, I had to focus on other things this past week; will try to address your suggestions on Monday/Tuesday.
LGTM
Test failures don't look related, just a failed git clone for a sub job.
Hey, this is looking good! Would you mind updating the docs with this new functionality?
Great! This is what I already did to make mine work!
Docs portion of the PR looks good to me.
Hey, this is looking good. Thanks for the PR! I've left a few suggestions here that could improve the approach, but I also think we could do those in a follow-up PR if you prefer (either done by yourself or someone else, maybe @cstyan?).
Looking good, a few more comments but they're mostly style nits. I think it would be a good idea to eventually move the global variables we have in main.go that we're now using within other files (for batchSize) to a struct of config options or something like that, but that doesn't need to happen in this PR.
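For illustration, a minimal sketch of what such a config struct could look like; the field names below are assumptions based on this discussion, not the PR's actual code:

package main

import "net/url"

// config would group the package-level globals from main.go in one place.
// All field names here are hypothetical.
type config struct {
	writeAddress *url.URL // Loki push API endpoint
	batchSize    int      // flush threshold for batches, in bytes
	username     string   // basic-auth user, if any
	password     string   // basic-auth password, if any
}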
if stream, ok := b.streams[labels]; ok {
	stream.Entries = append(stream.Entries, e.entry)
	b.size += stream.Size()
	return
} else {
	b.streams[labels] = &logproto.Stream{
		Labels:  labels,
		Entries: []logproto.Entry{e.entry},
	}
	b.size += b.streams[labels].Size()
}
nit again: we could simplify to something like this:

stream, ok := b.streams[labels]
if !ok {
	b.streams[labels] = &logproto.Stream{
		Labels: labels,
	}
	stream = b.streams[labels]
}
stream.Entries = append(stream.Entries, e.entry)
b.size += stream.Size()
	return nil, err

var s3Client *s3.Client

if c, ok := s3Clients[labels["bucket_region"]]; ok {
Do we only need a single client per bucket region? Isn't there the possibility of different buckets in the same region requiring different clients for auth reasons?
I suspect we can ignore this for now, as auth for multiple regions/buckets can be attached to this lambda.
I think multiple buckets with different auth options would increase the complexity; it would be easier to configure another lambda with another role.
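For reference, a minimal sketch of the per-region client cache under discussion, assuming aws-sdk-go-v2. The s3Clients map mirrors the diff above; getS3Client is a hypothetical helper name:

package main

import (
	"context"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// s3Clients caches one client per bucket region, as in the diff above.
var s3Clients = map[string]*s3.Client{}

// getS3Client returns the cached client for a region, creating it on first use.
func getS3Client(ctx context.Context, region string) (*s3.Client, error) {
	if c, ok := s3Clients[region]; ok {
		return c, nil
	}
	cfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(region))
	if err != nil {
		return nil, err
	}
	c := s3.NewFromConfig(cfg)
	s3Clients[region] = c
	return c, nil
}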
@@ -38,7 +38,7 @@ func parseCWEvent(ctx context.Context, b *batch, ev *events.CloudwatchLogsEvent)
 }

 func processCWEvent(ctx context.Context, ev *events.CloudwatchLogsEvent) error {
-	batch := newBatch()
+	batch, _ := newBatch(ctx)
We should not swallow this error. Perhaps newBatch shouldn't accept entries, and then newBatch won't need to return an error.
Since the batch will always be empty here it should never fail; that's why I ignored it.
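A sketch of the refactor suggested above: if newBatch stops accepting initial entries it has no failure path, so it needn't return an error, and callers handle errors where entries are actually added. The add signature here is an assumption:

// newBatch no longer takes entries, so nothing in it can fail.
func newBatch() *batch {
	return &batch{
		streams: map[string]*logproto.Stream{},
	}
}

// Callers then add entries explicitly and handle errors at that point:
//
//	b := newBatch()
//	if err := b.add(ctx, e); err != nil {
//		return err
//	}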
if stream, ok := b.streams[labels]; ok {
	stream.Entries = append(stream.Entries, e.entry)
	b.size += stream.Size()
This Size() method doesn't actually measure what we're expecting (it's part of the generated protobuf). Instead, I'd keep track of each line's byte length as entries are added, like so:
https://github.com/grafana/loki/blob/main/clients/pkg/promtail/client/batch.go#L44-L45
I see, will fix that
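A sketch of the byte-accounting approach from the linked promtail code: track batch size by summing each line's byte length as entries are added, instead of calling the generated Size(). The bytes field and the add signature are assumptions:

// add appends an entry and accounts for its size by line length in bytes,
// mirroring promtail's client batch linked above.
func (b *batch) add(labels string, e entry) {
	b.bytes += len(e.entry.Line)
	if stream, ok := b.streams[labels]; ok {
		stream.Entries = append(stream.Entries, e.entry)
		return
	}
	b.streams[labels] = &logproto.Stream{
		Labels:  labels,
		Entries: []logproto.Entry{e.entry},
	}
}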
Thanks, let's get this merged :)
What this PR does / why we need it:
Allows lambda-promtail to ingest load balancer logs stored on S3; previously it was only possible to ingest logs from CloudWatch.
Which issue(s) this PR fixes:
Special notes for your reviewer:
I've made it somewhat modular, allowing new data sources (e.g. SNS, SQS, ...) to be plugged into the same lambda function more easily. It is not trivial to have the same lambda receive events from multiple sources in Go (it's easy in other languages), which is why I changed the event parameter to a map of interface.
Since Loki now accepts out-of-order writes, I'm sending the timestamp from the log itself.
I've also copied the batching feature from promtail itself, because an S3 log can be quite big.
Most of the changes are either inspired by or copied straight from the promtail source code.
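To illustrate the "map of interface" approach described above, a hedged sketch of how the handler can detect the event shape and re-decode it into a typed AWS event. processCWEvent matches the signature shown in the diffs above; processS3Event is a hypothetical counterpart:

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/aws/aws-lambda-go/events"
)

// handler receives the raw event as a generic map, inspects its shape, and
// re-decodes it into the matching typed event.
func handler(ctx context.Context, ev map[string]interface{}) error {
	body, err := json.Marshal(ev)
	if err != nil {
		return err
	}
	if _, ok := ev["awslogs"]; ok {
		// CloudWatch Logs subscription payloads carry an "awslogs" field.
		var cwEvent events.CloudwatchLogsEvent
		if err := json.Unmarshal(body, &cwEvent); err != nil {
			return err
		}
		return processCWEvent(ctx, &cwEvent)
	}
	if _, ok := ev["Records"]; ok {
		// S3 event notifications carry a top-level "Records" array.
		var s3Event events.S3Event
		if err := json.Unmarshal(body, &s3Event); err != nil {
			return err
		}
		return processS3Event(ctx, &s3Event)
	}
	return fmt.Errorf("unsupported event type")
}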
Checklist
- Added an entry in CHANGELOG.md about the changes.