From 62dd9bc5c71fb805571f2c34e7fcd6725700585b Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Sun, 19 Jan 2025 19:19:26 -0100 Subject: [PATCH] SecretsManager: rotate_secret() should create AWSPending without a value (#8504) --- moto/secretsmanager/models.py | 17 +- tests/test_awslambda/__init__.py | 7 + tests/test_secretsmanager/__init__.py | 17 +- .../test_secretsmanager.py | 209 ++++++++++++------ 4 files changed, 160 insertions(+), 90 deletions(-) diff --git a/moto/secretsmanager/models.py b/moto/secretsmanager/models.py index a0c7acad0d90..4f83f7c8de94 100644 --- a/moto/secretsmanager/models.py +++ b/moto/secretsmanager/models.py @@ -606,6 +606,7 @@ def _add_secret( version_stages: Optional[List[str]] = None, replica_regions: Optional[List[Dict[str, str]]] = None, force_overwrite: bool = False, + create_new_version: bool = False, ) -> Tuple[FakeSecret, bool]: if version_stages is None: version_stages = ["AWSCURRENT"] @@ -633,7 +634,7 @@ def _add_secret( secret.update(description, tags, kms_key_id, last_changed_date=update_time) - if new_version: + if new_version or create_new_version: if "AWSCURRENT" in version_stages: secret.reset_default_version(secret_version, version_id) else: @@ -760,29 +761,21 @@ def rotate_secret( pass if secret.versions: - old_secret_version = secret.versions[secret.default_version_id] # type: ignore - if client_request_token: self._client_request_token_validator(client_request_token) new_version_id = client_request_token else: new_version_id = str(mock_random.uuid4()) - # We add the new secret version as "pending". The previous version remains - # as "current" for now. Once we've passed the new secret through the lambda - # rotation function (if provided) we can then update the status to "current". - old_secret_version_secret_string = ( - old_secret_version["secret_string"] - if "secret_string" in old_secret_version - else None - ) + # We add a "pending" stage. The previous version remains as "current" for now. + # Caller is responsible for creating the new secret in the Lambda self._add_secret( secret_id, - old_secret_version_secret_string, description=secret.description, tags=secret.tags, version_id=new_version_id, version_stages=["AWSPENDING"], + create_new_version=True, ) secret.rotation_requested = True diff --git a/tests/test_awslambda/__init__.py b/tests/test_awslambda/__init__.py index 5dfd89f8a46c..015f8d34c733 100644 --- a/tests/test_awslambda/__init__.py +++ b/tests/test_awslambda/__init__.py @@ -66,6 +66,13 @@ def create_role_and_test(kwargs, role_name): kwargs["iam_role_arn"] = iam_role_arn resp = func(**kwargs) finally: + for policy in iam.list_attached_role_policies(RoleName=role_name)[ + "AttachedPolicies" + ]: + iam.detach_role_policy( + RoleName=role_name, + PolicyArn=policy["PolicyArn"], + ) iam.delete_role(RoleName=role_name) return resp diff --git a/tests/test_secretsmanager/__init__.py b/tests/test_secretsmanager/__init__.py index cc6729aa9d5e..fd749b0fbbc6 100644 --- a/tests/test_secretsmanager/__init__.py +++ b/tests/test_secretsmanager/__init__.py @@ -17,27 +17,28 @@ def secretsmanager_aws_verified(func): """ @wraps(func) - def pagination_wrapper(): + def pagination_wrapper(**kwargs): allow_aws_request = ( os.environ.get("MOTO_TEST_ALLOW_AWS_REQUEST", "false").lower() == "true" ) if allow_aws_request: - return create_secret_and_execute(func) + return create_secret_and_execute(kwargs, func) else: with mock_aws(): - return create_secret_and_execute(func) + return create_secret_and_execute(kwargs, func) - def create_secret_and_execute(func): + def create_secret_and_execute(kwargs, func): sm_client = boto3.client("secretsmanager", "us-east-1") - secret_arn = sm_client.create_secret( + secret = sm_client.create_secret( Name=f"moto_secret_{str(uuid4())[0:6]}", SecretString="old_secret", - )["ARN"] + ) try: - return func(secret_arn) + kwargs["secret"] = secret + return func(**kwargs) finally: - sm_client.delete_secret(SecretId=secret_arn) + sm_client.delete_secret(SecretId=secret["ARN"]) return pagination_wrapper diff --git a/tests/test_secretsmanager/test_secretsmanager.py b/tests/test_secretsmanager/test_secretsmanager.py index fea06fa0a322..6a853d964c01 100644 --- a/tests/test_secretsmanager/test_secretsmanager.py +++ b/tests/test_secretsmanager/test_secretsmanager.py @@ -15,6 +15,10 @@ from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID from moto.secretsmanager.utils import SecretsManagerSecretIdentifier from moto.utilities.id_generator import TAG_KEY_CUSTOM_ID +from tests import allow_aws_request +from tests.test_awslambda import lambda_aws_verified +from tests.test_awslambda.utilities import _process_lambda +from tests.test_dynamodb import dynamodb_aws_verified from .. import DEFAULT_ACCOUNT_ID from . import secretsmanager_aws_verified @@ -1110,20 +1114,47 @@ def test_rotate_secret_rotation_period_too_long(): def get_rotation_zip_file(): - from tests.test_awslambda.utilities import _process_lambda + endpoint = "" if allow_aws_request() else 'endpoint_url="http://motoserver:5000"' - func_str = """ + func_str = ( + """ import boto3 import json +import os def lambda_handler(event, context): arn = event['SecretId'] token = event['ClientRequestToken'] step = event['Step'] - client = boto3.client("secretsmanager", region_name="us-west-2", endpoint_url="http://motoserver:5000") + client = boto3.client("secretsmanager", region_name="us-east-1", """ + + endpoint + + """) metadata = client.describe_secret(SecretId=arn) - value = client.get_secret_value(SecretId=arn, VersionId=token, VersionStage="AWSPENDING") + metadata.pop('LastChangedDate', None) + metadata.pop('LastAccessedDate', None) + metadata.pop('NextRotationDate', None) + metadata.pop('CreatedDate') + metadata.pop('ResponseMetadata') + print(metadata) + versions = client.list_secret_version_ids(SecretId=arn, IncludeDeprecated=True)["Versions"] + for v in versions: + v.pop('LastAccessedDate', None) + v.pop('CreatedDate', None) + print(versions) + try: + pending_value = client.get_secret_value(SecretId=arn, VersionId=token, VersionStage='AWSPENDING') + pending_value.pop('CreatedDate', None) + pending_value.pop('ResponseMetadata') + except Exception as e: + pending_value = str(e) + print(pending_value) + + dynamodb = boto3.resource("dynamodb", region_name="us-east-1", """ + + endpoint + + """) + table = dynamodb.Table(os.environ["table_name"]) + table.put_item(Item={"pk": step, "token": token, "metadata": metadata, "versions": versions, "pending_value": pending_value}) if not metadata['RotationEnabled']: print("Secret %s is not enabled for rotation." % arn) @@ -1140,24 +1171,19 @@ def lambda_handler(event, context): raise ValueError("Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn)) if step == 'createSecret': - try: - client.get_secret_value(SecretId=arn, VersionId=token, VersionStage='AWSPENDING') - except client.exceptions.ResourceNotFoundException: - client.put_secret_value( - SecretId=arn, - ClientRequestToken=token, - SecretString=json.dumps({'create': True}), - VersionStages=['AWSPENDING'] - ) - - if step == 'setSecret': client.put_secret_value( SecretId=arn, ClientRequestToken=token, SecretString='UpdatedValue', - VersionStages=["AWSPENDING"] + VersionStages=['AWSPENDING'] ) + if step == 'setSecret': + # This method should set the AWSPENDING secret in the service that the secret belongs to. + # For example, if the secret is a database credential, + # this method should take the value of the AWSPENDING secret and set the user's password to this value in the database. + pass + elif step == 'finishSecret': current_version = next( version @@ -1177,60 +1203,103 @@ def lambda_handler(event, context): RemoveFromVersionId=token ) """ + ) return _process_lambda(func_str) -if settings.TEST_SERVER_MODE: - - @mock_aws - def test_rotate_secret_using_lambda(): - from tests.test_awslambda.utilities import get_role_name - - # Passing a `RotationLambdaARN` value to `rotate_secret` should invoke lambda - lambda_conn = boto3.client( - "lambda", region_name="us-west-2", endpoint_url="http://localhost:5000" +@pytest.mark.aws_verified +@dynamodb_aws_verified() +@lambda_aws_verified +@secretsmanager_aws_verified +def test_rotate_secret_using_lambda(secret=None, iam_role_arn=None, table_name=None): + role_name = iam_role_arn.split("/")[-1] + if not allow_aws_request() and not settings.TEST_SERVER_MODE: + raise SkipTest("Can only test this in ServerMode") + + iam = boto3.client("iam", "us-east-1") + if allow_aws_request(): + iam.attach_role_policy( + PolicyArn="arn:aws:iam::aws:policy/SecretsManagerReadWrite", + RoleName=role_name, ) - func = lambda_conn.create_function( - FunctionName="testFunction", - Runtime="python3.11", - Role=get_role_name(), - Handler="lambda_function.lambda_handler", - Code={"ZipFile": get_rotation_zip_file()}, - Description="Secret rotator", - Timeout=3, - MemorySize=128, - Publish=True, + # Testing this against AWS itself is a bit of pain + # Uncomment this to get more insights into what is happening during execution of the Lambda + # iam.attach_role_policy( + # PolicyArn="arn:aws:iam::aws:policy/CloudWatchLogsFullAccess", + # RoleName=role_name, + # ) + iam.attach_role_policy( + PolicyArn="arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess", + RoleName=role_name, ) - secrets_conn = boto3.client( - "secretsmanager", - region_name="us-west-2", - endpoint_url="http://localhost:5000", - ) - secret = secrets_conn.create_secret( - Name=DEFAULT_SECRET_NAME, SecretString="InitialValue" - ) - initial_version = secret["VersionId"] + function_name = "moto_test_" + str(uuid4())[0:6] - rotated_secret = secrets_conn.rotate_secret( - SecretId=DEFAULT_SECRET_NAME, - RotationLambdaARN=func["FunctionArn"], - RotationRules={"AutomaticallyAfterDays": 30}, - ) + # Passing a `RotationLambdaARN` value to `rotate_secret` should invoke lambda + lambda_conn = boto3.client("lambda", region_name="us-east-1") + func = lambda_conn.create_function( + FunctionName=function_name, + Runtime="python3.11", + Role=iam_role_arn, + Handler="lambda_function.lambda_handler", + Code={"ZipFile": get_rotation_zip_file()}, + Publish=True, + Environment={"Variables": {"table_name": table_name}}, + ) + lambda_conn.add_permission( + FunctionName=function_name, + StatementId="allow_secrets_manager", + Action="lambda:InvokeFunction", + Principal="secretsmanager.amazonaws.com", + ) + lambda_conn.get_waiter("function_active_v2").wait(FunctionName=function_name) + + secrets_conn = boto3.client("secretsmanager", region_name="us-east-1") + + initial_version = secret["VersionId"] + + rotated_secret = secrets_conn.rotate_secret( + SecretId=secret["ARN"], + RotationLambdaARN=func["FunctionArn"], + RotationRules={"AutomaticallyAfterDays": 30}, + RotateImmediately=True, + ) - # Ensure we received an updated VersionId from `rotate_secret` - assert rotated_secret["VersionId"] != initial_version + # Ensure we received an updated VersionId from `rotate_secret` + assert rotated_secret["VersionId"] != initial_version + secret_not_updated = True + while secret_not_updated: updated_secret = secrets_conn.get_secret_value( - SecretId=DEFAULT_SECRET_NAME, VersionStage="AWSCURRENT" + SecretId=secret["ARN"], VersionStage="AWSCURRENT" ) - rotated_version = updated_secret["VersionId"] + if updated_secret["SecretString"] == "UpdatedValue": + secret_not_updated = False + else: + from time import sleep + + sleep(5) + rotated_version = updated_secret["VersionId"] + + assert initial_version != rotated_version + + metadata = secrets_conn.describe_secret(SecretId=secret["ARN"]) + assert metadata["VersionIdsToStages"][initial_version] == ["AWSPREVIOUS"] + assert metadata["VersionIdsToStages"][rotated_version] == ["AWSCURRENT"] + assert updated_secret["SecretString"] == "UpdatedValue" + + lambda_conn.delete_function(FunctionName=function_name) + + dynamodb = boto3.resource("dynamodb", region_name="us-east-1") + items = dynamodb.Table(table_name).scan()["Items"] + + create_secret = [i for i in items if i["pk"] == "createSecret"][0] + assert "(ResourceNotFoundException)" in create_secret["pending_value"] + assert create_secret["versions"][0]["VersionStages"] == ["AWSCURRENT"] - assert initial_version != rotated_version - metadata = secrets_conn.describe_secret(SecretId=DEFAULT_SECRET_NAME) - assert metadata["VersionIdsToStages"][initial_version] == ["AWSPREVIOUS"] - assert metadata["VersionIdsToStages"][rotated_version] == ["AWSCURRENT"] - assert updated_secret["SecretString"] == "UpdatedValue" + finish_secret = [i for i in items if i["pk"] == "finishSecret"][0] + assert finish_secret["pending_value"]["SecretString"] == "UpdatedValue" + assert finish_secret["pending_value"]["VersionStages"] == ["AWSPENDING"] @mock_aws @@ -1887,49 +1956,49 @@ def test_update_secret_with_client_request_token(): @secretsmanager_aws_verified @pytest.mark.aws_verified -def test_update_secret_version_stage_manually(secret_arn=None): +def test_update_secret_version_stage_manually(secret=None): sm_client = boto3.client("secretsmanager", "us-east-1") current_version = sm_client.put_secret_value( - SecretId=secret_arn, + SecretId=secret["ARN"], SecretString="previous_secret", VersionStages=["AWSCURRENT"], )["VersionId"] initial_secret = sm_client.get_secret_value( - SecretId=secret_arn, VersionStage="AWSCURRENT" + SecretId=secret["ARN"], VersionStage="AWSCURRENT" ) assert initial_secret["VersionStages"] == ["AWSCURRENT"] assert initial_secret["SecretString"] == "previous_secret" token = str(uuid4()) sm_client.put_secret_value( - SecretId=secret_arn, + SecretId=secret["ARN"], ClientRequestToken=token, SecretString="new_secret", VersionStages=["AWSPENDING"], ) pending_secret = sm_client.get_secret_value( - SecretId=secret_arn, VersionStage="AWSPENDING" + SecretId=secret["ARN"], VersionStage="AWSPENDING" ) assert pending_secret["VersionStages"] == ["AWSPENDING"] assert pending_secret["SecretString"] == "new_secret" sm_client.update_secret_version_stage( - SecretId=secret_arn, + SecretId=secret["ARN"], VersionStage="AWSCURRENT", MoveToVersionId=token, RemoveFromVersionId=current_version, ) current_secret = sm_client.get_secret_value( - SecretId=secret_arn, VersionStage="AWSCURRENT" + SecretId=secret["ARN"], VersionStage="AWSCURRENT" ) assert list(sorted(current_secret["VersionStages"])) == ["AWSCURRENT", "AWSPENDING"] assert current_secret["SecretString"] == "new_secret" previous_secret = sm_client.get_secret_value( - SecretId=secret_arn, VersionStage="AWSPREVIOUS" + SecretId=secret["ARN"], VersionStage="AWSPREVIOUS" ) assert previous_secret["VersionStages"] == ["AWSPREVIOUS"] assert previous_secret["SecretString"] == "previous_secret" @@ -1937,17 +2006,17 @@ def test_update_secret_version_stage_manually(secret_arn=None): @secretsmanager_aws_verified @pytest.mark.aws_verified -def test_update_secret_version_stage_dont_specify_current_stage(secret_arn=None): +def test_update_secret_version_stage_dont_specify_current_stage(secret=None): sm_client = boto3.client("secretsmanager", "us-east-1") current_version = sm_client.put_secret_value( - SecretId=secret_arn, + SecretId=secret["ARN"], SecretString="previous_secret", VersionStages=["AWSCURRENT"], )["VersionId"] token = str(uuid4()) sm_client.put_secret_value( - SecretId=secret_arn, + SecretId=secret["ARN"], ClientRequestToken=token, SecretString="new_secret", VersionStages=["AWSPENDING"], @@ -1956,7 +2025,7 @@ def test_update_secret_version_stage_dont_specify_current_stage(secret_arn=None) # Without specifying version that currently has stage AWSCURRENT with pytest.raises(ClientError) as exc: sm_client.update_secret_version_stage( - SecretId=secret_arn, VersionStage="AWSCURRENT", MoveToVersionId=token + SecretId=secret["ARN"], VersionStage="AWSCURRENT", MoveToVersionId=token ) err = exc.value.response["Error"] assert err["Code"] == "InvalidParameterException"