Skip to content

Commit

Permalink
SecretsManager: rotate_secret() should create AWSPending without a va…
Browse files Browse the repository at this point in the history
…lue (#8504)
  • Loading branch information
bblommers authored Jan 19, 2025
1 parent 209e4e1 commit 62dd9bc
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 90 deletions.
17 changes: 5 additions & 12 deletions moto/secretsmanager/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,7 @@ def _add_secret(
version_stages: Optional[List[str]] = None,
replica_regions: Optional[List[Dict[str, str]]] = None,
force_overwrite: bool = False,
create_new_version: bool = False,
) -> Tuple[FakeSecret, bool]:
if version_stages is None:
version_stages = ["AWSCURRENT"]
Expand Down Expand Up @@ -633,7 +634,7 @@ def _add_secret(

secret.update(description, tags, kms_key_id, last_changed_date=update_time)

if new_version:
if new_version or create_new_version:
if "AWSCURRENT" in version_stages:
secret.reset_default_version(secret_version, version_id)
else:
Expand Down Expand Up @@ -760,29 +761,21 @@ def rotate_secret(
pass

if secret.versions:
old_secret_version = secret.versions[secret.default_version_id] # type: ignore

if client_request_token:
self._client_request_token_validator(client_request_token)
new_version_id = client_request_token
else:
new_version_id = str(mock_random.uuid4())

# We add the new secret version as "pending". The previous version remains
# as "current" for now. Once we've passed the new secret through the lambda
# rotation function (if provided) we can then update the status to "current".
old_secret_version_secret_string = (
old_secret_version["secret_string"]
if "secret_string" in old_secret_version
else None
)
# We add a "pending" stage. The previous version remains as "current" for now.
# Caller is responsible for creating the new secret in the Lambda
self._add_secret(
secret_id,
old_secret_version_secret_string,
description=secret.description,
tags=secret.tags,
version_id=new_version_id,
version_stages=["AWSPENDING"],
create_new_version=True,
)

secret.rotation_requested = True
Expand Down
7 changes: 7 additions & 0 deletions tests/test_awslambda/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ def create_role_and_test(kwargs, role_name):
kwargs["iam_role_arn"] = iam_role_arn
resp = func(**kwargs)
finally:
for policy in iam.list_attached_role_policies(RoleName=role_name)[
"AttachedPolicies"
]:
iam.detach_role_policy(
RoleName=role_name,
PolicyArn=policy["PolicyArn"],
)
iam.delete_role(RoleName=role_name)

return resp
Expand Down
17 changes: 9 additions & 8 deletions tests/test_secretsmanager/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,28 @@ def secretsmanager_aws_verified(func):
"""

@wraps(func)
def pagination_wrapper():
def pagination_wrapper(**kwargs):
allow_aws_request = (
os.environ.get("MOTO_TEST_ALLOW_AWS_REQUEST", "false").lower() == "true"
)

if allow_aws_request:
return create_secret_and_execute(func)
return create_secret_and_execute(kwargs, func)
else:
with mock_aws():
return create_secret_and_execute(func)
return create_secret_and_execute(kwargs, func)

def create_secret_and_execute(func):
def create_secret_and_execute(kwargs, func):
sm_client = boto3.client("secretsmanager", "us-east-1")

secret_arn = sm_client.create_secret(
secret = sm_client.create_secret(
Name=f"moto_secret_{str(uuid4())[0:6]}",
SecretString="old_secret",
)["ARN"]
)
try:
return func(secret_arn)
kwargs["secret"] = secret
return func(**kwargs)
finally:
sm_client.delete_secret(SecretId=secret_arn)
sm_client.delete_secret(SecretId=secret["ARN"])

return pagination_wrapper
Loading

0 comments on commit 62dd9bc

Please sign in to comment.