Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 166 additions & 0 deletions s3tests/functional/test_iam.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
get_alt_iam_client,
get_alt_user_id,
get_sts_client,
get_sts_user_id,
)
from .utils import _get_status, _get_status_and_error_code
from .iam import iam_root, iam_alt_root
Expand Down Expand Up @@ -3004,3 +3005,168 @@ def test_get_account_summary_policy(iam_root):
assert 'UsersQuota' in summary
assert 'GroupsQuota' in summary
assert 'AccessKeysPerUserQuota' in summary

@pytest.mark.iam_account
def test_bucket_acl_set_owner_with_canonical_and_account_id(iam_root):
"""Verify put_bucket_acl accepts AccessControlPolicy.Owner.ID specified as either
the canonical (legacy) user id or the numeric account id.
"""
s3 = get_iam_root_client(service_name='s3')
bucket = get_new_bucket(s3)

# obtain both forms
canonical_id = 'testacct1root' # from vstart file
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is very specific to vstart, so we need to find something that works in teuthology too. that usually means relying on s3tests.conf, but since these tests are so specific to rgw behavior i don't think there's a good way to configure this generally

maybe we're better off writing a script under qa/workunits/rgw/ that actually does user migrations to test the behavior

main_arn = iam_root.get_user()['User']['Arn']
account_id = main_arn.removeprefix('arn:aws:iam::').removesuffix(':root')
assert canonical_id != account_id

# base grants: FULL_CONTROL for owner (using canonical ID)
policy_canonical = {
'Owner': {'ID': canonical_id},
'Grants': [
{
'Grantee': {'Type': 'CanonicalUser', 'ID': canonical_id},
'Permission': 'FULL_CONTROL'
}
]
}
s3.put_bucket_acl(Bucket=bucket, AccessControlPolicy=policy_canonical)
resp = s3.get_bucket_acl(Bucket=bucket)
assert resp['Owner']['ID'] == canonical_id

# now attempt with account id as Owner.ID
policy_account = {
'Owner': {'ID': account_id}, # variant form
'Grants': [
{
'Grantee': {'Type': 'CanonicalUser', 'ID': account_id},
'Permission': 'FULL_CONTROL'
}
]
}
s3.put_bucket_acl(Bucket=bucket, AccessControlPolicy=policy_account)
resp2 = s3.get_bucket_acl(Bucket=bucket)
# Stored owner id should remain canonical id
assert resp2['Owner']['ID'] == account_id


@pytest.mark.iam_account
def test_object_acl_set_owner_with_canonical_and_account_id(iam_root):
"""Verify put_object_acl accepts AccessControlPolicy.Owner.ID as canonical user id
and numeric account id.
"""
s3 = get_iam_root_client(service_name='s3')
bucket = get_new_bucket(s3)
canonical_id = 'testacct1root'
main_arn = iam_root.get_user()['User']['Arn']
account_id = main_arn.removeprefix('arn:aws:iam::').removesuffix(':root')

s3.put_object(Bucket=bucket, Key='obj', Body='data')

# set ACL with canonical owner id
acl_canonical = {
'Owner': {'ID': canonical_id},
'Grants': [
{
'Grantee': {'Type': 'CanonicalUser', 'ID': account_id},
'Permission': 'FULL_CONTROL'
}
]
}
s3.put_object_acl(Bucket=bucket, Key='obj', AccessControlPolicy=acl_canonical)
r1 = s3.get_object_acl(Bucket=bucket, Key='obj')
assert r1['Owner']['ID'] == canonical_id

# set ACL with account id as owner id
acl_account = {
'Owner': {'ID': account_id},
'Grants': [
{
'Grantee': {'Type': 'CanonicalUser', 'ID': account_id},
'Permission': 'FULL_CONTROL'
}
]
}
s3.put_object_acl(Bucket=bucket, Key='obj', AccessControlPolicy=acl_account)
r2 = s3.get_object_acl(Bucket=bucket, Key='obj')
assert r2['Owner']['ID'] == account_id

@pytest.mark.iam_account
@pytest.mark.iam_cross_account
@pytest.mark.iam_role
@pytest.mark.role_policy
def test_cross_account_role_assume_cannot_change_acl_owner(iam_root, iam_alt_root):
"""Verify that when a user from alt account assumes a role in the main account,
they cannot modify the ACL owner.
The put_bucket_acl should fail with AccessDenied or similar error.
"""
path = get_iam_path_prefix()
user_name = make_iam_name('AltUser')
role_name = make_iam_name('MyRole')
session_name = 'MySession'
s3_main = get_iam_root_client(service_name='s3')
bucket_name = get_new_bucket(s3_main)

user = iam_alt_root.create_user(UserName=user_name, Path=path)['User']
user_arn = user['Arn']
key = iam_alt_root.create_access_key(UserName=user_name)['AccessKey']

trust_policy = json.dumps({
'Version': '2012-10-17',
'Statement': [{
'Effect': 'Allow',
'Action': 'sts:AssumeRole',
'Principal': {'AWS': user_arn}
}]
})
# returns MalformedPolicyDocument until the user arn starts working
role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role,
RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role']
role_arn = role['Arn']

sts = get_sts_client(aws_access_key_id=key['AccessKeyId'],
aws_secret_access_key=key['SecretAccessKey'])

# returns InvalidClientTokenId or AccessDenied until the access key starts working
response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role,
RoleArn=role_arn, RoleSessionName=session_name)
creds = response['Credentials']

s3_assumed = get_iam_s3client(aws_access_key_id = creds['AccessKeyId'],
aws_secret_access_key = creds['SecretAccessKey'],
aws_session_token = creds['SessionToken'])

policy_name = 'AllowS3AclOperations'
policy = json.dumps({
'Version': '2012-10-17',
'Statement': [{
'Effect': 'Allow',
'Action': ['s3:PutBucketAcl', 's3:GetBucketAcl'],
'Resource': '*'
}]
})
iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy)

alt_canonical_id = get_iam_alt_root_user_id()
modified_acl = {
'Owner': {'ID': get_sts_user_id()},
'Grants': [
{
'Grantee': {'Type': 'CanonicalUser', 'ID': alt_canonical_id},
'Permission': 'FULL_CONTROL'
}
]
}

# Attempting to change owner should fail with AccessDenied
e = assert_raises(ClientError, s3_assumed.put_bucket_acl,
Bucket=bucket_name, AccessControlPolicy=modified_acl)
status, error_code = _get_status_and_error_code(e.response)
assert status == 403
assert error_code == 'AccessDenied'

# Attempting to original account id owner should not fail with AccessDenied
main_arn = iam_root.get_user()['User']['Arn']
account_id = main_arn.removeprefix('arn:aws:iam::').removesuffix(':root')
modified_acl['Owner']['ID'] = account_id # try with actual account id form
retry_on('AccessDenied', 10, s3_assumed.put_bucket_acl, Bucket=bucket_name, AccessControlPolicy=modified_acl)