From 23886fb627762ce26fee5a51cee5142b920f58f6 Mon Sep 17 00:00:00 2001
From: Georgi
Date: Mon, 8 Apr 2024 15:00:40 +0200
Subject: [PATCH] Store and retry forwarding events in case of exceptions
 (#759)

* Refactor forwarding and logs

* Store and retry forwarding events in case of exceptions

* Add a retry mechanism to store events on S3 in case of failure.

* Multiple functions can use the same bucket to store events

* Address comments

* Change retry execution to be triggered on custom invocations only
---
 aws/logs_monitoring/README.md                 |  11 +-
 aws/logs_monitoring/forwarder.py              | 201 +++++++++++-------
 aws/logs_monitoring/lambda_function.py        |  38 +++-
 aws/logs_monitoring/logs/helpers.py           |  13 ++
 aws/logs_monitoring/retry/enums.py            |  10 +
 aws/logs_monitoring/retry/storage.py          |  85 ++++++++
 aws/logs_monitoring/settings.py               |   6 +
 aws/logs_monitoring/template.yaml             |  21 ++
 .../integration_tests/docker-compose.yml      |   1 +
 .../trace_forwarder/connection.py             |   4 +-
 10 files changed, 306 insertions(+), 84 deletions(-)
 create mode 100644 aws/logs_monitoring/retry/enums.py
 create mode 100644 aws/logs_monitoring/retry/storage.py

diff --git a/aws/logs_monitoring/README.md b/aws/logs_monitoring/README.md
index fd5ecdf4e..7be934bec 100644
--- a/aws/logs_monitoring/README.md
+++ b/aws/logs_monitoring/README.md
@@ -120,7 +120,13 @@ If you can't install the Forwarder using the provided CloudFormation template, y
 4. Set the environment variable `DD_ENHANCED_METRICS` to `false` on the Forwarder. This stops the Forwarder from generating enhanced metrics itself, but it will still forward custom metrics from other lambdas.
 5. Some AWS accounts are configured such that triggers will not automatically create resource-based policies allowing Cloudwatch log groups to invoke the forwarder. Reference the [CloudWatchLogPermissions][103] to see which permissions are required for the forwarder to be invoked by Cloudwatch Log Events.
 6. [Configure triggers][104].
-7. Create an S3 bucket, and set environment variable `DD_S3_BUCKET_NAME` to the bucket name. Also provide `s3:GetObject`, `s3:PutObject`, and `s3:DeleteObject` permissions on this bucket to the Lambda execution role. This bucket is used to store the Lambda tags cache.
+7. Create an S3 bucket, and set the environment variable `DD_S3_BUCKET_NAME` to the bucket name. Also provide `s3:GetObject`, `s3:PutObject`, `s3:ListBucket`, and `s3:DeleteObject` permissions on this bucket to the Lambda execution role. This bucket is used to store the different tag caches, i.e. Lambda, S3, Step Function, and Log Group. Additionally, this bucket is used to store unforwarded events in case of forwarding exceptions.
+8. Set the environment variable `DD_STORE_FAILED_EVENTS` to `true` to enable the forwarder to also store event data in the S3 bucket. When an exception occurs while sending logs, metrics, or traces to the intake, the forwarder stores the relevant data in the S3 bucket. On a custom invocation, i.e. on receiving an event with the `retry` keyword set to a non-empty string (which can be triggered manually - see below), the forwarder retries sending the stored events and clears them from the bucket once they are forwarded successfully.
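+
+When storage is enabled, failed events are written under a per-function prefix in the bucket. As a rough sketch (the `failed_events/` path and the SHA-1 digest of the function ARN come from the forwarder code; the bucket name and digest below are placeholders), you can inspect what is still pending retry with:
+
+```bash
+aws s3 ls s3://<your-forwarder-bucket>/failed_events/<function-arn-sha1>/ --recursive
+```
+
+To trigger a retry manually, invoke the forwarder with the `retry` keyword: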
+ +```bash +aws lambda invoke --function-name --payload '{"retry":"true"}' out +``` + [101]: https://github.com/DataDog/datadog-serverless-functions/releases [102]: https://app.datadoghq.com/organization-settings/api-keys @@ -138,6 +144,9 @@ If you can't install the Forwarder using the provided CloudFormation template, y If you encounter issues upgrading to the latest version, check the Troubleshooting section. +### Upgrade an older version to +3.107.0 +Starting version 3.107.0 a new feature is added to enable Lambda function to store unforwarded events incase of exceptions on the intake point. If the feature is enabled using `DD_STORE_FAILED_EVENTS` env var, failing events will be stored under a defined dir in the same S3 bucket used to store tags cache. The same bucket can be used to store logs from several Lambda functions under unique subdirs. + ### Upgrade an older version to +3.106.0 Starting version 3.106.0 Lambda function has been updated to add a prefix to cache filenames stored in the S3 bucket configured in `DD_S3_BUCKET_NAME`. This allows to use the same bucket to store cache files from several functions. diff --git a/aws/logs_monitoring/forwarder.py b/aws/logs_monitoring/forwarder.py index c640e1e3f..e09dceafa 100644 --- a/aws/logs_monitoring/forwarder.py +++ b/aws/logs_monitoring/forwarder.py @@ -15,7 +15,9 @@ from logs.datadog_client import DatadogClient from logs.datadog_tcp_client import DatadogTCPClient from logs.datadog_scrubber import DatadogScrubber -from logs.helpers import filter_logs +from logs.helpers import filter_logs, add_retry_tag +from retry.storage import Storage +from retry.enums import RetryPrefix from settings import ( DD_API_KEY, DD_USE_TCP, @@ -25,6 +27,7 @@ DD_PORT, DD_TRACE_INTAKE_URL, DD_FORWARD_LOG, + DD_STORE_FAILED_EVENTS, SCRUBBING_RULE_CONFIGS, INCLUDE_AT_MATCH, EXCLUDE_AT_MATCH, @@ -32,85 +35,139 @@ logger = logging.getLogger() logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper())) -trace_connection = TraceConnection( - DD_TRACE_INTAKE_URL, DD_API_KEY, DD_SKIP_SSL_VALIDATION -) -def forward(logs, metrics, traces): - """ - Forward logs, metrics, and traces to Datadog in a background thread. - """ - if DD_FORWARD_LOG: - _forward_logs(logs) - - _forward_metrics(metrics) - - if len(traces) > 0: - _forward_traces(traces) - - -def _forward_logs(logs): - """Forward logs to Datadog""" - if logger.isEnabledFor(logging.DEBUG): - logger.debug(f"Forwarding {len(logs)} logs") - logs_to_forward = filter_logs( - [json.dumps(log, ensure_ascii=False) for log in logs], - include_pattern=INCLUDE_AT_MATCH, - exclude_pattern=EXCLUDE_AT_MATCH, - ) - scrubber = DatadogScrubber(SCRUBBING_RULE_CONFIGS) - if DD_USE_TCP: - batcher = DatadogBatcher(256 * 1000, 256 * 1000, 1) - cli = DatadogTCPClient(DD_URL, DD_PORT, DD_NO_SSL, DD_API_KEY, scrubber) - else: - batcher = DatadogBatcher(512 * 1000, 4 * 1000 * 1000, 400) - cli = DatadogHTTPClient( - DD_URL, DD_PORT, DD_NO_SSL, DD_SKIP_SSL_VALIDATION, DD_API_KEY, scrubber +class Forwarder(object): + def __init__(self, function_prefix): + self.trace_connection = TraceConnection( + DD_TRACE_INTAKE_URL, DD_API_KEY, DD_SKIP_SSL_VALIDATION ) + self.storage = Storage(function_prefix) - with DatadogClient(cli) as client: - try: + def forward(self, logs, metrics, traces): + """ + Forward logs, metrics, and traces to Datadog in a background thread. 
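+        If `DD_STORE_FAILED_EVENTS` is enabled, events that fail to send are stored in S3 so that a later retry invocation can resend them.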
+ """ + if DD_FORWARD_LOG: + self._forward_logs(logs) + self._forward_metrics(metrics) + self._forward_traces(traces) + + def retry(self): + """ + Retry forwarding logs, metrics, and traces to Datadog. + """ + for prefix in RetryPrefix: + self._retry_prefix(prefix) + + def _retry_prefix(self, prefix): + if logger.isEnabledFor(logging.DEBUG): + logger.debug(f"Retrying {prefix} data") + + key_data = self.storage.get_data(prefix) + + for k, d in key_data.items(): + if d is None: + continue + match prefix: + case RetryPrefix.LOGS: + self._forward_logs(d, key=k) + case RetryPrefix.METRICS: + self._forward_metrics(d, key=k) + case RetryPrefix.TRACES: + self._forward_traces(d, key=k) + + def _forward_logs(self, logs, key=None): + """Forward logs to Datadog""" + if logger.isEnabledFor(logging.DEBUG): + logger.debug(f"Forwarding {len(logs)} logs") + + logs_to_forward = [] + for log in logs: + if key: + log = add_retry_tag(log) + logs_to_forward.append(json.dumps(log, ensure_ascii=False)) + + logs_to_forward = filter_logs( + logs_to_forward, INCLUDE_AT_MATCH, EXCLUDE_AT_MATCH + ) + + scrubber = DatadogScrubber(SCRUBBING_RULE_CONFIGS) + if DD_USE_TCP: + batcher = DatadogBatcher(256 * 1000, 256 * 1000, 1) + cli = DatadogTCPClient(DD_URL, DD_PORT, DD_NO_SSL, DD_API_KEY, scrubber) + else: + batcher = DatadogBatcher(512 * 1000, 4 * 1000 * 1000, 400) + cli = DatadogHTTPClient( + DD_URL, DD_PORT, DD_NO_SSL, DD_SKIP_SSL_VALIDATION, DD_API_KEY, scrubber + ) + + failed_logs = [] + with DatadogClient(cli) as client: for batch in batcher.batch(logs_to_forward): - client.send(batch) + try: + client.send(batch) + except Exception: + logger.exception(f"Exception while forwarding log batch {batch}") + failed_logs.extend(batch) + else: + if logger.isEnabledFor(logging.DEBUG): + logger.debug(f"Forwarded log batch: {batch}") + if key: + self.storage.delete_data(key) + + if DD_STORE_FAILED_EVENTS and len(failed_logs) > 0 and not key: + self.storage.store_data(RetryPrefix.LOGS, failed_logs) + + send_event_metric("logs_forwarded", len(logs_to_forward) - len(failed_logs)) + + def _forward_metrics(self, metrics, key=None): + """ + Forward custom metrics submitted via logs to Datadog in a background thread + using `lambda_stats` that is provided by the Datadog Python Lambda Layer. 
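+        When `key` is set, the metrics come from retry storage: the stored S3 object is deleted on success and failures are not stored again.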
+ """ + if logger.isEnabledFor(logging.DEBUG): + logger.debug(f"Forwarding {len(metrics)} metrics") + + failed_metrics = [] + for metric in metrics: + try: + send_log_metric(metric) + except Exception: + logger.exception( + f"Exception while forwarding metric {json.dumps(metric)}" + ) + failed_metrics.append(metric) + else: if logger.isEnabledFor(logging.DEBUG): - logger.debug(f"Forwarded log batch: {json.dumps(batch)}") + logger.debug(f"Forwarded metric: {json.dumps(metric)}") + if key: + self.storage.delete_data(key) + + if DD_STORE_FAILED_EVENTS and len(failed_metrics) > 0 and not key: + self.storage.store_data(RetryPrefix.METRICS, failed_metrics) + + send_event_metric("metrics_forwarded", len(metrics) - len(failed_metrics)) + + def _forward_traces(self, traces, key=None): + if not len(traces) > 0: + return + + if logger.isEnabledFor(logging.DEBUG): + logger.debug(f"Forwarding {len(traces)} traces") + + try: + serialized_trace_paylods = json.dumps(traces) + self.trace_connection.send_traces(serialized_trace_paylods) except Exception: logger.exception( - f"Exception while forwarding log batch {json.dumps(batch)}" + f"Exception while forwarding traces {serialized_trace_paylods}" ) + if DD_STORE_FAILED_EVENTS and not key: + self.storage.store_data(RetryPrefix.TRACES, traces) else: - send_event_metric("logs_forwarded", len(logs_to_forward)) - - -def _forward_metrics(metrics): - """ - Forward custom metrics submitted via logs to Datadog in a background thread - using `lambda_stats` that is provided by the Datadog Python Lambda Layer. - """ - if logger.isEnabledFor(logging.DEBUG): - logger.debug(f"Forwarding {len(metrics)} metrics") - try: - for metric in metrics: - send_log_metric(metric) if logger.isEnabledFor(logging.DEBUG): - logger.debug(f"Forwarded metric: {json.dumps(metric)}") - except Exception: - logger.exception(f"Exception while forwarding metric {json.dumps(metric)}") - else: - send_event_metric("metrics_forwarded", len(metrics)) - - -def _forward_traces(trace_payloads): - if logger.isEnabledFor(logging.DEBUG): - logger.debug(f"Forwarding {len(trace_payloads)} traces") - try: - trace_connection.send_traces(trace_payloads) - except Exception: - logger.exception( - f"Exception while forwarding traces {json.dumps(trace_payloads)}" - ) - else: - if logger.isEnabledFor(logging.DEBUG): - logger.debug(f"Forwarded traces: {json.dumps(trace_payloads)}") - send_event_metric("traces_forwarded", len(trace_payloads)) + logger.debug(f"Forwarded traces: {serialized_trace_paylods}") + if key: + self.storage.delete_data(key) + send_event_metric("traces_forwarded", len(traces)) diff --git a/aws/logs_monitoring/lambda_function.py b/aws/logs_monitoring/lambda_function.py index 1b821c13d..b27f5c80a 100644 --- a/aws/logs_monitoring/lambda_function.py +++ b/aws/logs_monitoring/lambda_function.py @@ -17,13 +17,14 @@ from steps.transformation import transform from steps.splitting import split from caching.cache_layer import CacheLayer -from forwarder import forward +from forwarder import Forwarder from settings import ( DD_API_KEY, DD_SKIP_SSL_VALIDATION, DD_API_URL, DD_FORWARDER_VERSION, DD_ADDITIONAL_TARGET_LAMBDAS, + DD_RETRY_KEYWORD, ) @@ -55,6 +56,7 @@ api._cacert = not DD_SKIP_SSL_VALIDATION cache_layer = None +forwarder = None def datadog_forwarder(event, context): @@ -66,31 +68,51 @@ def datadog_forwarder(event, context): if DD_ADDITIONAL_TARGET_LAMBDAS: invoke_additional_target_lambdas(event) - init_cache_layer(context) + function_prefix = get_function_arn_digest(context) + 
init_cache_layer(function_prefix) + init_forwarder(function_prefix) parsed = parse(event, context, cache_layer) enriched = enrich(parsed, cache_layer) transformed = transform(enriched) metrics, logs, trace_payloads = split(transformed) - forward(logs, metrics, trace_payloads) + forwarder.forward(logs, metrics, trace_payloads) parse_and_submit_enhanced_metrics(logs, cache_layer) + try: + if bool(event.get(DD_RETRY_KEYWORD, False)) is True: + forwarder.retry() + except Exception as e: + if logger.isEnabledFor(logging.DEBUG): + logger.debug(f"Failed to retry forwarding {e}") + pass -def init_cache_layer(context): + +def init_cache_layer(function_prefix): global cache_layer if cache_layer is None: # set the prefix for cache layer try: - if not cache_layer: - function_arn = context.invoked_function_arn.lower() - prefix = sha1(function_arn.encode("UTF-8")).hexdigest() - cache_layer = CacheLayer(prefix) + if cache_layer is None: + cache_layer = CacheLayer(function_prefix) except Exception as e: logger.exception(f"Failed to create cache layer due to {e}") raise +def init_forwarder(function_prefix): + global forwarder + if forwarder is None: + forwarder = Forwarder(function_prefix) + + +def get_function_arn_digest(context): + function_arn = context.invoked_function_arn.lower() + prefix = sha1(function_arn.encode("UTF-8")).hexdigest() + return prefix + + def invoke_additional_target_lambdas(event): lambda_client = boto3.client("lambda") lambda_arns = DD_ADDITIONAL_TARGET_LAMBDAS.split(",") diff --git a/aws/logs_monitoring/logs/helpers.py b/aws/logs_monitoring/logs/helpers.py index a2606a21e..cd092454d 100644 --- a/aws/logs_monitoring/logs/helpers.py +++ b/aws/logs_monitoring/logs/helpers.py @@ -8,8 +8,11 @@ import re import gzip import os +import json from logs.exceptions import ScrubbingException +from settings import DD_CUSTOM_TAGS, DD_RETRY_KEYWORD + logger = logging.getLogger() logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper())) @@ -75,3 +78,13 @@ def compileRegex(rule, pattern): raise Exception( "could not compile {} regex with pattern: {}".format(rule, pattern) ) + + +def add_retry_tag(log): + try: + log = json.loads(log) + log[DD_CUSTOM_TAGS] = log.get(DD_CUSTOM_TAGS, "") + f",{DD_RETRY_KEYWORD}:true" + except Exception: + logger.warning(f"cannot add retry tag for log {log}") + + return log diff --git a/aws/logs_monitoring/retry/enums.py b/aws/logs_monitoring/retry/enums.py new file mode 100644 index 000000000..8b39cf9ed --- /dev/null +++ b/aws/logs_monitoring/retry/enums.py @@ -0,0 +1,10 @@ +from enum import Enum + + +class RetryPrefix(Enum): + LOGS = "logs" + METRICS = "metrics" + TRACES = "traces" + + def __str__(self): + return self.value diff --git a/aws/logs_monitoring/retry/storage.py b/aws/logs_monitoring/retry/storage.py new file mode 100644 index 000000000..bc97d327a --- /dev/null +++ b/aws/logs_monitoring/retry/storage.py @@ -0,0 +1,85 @@ +import os +import logging +from time import time +import json +import boto3 +from botocore.exceptions import ClientError +from settings import DD_RETRY_PATH, DD_S3_BUCKET_NAME + +logger = logging.getLogger(__name__) +logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper())) + + +class Storage(object): + def __init__(self, function_prefix): + self.bucket_name = DD_S3_BUCKET_NAME + self.s3_client = boto3.client("s3") + self.function_prefix = function_prefix + + def get_data(self, prefix): + keys = self._list_keys(prefix) + key_data = {} + for key in keys: + key_data[key] = 
self._fetch_data_for_key(key) + + if logger.isEnabledFor(logging.DEBUG): + logger.debug(f"Found {len(keys)} retry keys for prefix {prefix}") + + return key_data + + def store_data(self, prefix, data): + if logger.isEnabledFor(logging.DEBUG): + logger.debug(f"Storing retry data for prefix {prefix}") + random_suffix = str(time()) + key_prefix = self._get_key_prefix(prefix) + key = f"{key_prefix}{random_suffix}" + serialized_data = self._serialize(data) + try: + self.s3_client.put_object( + Bucket=self.bucket_name, Key=key, Body=serialized_data + ) + except ClientError: + logger.error(f"Failed to store retry data for prefix {prefix}") + + def delete_data(self, key): + try: + self.s3_client.delete_object(Bucket=self.bucket_name, Key=key) + except ClientError: + logger.error(f"Failed to delete retry data for key {key}") + + def _list_keys(self, prefix): + key_prefix = self._get_key_prefix(prefix) + try: + response = self.s3_client.list_objects_v2( + Bucket=self.bucket_name, Prefix=key_prefix + ) + return [content["Key"] for content in response.get("Contents", [])] + except ClientError as e: + logger.error( + f"Failed to list retry keys for prefix {key_prefix} because of {e}" + ) + return [] + + def _fetch_data_for_key(self, key): + try: + response = self.s3_client.get_object(Bucket=self.bucket_name, Key=key) + body = response.get("Body") + data = body.read() + return self._deserialize(data) + except ClientError: + logger.error(f"Failed to fetch retry data for key {key}") + return None + except Exception as e: + logger.error( + f"Failed to deserialize retry data for key {key} because of {e}" + ) + return None + + def _get_key_prefix(self, retry_prefix): + return f"{DD_RETRY_PATH}/{self.function_prefix}/{str(retry_prefix)}/" + + def _serialize(self, data): + return bytes(json.dumps(data).encode("UTF-8")) + + def _deserialize(self, data): + return json.loads(data.decode("UTF-8")) diff --git a/aws/logs_monitoring/settings.py b/aws/logs_monitoring/settings.py index 12838ecf3..62a9c56ef 100644 --- a/aws/logs_monitoring/settings.py +++ b/aws/logs_monitoring/settings.py @@ -270,3 +270,9 @@ def __init__(self, name, pattern, placeholder): GET_RESOURCES_LAMBDA_FILTER = "lambda" GET_RESOURCES_STEP_FUNCTIONS_FILTER = "states" GET_RESOURCES_S3_FILTER = "s3:bucket" + + +# Retyer +DD_RETRY_PATH = "failed_events" +DD_RETRY_KEYWORD = "retry" +DD_STORE_FAILED_EVENTS = get_env_var("DD_STORE_FAILED_EVENTS", "false", boolean=True) diff --git a/aws/logs_monitoring/template.yaml b/aws/logs_monitoring/template.yaml index f664fb9c5..853e729ed 100644 --- a/aws/logs_monitoring/template.yaml +++ b/aws/logs_monitoring/template.yaml @@ -229,6 +229,13 @@ Parameters: Type: String Default: "" Description: The name of the forwarder bucket to create. If not provided, AWS will generate a unique name. + DdStoreFailedEvents: + Type: String + Default: false + AllowedValues: + - true + - false + Description: Set to true to enable the forwarder to store events that failed to send to Datadog. DdForwarderExistingBucketName: Type: String Default: "" @@ -553,6 +560,8 @@ Resources: - SetDdPort - Ref: DdPort - Ref: AWS::NoValue + DD_STORE_FAILED_EVENTS: + Ref: DdStoreFailedEvents REDACT_IP: Fn::If: - SetRedactIp @@ -709,6 +718,17 @@ Resources: Effect: Allow # Get the actual log content from the s3 bucket based on the received s3 event. # Use PermissionsBoundaryArn to limit (allow/deny) access if needed. 
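+        # Allow the forwarder to list the failed events it stored (DD_STORE_FAILED_EVENTS) so they can be retried on a custom invocation.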
+ - Action: + - s3:ListBucket + Resource: + - Fn::If: + - CreateS3Bucket + - Fn::GetAtt: ForwarderBucket.Arn + - Fn::Sub: "arn:aws:s3:::${DdForwarderExistingBucketName}" + Condition: + StringLike: + s3:prefix: "retry/*" + Effect: Allow - Action: - s3:GetObject Resource: "*" @@ -1121,6 +1141,7 @@ Metadata: - AdditionalTargetLambdaArns - DdForwarderExistingBucketName - DdForwarderBucketName + - DdStoreFailedEvents ParameterLabels: DdApiKey: default: "DdApiKey *" diff --git a/aws/logs_monitoring/tools/integration_tests/docker-compose.yml b/aws/logs_monitoring/tools/integration_tests/docker-compose.yml index bb051ef22..b1277047c 100644 --- a/aws/logs_monitoring/tools/integration_tests/docker-compose.yml +++ b/aws/logs_monitoring/tools/integration_tests/docker-compose.yml @@ -44,6 +44,7 @@ services: DD_FETCH_LAMBDA_TAGS: "true" DD_FETCH_LOG_GROUP_TAGS: "true" DD_FETCH_STEP_FUNCTIONS_TAGS: "false" # intentionally set false to allow integration test for step function logs to run without hitting aws + DD_STORE_FAILED_EVENTS: "false" expose: - 9001 depends_on: diff --git a/aws/logs_monitoring/trace_forwarder/connection.py b/aws/logs_monitoring/trace_forwarder/connection.py index a18df04d3..19fb4850e 100644 --- a/aws/logs_monitoring/trace_forwarder/connection.py +++ b/aws/logs_monitoring/trace_forwarder/connection.py @@ -3,7 +3,6 @@ # This product includes software developed at Datadog (https://www.datadoghq.com/). # Copyright 2021 Datadog, Inc. from ctypes import cdll, Structure, c_char_p, c_int -import json import os @@ -27,8 +26,7 @@ def __init__(self, root_url, api_key, insecure_skip_verify): insecure_skip_verify, ) - def send_traces(self, trace_payloads): - serialized_trace_paylods = json.dumps(trace_payloads) + def send_traces(self, serialized_trace_paylods): had_error = ( self.lib.ForwardTraces(make_go_string(serialized_trace_paylods)) != 0 )
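
A minimal operational sketch for exercising the retry path end to end, not part of the patch itself: the function name and bucket name below are placeholders, and the `failed_events/` key prefix mirrors `DD_RETRY_PATH` from `settings.py`.

```python
# Sketch: trigger the forwarder's retry pass, then check what is still stored in S3.
# "datadog-forwarder" and "my-forwarder-bucket" are placeholder names.
import json

import boto3

lambda_client = boto3.client("lambda")
s3_client = boto3.client("s3")

# Equivalent to: aws lambda invoke --function-name <forwarder> --payload '{"retry":"true"}' out
response = lambda_client.invoke(
    FunctionName="datadog-forwarder",
    Payload=json.dumps({"retry": "true"}).encode("utf-8"),
)
print("retry invocation status:", response["StatusCode"])

# Failed events are stored under failed_events/<sha1-of-function-arn>/<logs|metrics|traces>/<timestamp>;
# after a successful retry the corresponding keys should have been deleted.
remaining = s3_client.list_objects_v2(
    Bucket="my-forwarder-bucket",
    Prefix="failed_events/",
)
for obj in remaining.get("Contents", []):
    print("still pending retry:", obj["Key"])
```

Because `Forwarder.retry()` iterates over all `RetryPrefix` values, a single custom invocation replays stored logs, metrics, and traces.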