[cmd/serverless] add support for Logs Collection in the Datadog Agent Extension #6861
Conversation
forwarder: adapt the SyncDefaultForwarder to recent changes in the Forwarder interface.
serverless: add an HTTP server listening for logs and sending them to the logs agent. Introducing the `logs/input/channel` implementation supporting receiving logs through a Go channel.
serverless/logs: introduce a `NullAuditor` not doing anything with messages. This has been introduced to not change the original pipeline process. The original `Auditor` has been renamed `RegistryAuditor`. `Auditor` is now an interface for both `RegistryAuditor` and `NullAuditor`.
serverless/logs: add a sync `Flush()` method to the logs pipeline. This commit is also adding the reading of the function ARN from the invoke event.
serverless/logs: read REPORT messages and send AWS enhanced metrics using the DogStatsD server. Currently supported enhanced metrics: billed_duration, duration, init_duration, max_memory_used. (A sketch of this parsing follows the commit list.)
serverless/logs: configurable logs type we're subscribing to. This commit also adds the parsing of the time in the AWS log.
serverless/logs: create an aws package containing both AWS LogMessage and ARN support. This way, we can use the timestamp from the AWS LogMessage while sending the logs to the intake. This is what this commit is doing.
serverless: support using DD_TAGS to add extra tags while sending metrics and logs. For feature parity with the Datadog Forwarder, we want to use this configuration entry / environment variable to append tags to every metric and log sent by the extension.
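The REPORT parsing mentioned above can be sketched as a stand-alone example. Everything here is illustrative rather than the extension's actual code; only the field names follow the layout AWS uses in its platform REPORT lines:

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// Regexes for the fields of a Lambda platform REPORT line. The keys on
// the left are the enhanced metrics listed in the commit above.
var reportFields = map[string]*regexp.Regexp{
	"duration":        regexp.MustCompile(`Duration: ([\d.]+) ms`),
	"billed_duration": regexp.MustCompile(`Billed Duration: ([\d.]+) ms`),
	"init_duration":   regexp.MustCompile(`Init Duration: ([\d.]+) ms`),
	"max_memory_used": regexp.MustCompile(`Max Memory Used: ([\d.]+) MB`),
}

// parseReport extracts the enhanced-metric values from a REPORT line.
// Note: the plain "Duration" regex relies on that field appearing
// before "Billed Duration", which is the ordering AWS uses.
func parseReport(line string) map[string]float64 {
	metrics := make(map[string]float64)
	for name, re := range reportFields {
		if m := re.FindStringSubmatch(line); m != nil {
			if v, err := strconv.ParseFloat(m[1], 64); err == nil {
				metrics[name] = v
			}
		}
	}
	return metrics
}

func main() {
	line := "REPORT RequestId: 1234 Duration: 58.23 ms Billed Duration: 100 ms Memory Size: 128 MB Max Memory Used: 38 MB"
	fmt.Println(parseReport(line))
}
```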
// Loop terminates when the channel is closed.
for logline := range t.inputChan {
	origin := message.NewOrigin(t.source)
I think we should also update `t.source.BytesRead` to be consistent with the other tailers.
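A minimal sketch of that suggestion, applied to the loop from the diff above; both the counter API and the `Content` field are assumptions about the tailer's types, not the actual code:

```go
// Loop terminates when the channel is closed.
for logline := range t.inputChan {
	// Suggested addition: count the bytes flowing through this tailer
	// so it reports the same telemetry as the file/network tailers.
	t.source.BytesRead.Add(int64(len(logline.Content)))
	origin := message.NewOrigin(t.source)
	// ... build and forward the message as before ...
}
```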
Overall it looks good to me and fits pretty well within the existing Logs Agent code.
Instead of spawning a new HTTP server (using yet another TCP port), we are re-using the already-spawned HTTP server that is used to communicate with the libraries. It also means there is no longer a configuration field to change the HTTP port. This may have to be addressed if port 8124 is not available for some reason.
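As a rough illustration of this design choice (the handler names are made up; only the port comes from the PR), re-using the daemon server amounts to registering one more route on its mux:

```go
package main

import (
	"log"
	"net/http"
)

// startServer registers the logs route on the same mux as the routes
// already used by the client libraries (handler names are illustrative).
func startServer(flushHandler, logsHandler http.Handler) {
	mux := http.NewServeMux()
	mux.Handle("/lambda/flush", flushHandler) // existing library route
	mux.Handle("/lambda/logs", logsHandler)   // new: receives logs from AWS
	go func() {
		if err := http.ListenAndServe("localhost:8124", mux); err != nil {
			log.Println(err)
		}
	}()
}
```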
For the logs agent part it looks good to me. I think it will work pretty well.
// buildCommonFormat returns the log common format seelog string
func buildCommonFormat(loggerName LoggerName) string {
	return fmt.Sprintf("%%Date(%s) | %s | %%LEVEL | %%Msg%%n", getLogDateFormat(), loggerName)
}
I'm just curious, why do we remove contextual information from the agent log when running in the serverless context?
I guess here it's not a problem not to check for JMXFetch.
We are mimicking the logs that users are used to having while running functions in AWS Lambda (through our client libraries, for instance).
if loggerName == "JMXFETCH" {
	return `%Msg%n`
}
I think this should be included in pkg/config/log_format.go, for both `buildCommonFormat` and `buildJSONFormat`.
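Folding the JMXFetch special case from the earlier snippet into the builder could then look like this (a sketch combining the two snippets above):

```go
// buildCommonFormat returns the seelog format string for the common
// log format; the JMXFetch case keeps only the raw message.
func buildCommonFormat(loggerName LoggerName) string {
	if loggerName == "JMXFETCH" {
		return `%Msg%n`
	}
	return fmt.Sprintf("%%Date(%s) | %s | %%LEVEL | %%Msg%%n", getLogDateFormat(), loggerName)
}
```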
Very nice catch, this must be a merge error or merge artifact.
LGTM, just a quick question about tagging: will there be any tag to specifically isolate one Lambda execution stream?
@prognant thanks for the reviews... 🙇
You mean to identify that data sent by the Agent has been sent through a Lambda using the Extension?
I was thinking about isolating/filtering logs (could be metrics) emitted by one single execution of a given Lambda function. I.e. if a Lambda function has a huge number of concurrent invocations, having a tag (like a unique UUID for each Lambda execution) per execution would be useful. WDYT?
Oh sorry, I read your question too fast! We already support that with the request ID, which is set here. This request ID is then sent in the JSON and it is available in the Datadog app, for instance in the Log Explorer, or to filter by a given request ID, etc. 👍
[cmd/serverless] add support for Logs Collection in the Datadog Agent Extension (DataDog#6861)
* Add the Serverless Datadog Agent implementation. See https://docs.datadoghq.com/serverless/datadog_lambda_library/extension/
* Fix 3rd party license list since new pkgs are imported from the AWS SDK.
* forwarder: adapt the SyncDefaultForwarder to recent changes in the Forwarder interface.
* aggregator: fix unit tests with Flush now being a public function.
* serverless: linter compliance.
* serverless: errcheck compliance.
* serverless: unused code linter.
* serverless: linter compliance.
* serverless: add an HTTP server listening for logs and sending them to the logs agent. Introducing the `logs/input/channel` implementation supporting receiving logs through a Go channel.
* serverless/logs: introduce a `NullAuditor` not doing anything with messages. This has been introduced to not change the original pipeline process. The original `Auditor` has been renamed `RegistryAuditor`. `Auditor` is now an interface for both `RegistryAuditor` and `NullAuditor`.
* serverless: remove debug messages.
* serverless/logs: add a sync `Flush()` method to the logs pipeline. This commit is also adding the reading of the function ARN from the invoke event.
* serverless/logs: remove unwanted debug lines.
* serverless/logs: read REPORT messages and send AWS enhanced metrics using the DogStatsD server. Currently supported enhanced metrics: billed_duration, duration, init_duration, max_memory_used.
* serverless/logs: enhanced metrics are distributions.
* serverless/logs: comment on the sync flush of logs then statsd.
* serverless/logs: configurable logs type we're subscribing to. This commit also adds the parsing of the time in the AWS log.
* serverless/arn: remove the version from the ARN string if any.
* serverless/logs: create an aws package containing both AWS LogMessage and ARN support. This way, we can use the timestamp from the AWS LogMessage while sending the logs to the intake. This is what this commit is doing.
* serverless/logs: parse time up to the millisecond.
* serverless/logs: send the enhanced metrics with the proper timestamp.
* serverless/logs: comments and send the report log to the intake.
* serverless/logs: send aws.lambda.enhanced.memorysize.
* serverless/logs: send formatted platform logs.
* serverless/logs: cleaning comments here and there.
* config: fix log conflict while adding the serverless format.
* serverless/logs: linter compliance.
* cmd/serverless: start -> run.
* dogstatsd: add a unit test in serverless mode.
* tasks: remove serverless test tag.
* forwarder: rename `SyncDefaultForwarder` to `SyncForwarder` since it's not a default forwarder.
* [cmd/serverless] change the default port for the HTTP server collecting logs.
* serverless: support using DD_TAGS to add extra tags while sending metrics and logs. For feature parity with the Datadog Forwarder, we want to use this configuration entry / environment variable to append tags to every metric and log sent by the extension.
* serverless/logs: send the logs with the ARN and RequestId information.
* serverless: linter compliance.
* serverless/logs: use the function name, if available, as the "service".
* serverless: hit the correct domain using the `AWS_LAMBDA_RUNTIME_API` env var.
* serverless: re-use the daemon HTTP server to receive the logs from AWS. Instead of spawning a new HTTP server (using yet another TCP port), we are re-using the already-spawned HTTP server that is used to communicate with the libraries. It also means there is no longer a configuration field to change the HTTP port. This may have to be addressed if port 8124 is not available for some reason.
* dogstatsd: fix a unit test.
* dogstatsd: fix a unit test.
* tasks/dogstatsd: slightly increase the maximum valid size for the dogstatsd binary.
* misc: use the correct log format for non-serverless logs. This must be a merge error/artifact.
What does this PR do?
This PR adds support for Logs collection in the Datadog Agent Extension.
Log collection
In order to collect the logs from the environment, the extension adds a route `/lambda/logs` to the extension HTTP server (running on port 8124), on which it receives logs from the AWS environment. These logs are the logs of the function, of the extensions, and of the platform.
The `pkg/serverless/aws` package has been introduced with methods helping to get the ARN of the running function and to parse the log messages received on the HTTP server.
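A minimal sketch of the shape of that route; `logsHandler` and the `LogMessage` stand-in are hypothetical (the real decoding lives in `pkg/serverless/aws`):

```go
package main

import (
	"encoding/json"
	"net/http"
)

// LogMessage stands in for the aws.LogMessage type from the PR; the
// real type has custom JSON decoding in pkg/serverless/aws.
type LogMessage struct {
	Time   string      `json:"time"`
	Type   string      `json:"type"`
	Record interface{} `json:"record"`
}

// logsHandler decodes the batch of log messages POSTed by the AWS Logs
// API to /lambda/logs and forwards each one to the logs pipeline.
func logsHandler(out chan<- LogMessage) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		defer r.Body.Close()
		var messages []LogMessage
		if err := json.NewDecoder(r.Body).Decode(&messages); err != nil {
			w.WriteHeader(http.StatusBadRequest)
			return
		}
		for _, m := range messages {
			out <- m // each message enters the pipeline through the channel
		}
	}
}
```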
Logs aggregation / processing
An instance of the Logs Agent is started in the extension if the `DD_LOGS_ENABLED` flag is set (a minimal sketch of this gating follows the list below). This Logs Agent uses the regular pipeline, except that:
* It is using a `NullAuditor` (not writing any registry).
* It is using a new input receiving logs through a Go channel (`pkg/logs/input/channel`). Note that it is specialized for AWS log messages but could be made more generic with additional code if needed.
* Running as `serverless`, it is not appending any hostname to the logs.
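The gating could look roughly like this; `startLogsAgent` is a hypothetical helper, and the real extension may read the flag through the Agent config rather than directly from the environment:

```go
// Start the logs agent only when the user enabled logs collection.
if enabled, _ := strconv.ParseBool(os.Getenv("DD_LOGS_ENABLED")); enabled {
	logsChan := make(chan LogMessage) // feeds pkg/logs/input/channel
	startLogsAgent(logsChan)          // hypothetical wiring helper
}
```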
Synchronous flush of the pipeline
The Extension needs to do a synchronous flush of the buffered data on a signal. Today, it is used to naively flush at the end of the execution of the function; however, we are already working on adding a smarter flush mechanism.
In order to do so, I had to adapt different parts of the pipeline:
* `Pipeline.Flush`, triggering a flush of the Processor and of the Sender.
* `Processor.Flush`, sending buffered messages to the processor / encoder.
* `Sender.Flush`, sending the buffered data to the BatchStrategy sender.
* `BatchStrategy.Flush`, to synchronously send the HTTP request to the intake to flush the data.
A mutex has been introduced and the `select` statements were all modified to support a signal for the flush (sketched below). This may have some performance impact, for which I want to add benchmarks.
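The flush-signal pattern described above can be sketched like this; the names are illustrative stand-ins for the pipeline stages, and the real code also guards its buffers with the mutex mentioned above:

```go
// sender stands in for one pipeline stage (Processor, Sender, ...).
type sender struct {
	inputChan chan string        // incoming log lines
	flushChan chan chan struct{} // flush requests carrying a done signal
	buffer    []string
}

// run is the stage's worker loop: it either buffers a new message or,
// on a flush request, synchronously drains the buffer and signals the
// caller once the data has been pushed downstream.
func (s *sender) run() {
	for {
		select {
		case msg := <-s.inputChan:
			s.buffer = append(s.buffer, msg)
		case done := <-s.flushChan:
			s.drain()   // e.g. BatchStrategy sends the HTTP request here
			close(done) // unblock the Flush caller
		}
	}
}

func (s *sender) drain() { s.buffer = s.buffer[:0] }

// Flush blocks until the worker loop has drained its buffer, giving
// the extension a synchronous flush point at the end of an invocation.
func (s *sender) Flush() {
	done := make(chan struct{})
	s.flushChan <- done
	<-done
}
```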
Miscellaneous changes
* `message.NewMessageWithTime` to send in the pipeline a log with a known/given timestamp.
* `Auditor` has been renamed `RegistryAuditor` and an interface `Auditor` has been introduced instead (to create the `NullAuditor`; sketched below).
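The resulting shape is roughly the following; the method set is abridged and assumed, not the exact interface from the PR:

```go
// Message stands in for the logs pipeline message type.
type Message struct{ Content []byte }

// Auditor is the new interface covering both implementations.
type Auditor interface {
	Start()
	Stop()
	Channel() chan *Message
}

// NullAuditor consumes messages without writing any registry, which is
// all the serverless extension needs.
type NullAuditor struct {
	channel chan *Message
}

func (a *NullAuditor) Start() {
	go func() {
		for range a.channel {
			// drop the message: no offset is recorded in serverless mode
		}
	}()
}

func (a *NullAuditor) Stop()                  {}
func (a *NullAuditor) Channel() chan *Message { return a.channel }
```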