Lambda-promtail: Add support for processing SQS messages, add promtailClient Type, add logger, upgrade dependencies and fix unexpected flushing behaviors #8231
Conversation
./tools/diff_coverage.sh ../loki-target-branch/test_results.txt test_results.txt ingester,distributor,querier,querier/queryrange,iter,storage,chunkenc,logql,loki

Change in test coverage per package. Green indicates 0 or positive change, red indicates that test coverage for a package fell.

+ ingester           0%
+ distributor        0%
+ querier            0%
+ querier/queryrange 0%
+ iter               0%
+ storage            0%
+ chunkenc           0%
+ logql              0%
+ loki               0%
A few minor comments.

Reviewing this, I also started thinking that it might make sense to implement a `client` type within lambda-promtail that we could attach functions to, and it could store the `promtailClient` reference rather than creating more package-level globals.
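As a rough sketch of that suggestion (the names `newPromtailClient` and `sendToPromtail` are illustrative, not necessarily what the PR ended up with), such a type could hold the write address and a shared HTTP client instead of package-level globals:

```go
package lambdapromtail // hypothetical package name, for illustration only

import (
	"bytes"
	"context"
	"net/http"
	"time"
)

// promtailClient bundles the configuration and the shared HTTP client so that
// send-related functions can hang off it instead of reading package-level globals.
type promtailClient struct {
	writeAddress string
	http         *http.Client
}

func newPromtailClient(writeAddress string, timeout time.Duration) *promtailClient {
	return &promtailClient{
		writeAddress: writeAddress,
		http:         &http.Client{Timeout: timeout},
	}
}

// sendToPromtail pushes an already-encoded payload to the configured endpoint.
func (c *promtailClient) sendToPromtail(ctx context.Context, payload []byte) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.writeAddress, bytes.NewReader(payload))
	if err != nil {
		return err
	}
	resp, err := c.http.Do(req)
	if err != nil {
		return err
	}
	return resp.Body.Close()
}
```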
@@ -51,7 +52,7 @@ func getS3Object(ctx context.Context, labels map[string]string) (io.ReadCloser,
		s3Client = s3.NewFromConfig(cfg)
		s3Clients[labels["bucket_region"]] = s3Client
	}

	fmt.Println("fetching", labels["key"])
did you mean to include this println?
I used it for debugging purposes and ended up leaving it there, since it's a useful piece of information (it immediately tells you which event/file is being processed) and it only prints one line per Lambda invocation (so it's not going to cost a lot in CloudWatch Logs).

As the code grows, maybe we should have a more structured approach. Maybe we could remove this line and open another MR to use the standard logger (and set the LOG_LEVEL or DEBUG using an environment variable)? WDYT?
I'm fine with keeping it as is or implementing something more structured.
Okay, I implemented the logger in my latest commit. I went with go-kit since it's the one used in the main project.

Thanks! I will update this PR with your comments and the new client type ✌️
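For reference, a minimal sketch of what a go-kit logger driven by a `LOG_LEVEL` environment variable could look like (illustrative only; the actual wiring in the PR may differ):

```go
package main

import (
	"os"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

// newLogger builds a go-kit logfmt logger whose verbosity is controlled by
// the LOG_LEVEL environment variable (e.g. "debug" or anything else for info).
func newLogger() log.Logger {
	logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
	logger = log.With(logger, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)

	switch os.Getenv("LOG_LEVEL") {
	case "debug":
		logger = level.NewFilter(logger, level.AllowDebug())
	default:
		logger = level.NewFilter(logger, level.AllowInfo())
	}
	return logger
}

func main() {
	logger := newLogger()
	// One line per invocation, e.g. which S3 object is being fetched.
	level.Info(logger).Log("msg", "fetching s3 object", "key", "example-key")
}
```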
Looks like there's a conflict with your CHANGELOG.md changes. It might be easiest to remove that commit, merge upstream main into your branch, and then make your changelog entry again. I'll have a look at the client and logger changes in the morning.
Force-pushed from a0b2d22 to ffdb7a4.
Force-pushed from ffdb7a4 to 06bf6a1.
		s3Clients[labels["bucket_region"]] = s3Client
	}

	fmt.Println("fetching", labels["key"])
	obj, err := s3Client.GetObject(ctx,
		&s3.GetObjectInput{
			Bucket:              aws.String(labels["bucket"]),
			Key:                 aws.String(labels["key"]),
			ExpectedBucketOwner: aws.String(labels["bucketOwner"]),
		})
	if err != nil {
		fmt.Printf("Failed to get object %s from bucket %s on account %s\n", labels["key"], labels["bucket"], labels["bucketOwner"])
		return nil, err
why are we removing the object fetching code here and moving it?
This `getS3Object` function does two different things: it gets an `*s3.Client` from the client map (and updates the map when creating a new client), and then it fetches the S3 object.

I thought it made more sense to move the client part to a dedicated `getS3Client` function (besides, one of the bugs fixed with this MR is actually linked to a "do_something_with_a_client" function where the said client was created in the same function).

Doing so left the `getS3Object` function as a wrapper that just called `s3.GetObject` without any other action, so it made little sense to keep it, which is why I moved that call back into `processS3Event`.

If it does not make sense and/or it is not a desired change, I can just set it back the way it was. What do you think?
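For illustration, the refactor described above could look roughly like this sketch, with `getS3Client` owning the per-region client cache and the caller doing the `GetObject` itself (simplified names, not the exact diff from the PR):

```go
package lambdapromtail // hypothetical package name, for illustration only

import (
	"context"
	"io"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// One S3 client per bucket region, cached across invocations.
var s3Clients = map[string]*s3.Client{}

// getS3Client returns a cached *s3.Client for the given region, creating and
// caching one on first use.
func getS3Client(ctx context.Context, region string) (*s3.Client, error) {
	if client, ok := s3Clients[region]; ok {
		return client, nil
	}
	cfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(region))
	if err != nil {
		return nil, err
	}
	client := s3.NewFromConfig(cfg)
	s3Clients[region] = client
	return client, nil
}

// fetchS3Object stands in for the fetching that now happens directly in
// processS3Event rather than in a thin getS3Object wrapper.
func fetchS3Object(ctx context.Context, labels map[string]string) (io.ReadCloser, error) {
	client, err := getS3Client(ctx, labels["bucket_region"])
	if err != nil {
		return nil, err
	}
	obj, err := client.GetObject(ctx, &s3.GetObjectInput{
		Bucket:              aws.String(labels["bucket"]),
		Key:                 aws.String(labels["key"]),
		ExpectedBucketOwner: aws.String(labels["bucketOwner"]),
	})
	if err != nil {
		return nil, err
	}
	return obj.Body, nil
}
```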
👍 got it
Force-pushed from 16ee004 to 48a597b.
[Docs squad] A couple of small suggestions.
[Docs squad] Docs look good; I'll leave it to engineering to approve the code.
I appreciate the bug fixes 👍 Having never used SQS myself, I can't speak to the validity of the SQS-specific code or how you're planning to use it, but I trust that you or other users will exercise it enough to point out any issues that need fixing going forward. I'll leave this open for a day to have one last read-through and see if anyone else has comments, and then merge.
Glad I could help. I've been running this code for days/weeks now and it runs smoothly. Thanks for the review. ✌️
Force-pushed from eabe6df to 6c54cd5.
Hey @CCOLLOT, my bad for leaving this unmerged. Do you mind just resolving the conflicts? I will merge after that.
There you go. I had to fix
…8700)

## Add Terraform sample code to process AWS S3 log files through an SQS queue

Lambda-promtail supports processing AWS S3 log files through an SQS queue since #8231. As explained in the [documentation section of lambda-promtail](https://github.com/grafana/loki/blob/main/docs/sources/clients/lambda-promtail/_index.md#triggering-lambda-promtail-via-sqs:~:text=Triggering%20Lambda%2DPromtail,DLQ%20redrive%20feature.), this can be leveraged to re-process logs that lambda-promtail failed to process (or send to Loki), using a main SQS queue and a secondary SQS dead-letter queue. AWS has a feature called `SQS redrive`, which enables routing messages pending in the DLQ back to the main (source) queue. This PR demonstrates how to spin up this architecture using Terraform.
It looks like grafana#8231 clashed with grafana#8750 in such a way that applying the module fails.
**What this PR does / why we need it**: Currently, the Terraform module for lambda-promtail is not working due to a misnamed reference:

```
╷
│ Error: Reference to undeclared resource
│
│   on .terraform/modules/lambda_promtail/tools/lambda-promtail/sqs.tf line 66, in resource "aws_iam_role_policy_attachment" "lambda_sqs_execution":
│   66: role = aws_iam_role.iam_for_lambda.name
│
│ A managed resource "aws_iam_role" "iam_for_lambda" has not been declared in
│ module.lambda_promtail.
╵
```

It looks like this was the result of a conflict with #8231 and #8750. While I was in there I also refactored the SQS work to be more consistent with #8750. I also made the queue name prefix configurable with the `sqs_queue_name_prefix` variable, as it was hardcoded before, which could cause issues if multiple instances of this module are initialized in the same AWS account and region.

**Which issue(s) this PR fixes**: Fixes #<issue number>

**Special notes for your reviewer**:

**Checklist**
- [x] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**)
- [ ] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/upgrading/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](d10549e)
What this PR does / why we need it:

Context

I want to be able to use the AWS `SQS redrive` feature to process logs after fixing an outage in the ingestion flow, and not be limited by Lambda giving up after 2 (default) retries.

I faced several issues:

- `BATCH_SIZE`: the current implementation does not reset the size of the batch, leading to a flush to loki/promtail for every single added line after reaching `BATCH_SIZE` (and making the file descriptor issue described below much worse, because the `http.Client` is currently created in the `send` function).

Added Feature

Lambda-promtail can now process AWS S3 log files delivered through an SQS queue (SQS messages carrying S3 event notifications).
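The SQS handling itself is not shown in the excerpts above; as a rough sketch of the general pattern using the standard `aws-lambda-go` event types (function names are illustrative, and `processS3Event` is a stand-in for the existing S3 path):

```go
package lambdapromtail // hypothetical package name, for illustration only

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/aws/aws-lambda-go/events"
)

// processSQSEvent unwraps the S3 event notification carried in each SQS
// message body and hands it to the existing S3 processing path.
func processSQSEvent(ctx context.Context, evt *events.SQSEvent) error {
	for _, record := range evt.Records {
		var s3Event events.S3Event
		if err := json.Unmarshal([]byte(record.Body), &s3Event); err != nil {
			return fmt.Errorf("failed to unmarshal S3 event from SQS message %s: %w", record.MessageId, err)
		}
		if err := processS3Event(ctx, &s3Event); err != nil {
			return err
		}
	}
	return nil
}

// processS3Event stands in for the existing S3 handling code path
// (fetching the object and forwarding its lines to promtail/Loki).
func processS3Event(ctx context.Context, evt *events.S3Event) error {
	return nil
}
```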
Fixed bugs

- The `flushBatch` function would reset the stream after flushing, but it wouldn't reset the size of the batch, leading to lambda-promtail flushing every single following line to loki/promtail. In my case it led to logs being processed very slowly by Lambda when the total size is way above the default `batchSize`; I would reach the 15 min timeout on AWS Lambda for almost every single file.
- The `http.Client` used to flush logs to loki/promtail was being recreated thousands of times, which made the process quickly run out of file descriptors (limited to 1024 in AWS Lambda). I set it as a global variable to avoid impacting other parts of the code for now, but if required we could do it a little cleaner via dependency injection.

After fixing those bugs, my average Lambda execution time for VPC flow log files (between 10 MB and 100 MB each) went from timing out at 15 min to ~3 seconds.
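For illustration, the two fixes could be sketched as follows: a package-level `http.Client` that is reused across sends, and a batch that resets its size counter when flushed (simplified names, not the exact code from the PR):

```go
package lambdapromtail // hypothetical package name, for illustration only

import (
	"bytes"
	"context"
	"net/http"
	"strings"
	"time"
)

// A single shared HTTP client: recreating a client for every send quickly
// exhausts the ~1024 file descriptors available to a Lambda function.
var promtailHTTPClient = &http.Client{Timeout: 5 * time.Second}

type batch struct {
	lines []string
	size  int // running byte size of the batch, compared against BATCH_SIZE
}

func (b *batch) add(line string) {
	b.lines = append(b.lines, line)
	b.size += len(line)
}

// flush sends the batch and resets both the entries and the size counter.
// Forgetting the size reset means that once BATCH_SIZE has been reached,
// every subsequent line triggers another flush.
func (b *batch) flush(ctx context.Context, url string) error {
	// In the real code this would be an encoded Loki push request.
	payload := bytes.NewBufferString(strings.Join(b.lines, "\n"))

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, payload)
	if err != nil {
		return err
	}
	resp, err := promtailHTTPClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	b.lines = b.lines[:0]
	b.size = 0
	return nil
}
```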
Upgrades:

- Go `1.19`
- Dependencies, via `go get -u ./...`

Other

Special notes for your reviewer:

Checklist

- Reviewed the `CONTRIBUTING.md` guide (required)
- `CHANGELOG.md` updated
- Changes that require user attention or interaction to upgrade are documented in `docs/sources/upgrading/_index.md`