|
8 | 8 | import logging
|
9 | 9 | import os
|
10 | 10 | import warnings
|
11 |
| -from typing import TYPE_CHECKING, Literal, overload |
| 11 | +from typing import TYPE_CHECKING, Any, Literal, overload |
12 | 12 |
|
13 | 13 | import boto3
|
14 | 14 |
|
15 | 15 | from aws_lambda_powertools.shared import constants
|
16 | 16 | from aws_lambda_powertools.shared.functions import resolve_max_age
|
17 | 17 | from aws_lambda_powertools.shared.json_encoder import Encoder
|
18 |
| -from aws_lambda_powertools.utilities.parameters.base import BaseProvider |
| 18 | +from aws_lambda_powertools.utilities.parameters.base import BaseProvider, transform_value |
19 | 19 | from aws_lambda_powertools.utilities.parameters.constants import DEFAULT_MAX_AGE_SECS, DEFAULT_PROVIDERS
|
20 |
| -from aws_lambda_powertools.utilities.parameters.exceptions import SetSecretError |
| 20 | +from aws_lambda_powertools.utilities.parameters.exceptions import ( |
| 21 | + GetSecretError, |
| 22 | + SetSecretError, |
| 23 | + TransformParameterError, |
| 24 | +) |
21 | 25 | from aws_lambda_powertools.warnings import PowertoolsDeprecationWarning
|
22 | 26 |
|
23 | 27 | if TYPE_CHECKING:
|
@@ -126,11 +130,159 @@ def _get(self, name: str, **sdk_options) -> str | bytes:
|
126 | 130 |
|
127 | 131 | return secret_value["SecretBinary"]
|
128 | 132 |
|
129 |
| - def _get_multiple(self, path: str, **sdk_options) -> dict[str, str]: |
| 133 | + def _get_multiple(self, names: list[str], **sdk_options) -> dict[str, Any]: # type: ignore[override] |
130 | 134 | """
|
131 |
| - Retrieving multiple parameter values is not supported with AWS Secrets Manager |
| 135 | + Retrieve multiple secrets using AWS Secrets Manager batch_get_secret_value API |
| 136 | +
|
| 137 | + Parameters |
| 138 | + ---------- |
| 139 | + names: list[str] |
| 140 | + List of secret names to retrieve |
| 141 | + sdk_options: dict, optional |
| 142 | + Additional options passed to batch_get_secret_value API call |
| 143 | +
|
| 144 | + Returns |
| 145 | + ------- |
| 146 | + dict[str, str] |
| 147 | + Dictionary mapping secret names to their values |
| 148 | +
|
| 149 | + Raises |
| 150 | + ------ |
| 151 | + GetParameterError |
| 152 | + When the parameter provider fails to retrieve secrets |
132 | 153 | """
|
133 |
| - raise NotImplementedError() |
| 154 | + |
| 155 | + # Merge filters: combine names with any additional filters from sdk_options |
| 156 | + filters = sdk_options.get("Filters", []) |
| 157 | + name_filter = {"Key": "name", "Values": names} |
| 158 | + |
| 159 | + # Add name filter to existing filters |
| 160 | + filters.append(name_filter) |
| 161 | + sdk_options["Filters"] = filters |
| 162 | + |
| 163 | + # Remove SecretIdList if present to avoid conflicts |
| 164 | + sdk_options.pop("SecretIdList", None) |
| 165 | + |
| 166 | + secrets: dict[str, Any] = {} |
| 167 | + next_token = None |
| 168 | + |
| 169 | + # Handle pagination automatically |
| 170 | + while True: |
| 171 | + if next_token: |
| 172 | + sdk_options["NextToken"] = next_token |
| 173 | + elif "NextToken" in sdk_options: |
| 174 | + # Remove NextToken from first call if it was passed |
| 175 | + sdk_options.pop("NextToken") # pragma: no cover |
| 176 | + |
| 177 | + try: |
| 178 | + response = self.client.batch_get_secret_value(**sdk_options) |
| 179 | + except Exception as exc: |
| 180 | + raise GetSecretError(f"Failed to retrieve secrets: {str(exc)}") from exc |
| 181 | + |
| 182 | + # Process successful secrets |
| 183 | + for secret in response.get("SecretValues", []): |
| 184 | + secret_name = secret["Name"] |
| 185 | + |
| 186 | + # Extract secret value (SecretString or SecretBinary) |
| 187 | + if "SecretString" in secret: |
| 188 | + secrets[secret_name] = secret["SecretString"] |
| 189 | + elif "SecretBinary" in secret: |
| 190 | + secrets[secret_name] = secret["SecretBinary"] |
| 191 | + |
| 192 | + # Check if there are more results |
| 193 | + next_token = response.get("NextToken") |
| 194 | + if not next_token: |
| 195 | + break |
| 196 | + |
| 197 | + # If no secrets were found, raise an error |
| 198 | + if not secrets: |
| 199 | + raise GetSecretError(f"No secrets found matching the provided names: {names}") |
| 200 | + |
| 201 | + return secrets |
| 202 | + |
| 203 | + def get_multiple( # type: ignore[override] |
| 204 | + self, |
| 205 | + names: list[str], |
| 206 | + max_age: int | None = None, |
| 207 | + transform: TransformOptions = None, |
| 208 | + raise_on_transform_error: bool = False, |
| 209 | + force_fetch: bool = False, |
| 210 | + **sdk_options, |
| 211 | + ) -> dict[str, Any]: |
| 212 | + """ |
| 213 | + Retrieve multiple secrets by name from AWS Secrets Manager |
| 214 | +
|
| 215 | + Parameters |
| 216 | + ---------- |
| 217 | + names: list[str] |
| 218 | + List of secret names to retrieve |
| 219 | + max_age: int, optional |
| 220 | + Maximum age of the cached value |
| 221 | + transform: str, optional |
| 222 | + Optional transformation of the parameter value. Supported values |
| 223 | + are "json" for JSON strings and "binary" for base 64 encoded values. |
| 224 | + raise_on_transform_error: bool, optional |
| 225 | + Raises an exception if any transform fails, otherwise this will |
| 226 | + return a None value for each transform that failed |
| 227 | + force_fetch: bool, optional |
| 228 | + Force update even before a cached item has expired, defaults to False |
| 229 | + sdk_options: dict, optional |
| 230 | + Arguments that will be passed directly to the underlying API call |
| 231 | +
|
| 232 | + Returns |
| 233 | + ------- |
| 234 | + dict[str, str | bytes | dict] |
| 235 | + Dictionary mapping secret names to their values |
| 236 | +
|
| 237 | + Raises |
| 238 | + ------ |
| 239 | + GetParameterError |
| 240 | + When the parameter provider fails to retrieve secrets |
| 241 | + TransformParameterError |
| 242 | + When the parameter provider fails to transform a secret value |
| 243 | + """ |
| 244 | + if not names: |
| 245 | + raise GetSecretError("You must provide at least one secret name") |
| 246 | + |
| 247 | + # Create a unique cache key for this batch of secrets |
| 248 | + # Use sorted names to ensure consistent caching regardless of order |
| 249 | + cache_key_name = "|".join(sorted(names)) |
| 250 | + key = self._build_cache_key(name=cache_key_name, transform=transform, is_nested=True) |
| 251 | + |
| 252 | + # If max_age is not set, resolve it from the environment variable, defaulting to DEFAULT_MAX_AGE_SECS |
| 253 | + max_age = resolve_max_age(env=os.getenv(constants.PARAMETERS_MAX_AGE_ENV, DEFAULT_MAX_AGE_SECS), choice=max_age) |
| 254 | + |
| 255 | + if not force_fetch and self.has_not_expired_in_cache(key): |
| 256 | + cached_values = self.fetch_from_cache(key) |
| 257 | + # Return only the requested secrets from cache (in case cache has more) |
| 258 | + return {name: cached_values[name] for name in names if name in cached_values} |
| 259 | + |
| 260 | + try: |
| 261 | + values = self._get_multiple(names, **sdk_options) |
| 262 | + except Exception as exc: |
| 263 | + raise GetSecretError(str(exc)) from exc |
| 264 | + |
| 265 | + if transform: |
| 266 | + # Transform each secret value |
| 267 | + transformed_values = {} |
| 268 | + for name, value in values.items(): |
| 269 | + try: |
| 270 | + transformed_values[name] = transform_value( |
| 271 | + key=name, |
| 272 | + value=value, |
| 273 | + transform=transform, |
| 274 | + raise_on_transform_error=raise_on_transform_error, |
| 275 | + ) |
| 276 | + except TransformParameterError: |
| 277 | + if raise_on_transform_error: |
| 278 | + raise |
| 279 | + transformed_values[name] = None # pragma: no cover |
| 280 | + values = transformed_values |
| 281 | + |
| 282 | + # Cache the results |
| 283 | + self.add_to_cache(key=key, value=values, max_age=max_age) |
| 284 | + |
| 285 | + return values |
134 | 286 |
|
135 | 287 | def _create_secret(self, name: str, **sdk_options) -> CreateSecretResponseTypeDef:
|
136 | 288 | """
|
@@ -369,6 +521,85 @@ def get_secret(
|
369 | 521 | )
|
370 | 522 |
|
371 | 523 |
|
| 524 | +def get_secrets_by_name( |
| 525 | + names: list[str], |
| 526 | + transform: TransformOptions = None, |
| 527 | + force_fetch: bool = False, |
| 528 | + max_age: int | None = None, |
| 529 | + **sdk_options, |
| 530 | +) -> dict[str, str | bytes | dict]: |
| 531 | + """ |
| 532 | + Retrieve multiple secrets by name from AWS Secrets Manager |
| 533 | +
|
| 534 | + Parameters |
| 535 | + ---------- |
| 536 | + names: list[str] |
| 537 | + List of secret names to retrieve |
| 538 | + transform: str, optional |
| 539 | + Transforms the content from a JSON object ('json') or base64 binary string ('binary') |
| 540 | + force_fetch: bool, optional |
| 541 | + Force update even before a cached item has expired, defaults to False |
| 542 | + max_age: int, optional |
| 543 | + Maximum age of the cached value |
| 544 | + sdk_options: dict, optional |
| 545 | + Dictionary of options that will be passed to the batch_get_secret_value call |
| 546 | +
|
| 547 | + Raises |
| 548 | + ------ |
| 549 | + GetParameterError |
| 550 | + When the parameter provider fails to retrieve secrets |
| 551 | + TransformParameterError |
| 552 | + When the parameter provider fails to transform a secret value |
| 553 | +
|
| 554 | + Returns |
| 555 | + ------- |
| 556 | + dict[str, str | bytes | dict] |
| 557 | + Dictionary mapping secret names to their values |
| 558 | +
|
| 559 | + Example |
| 560 | + ------- |
| 561 | + **Retrieves multiple secrets** |
| 562 | +
|
| 563 | + >>> from aws_lambda_powertools.utilities.parameters import get_secrets_by_name |
| 564 | + >>> |
| 565 | + >>> secrets = get_secrets_by_name(["db-password", "api-key", "jwt-secret"]) |
| 566 | + >>> print(secrets["db-password"]) |
| 567 | +
|
| 568 | + **Retrieves multiple secrets with JSON transformation** |
| 569 | +
|
| 570 | + >>> from aws_lambda_powertools.utilities.parameters import get_secrets_by_name |
| 571 | + >>> |
| 572 | + >>> secrets = get_secrets_by_name(["config", "settings"], transform="json") |
| 573 | + >>> print(secrets["config"]["database_url"]) |
| 574 | +
|
| 575 | + **Retrieves multiple secrets with additional filters** |
| 576 | +
|
| 577 | + >>> from aws_lambda_powertools.utilities.parameters import get_secrets_by_name |
| 578 | + >>> |
| 579 | + >>> secrets = get_secrets_by_name( |
| 580 | + ... names=["app-secret"], |
| 581 | + ... Filters=[{"Key": "primary-region", "Values": ["us-east-1"]}] |
| 582 | + ... ) |
| 583 | + """ |
| 584 | + if not names: |
| 585 | + raise GetSecretError("You must provide at least one secret name") |
| 586 | + |
| 587 | + # If max_age is not set, resolve it from the environment variable, defaulting to DEFAULT_MAX_AGE_SECS |
| 588 | + max_age = resolve_max_age(env=os.getenv(constants.PARAMETERS_MAX_AGE_ENV, DEFAULT_MAX_AGE_SECS), choice=max_age) |
| 589 | + |
| 590 | + # Only create the provider if this function is called at least once |
| 591 | + if "secrets" not in DEFAULT_PROVIDERS: |
| 592 | + DEFAULT_PROVIDERS["secrets"] = SecretsProvider() |
| 593 | + |
| 594 | + return DEFAULT_PROVIDERS["secrets"].get_multiple( |
| 595 | + names=names, |
| 596 | + max_age=max_age, |
| 597 | + transform=transform, |
| 598 | + force_fetch=force_fetch, |
| 599 | + **sdk_options, |
| 600 | + ) |
| 601 | + |
| 602 | + |
372 | 603 | def set_secret(
|
373 | 604 | name: str,
|
374 | 605 | value: str | bytes,
|
|
0 commit comments