Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix copy_snapshot handler to work with clients #498

Merged
merged 1 commit into from
Mar 24, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion botocore/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,8 @@ def _convert_to_request_dict(self, api_params, operation_model):
endpoint_prefix=self._service_model.endpoint_prefix,
operation_name=operation_name),
model=operation_model, params=request_dict,
request_signer=self._request_signer
request_signer=self._request_signer,
endpoint=self._endpoint
)
return request_dict

Expand Down
31 changes: 22 additions & 9 deletions botocore/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ def quote_source_header(params, **kwargs):
value.encode('utf-8'), '/~')


def copy_snapshot_encrypted(operation, params, request_signer, **kwargs):
def copy_snapshot_encrypted(params, request_signer, endpoint, **kwargs):
# The presigned URL that facilities copying an encrypted snapshot.
# If the user does not provide this value, we will automatically
# calculate on behalf of the user and inject the PresignedUrl
Expand All @@ -307,18 +307,31 @@ def copy_snapshot_encrypted(operation, params, request_signer, **kwargs):
# If the customer provided this value, then there's nothing for
# us to do.
return
params['DestinationRegion'] = request_signer._region_name
destination_region = request_signer._region_name
params['DestinationRegion'] = destination_region
# The request will be sent to the destination region, so we need
# to create an endpoint to the source region and create a presigned
# url based on the source endpoint.
region = params['SourceRegion']
source_endpoint = operation.service.get_endpoint(region)
source_region = params['SourceRegion']

presigner = request_signer.get_auth(
'ec2', region, signature_version='v4-query')
request = source_endpoint.create_request(request_dict)
presigner.add_auth(request=request.original)
request = request.original.prepare()
params['PresignedUrl'] = request.url
'ec2', source_region, signature_version='v4-query')

request = endpoint.create_request(request_dict)
original = request.original
# The better way to do this is to actually get the
# endpoint_resolver and get the endpoint_url given the
# source region. In this specific case, we know that
# we can safely replace the dest region with the source
# region because of the supported EC2 regions, but in
# general this is not a safe assumption to make.
# I think eventually we should try to plumb through something
# that allows us to resolve endpoints from regions.
original.url = original.url.replace(destination_region, source_region)

presigner.add_auth(request=original)
request = original.prepare()
params['PresignedUrl'] = original.url


def json_decode_policies(parsed, model, **kwargs):
Expand Down
56 changes: 25 additions & 31 deletions tests/integration/test_ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,39 +21,39 @@
class TestEC2(unittest.TestCase):
def setUp(self):
self.session = botocore.session.get_session()
self.client = self.session.create_client(
'ec2', region_name='us-west-2')

def test_can_make_request(self):
# Basic smoke test to ensure we can talk to ec2.
service = self.session.get_service('ec2')
endpoint = service.get_endpoint('us-west-2')
operation = service.get_operation('DescribeAvailabilityZones')
http, result = operation.call(endpoint)
result = self.client.describe_availability_zones()
zones = list(sorted(a['ZoneName'] for a in result['AvailabilityZones']))
self.assertEqual(zones, ['us-west-2a', 'us-west-2b', 'us-west-2c'])


class TestEC2Pagination(unittest.TestCase):
def setUp(self):
self.session = botocore.session.get_session()
self.service = self.session.get_service('ec2')
self.endpoint = self.service.get_endpoint('us-west-2')
self.client = self.session.create_client(
'ec2', region_name='us-west-2')

def test_can_paginate(self):
# Using an operation that we know will paginate.
operation = self.service.get_operation('DescribeReservedInstancesOfferings')
generator = operation.paginate(self.endpoint)
results = list(itertools.islice(generator, 0, 3))
paginator = self.client.get_paginator(
'describe_reserved_instances_offerings')
pages = paginator.paginate()
results = list(itertools.islice(pages, 0, 3))
self.assertEqual(len(results), 3)
self.assertTrue(results[0][1]['NextToken'] != results[1][1]['NextToken'])
self.assertTrue(results[0]['NextToken'] != results[1]['NextToken'])

def test_can_paginate_with_page_size(self):
# Using an operation that we know will paginate.
operation = self.service.get_operation('DescribeReservedInstancesOfferings')
generator = operation.paginate(self.endpoint, page_size=1)
results = list(itertools.islice(generator, 0, 3))
paginator = self.client.get_paginator(
'describe_reserved_instances_offerings')
pages = paginator.paginate(page_size=1)
results = list(itertools.islice(pages, 0, 3))
self.assertEqual(len(results), 3)
for result in results:
parsed = result[1]
for parsed in results:
reserved_inst_offer = parsed['ReservedInstancesOfferings']
# There should only be one reserved instance offering on each
# page.
Expand All @@ -64,17 +64,11 @@ def test_can_paginate_with_page_size(self):
class TestCopySnapshotCustomization(unittest.TestCase):
def setUp(self):
self.session = botocore.session.get_session()
# Note, the EBS copy snapshot customization is not ported
# over to the client interface so we have to use the
# Service/Operation objects for the actual CopySnapshot
# operation being tested.
self.service = self.session.get_service('ec2')
self.copy_snapshot = self.service.get_operation('CopySnapshot')
# However, all the test fixture setup/cleanup can use
# the client interface.
self.client = self.session.create_client('ec2', 'us-west-2')
self.us_east_1 = self.service.get_endpoint('us-east-1')
self.us_west_2 = self.service.get_endpoint('us-west-2')
self.client_us_east_1 = self.session.create_client(
'ec2', 'us-east-1')

def create_volume(self, encrypted=False):
available_zones = self.client.describe_availability_zones()
Expand Down Expand Up @@ -105,25 +99,25 @@ def test_can_copy_snapshot(self):
volume_id = self.create_volume()
snapshot_id = self.create_snapshot(volume_id)

http, parsed = self.copy_snapshot.call(
self.us_east_1, SourceRegion='us-west-2',
result = self.client_us_east_1.copy_snapshot(
SourceRegion='us-west-2',
SourceSnapshotId=snapshot_id)
self.assertEqual(http.status_code, 200)
self.assertIn('SnapshotId', result)

# Cleanup code. We can wait for the snapshot to be complete
# and then we can delete the snapshot.
self.cleanup_copied_snapshot(parsed['SnapshotId'])
self.cleanup_copied_snapshot(result['SnapshotId'])

def test_can_copy_encrypted_snapshot(self):
# Note that we're creating an encrypted volume here.
volume_id = self.create_volume(encrypted=True)
snapshot_id = self.create_snapshot(volume_id)

http, parsed = self.copy_snapshot.call(
self.us_east_1, SourceRegion='us-west-2',
result = self.client_us_east_1.copy_snapshot(
SourceRegion='us-west-2',
SourceSnapshotId=snapshot_id)
self.assertEqual(http.status_code, 200)
self.cleanup_copied_snapshot(parsed['SnapshotId'])
self.assertIn('SnapshotId', result)
self.cleanup_copied_snapshot(result['SnapshotId'])


if __name__ == '__main__':
Expand Down
70 changes: 47 additions & 23 deletions tests/unit/test_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,48 +61,72 @@ def test_quote_source_header(self):
self.assertEqual(
params['headers']['x-amz-copy-source'], 'foo%2B%2Bbar.txt')

def test_presigned_url_already_present(self):
params = {'body': {'PresignedUrl': 'https://foo'}}
handlers.copy_snapshot_encrypted(params, None, None)
self.assertEqual(params['body']['PresignedUrl'], 'https://foo')

def test_copy_snapshot_encrypted(self):
operation = mock.Mock()
source_endpoint = mock.Mock()
signed_request = mock.Mock()
signed_request.original.prepare().url = 'SIGNED_REQUEST'
source_endpoint.create_request.return_value = signed_request
operation.service.get_endpoint.return_value = source_endpoint
v4query_auth = mock.Mock()

def add_auth(request):
request.url += '?PRESIGNED_STUFF'

v4query_auth.add_auth = add_auth

request_signer = mock.Mock()
request_signer._region_name = 'us-east-1'
request_signer.get_auth.return_value = v4query_auth

params = {'SourceRegion': 'us-west-2'}
handlers.copy_snapshot_encrypted(operation, {'body': params},
request_signer)
self.assertEqual(params['PresignedUrl'], 'SIGNED_REQUEST')
# We created an endpoint in the source region.
operation.service.get_endpoint.assert_called_with('us-west-2')
endpoint = mock.Mock()
request = AWSRequest()
request.method = 'POST'
request.url = 'https://ec2.us-east-1.amazonaws.com'
request = request.prepare()
endpoint.create_request.return_value = request

handlers.copy_snapshot_encrypted({'body': params},
request_signer,
endpoint)
self.assertEqual(params['PresignedUrl'],
'https://ec2.us-west-2.amazonaws.com?PRESIGNED_STUFF')
# We should also populate the DestinationRegion with the
# region_name of the endpoint object.
self.assertEqual(params['DestinationRegion'], 'us-east-1')

def test_destination_region_left_untouched(self):
def test_destination_region_always_changed(self):
# If the user provides a destination region, we will still
# override the DesinationRegion with the region_name from
# the endpoint object.
operation = mock.Mock()
source_endpoint = mock.Mock()
signed_request = mock.Mock()
signed_request.url = 'SIGNED_REQUEST'
source_endpoint.auth.credentials = mock.sentinel.credentials
source_endpoint.create_request.return_value = signed_request
operation.service.get_endpoint.return_value = source_endpoint
actual_region = 'us-west-1'
v4query_auth = mock.Mock()

def add_auth(request):
request.url += '?PRESIGNED_STUFF'

v4query_auth.add_auth = add_auth

request_signer = mock.Mock()
request_signer._region_name = 'us-west-1'
request_signer._region_name = actual_region
request_signer.get_auth.return_value = v4query_auth

endpoint = mock.Mock()
request = AWSRequest()
request.method = 'POST'
request.url = 'https://ec2.us-east-1.amazonaws.com'
request = request.prepare()
endpoint.create_request.return_value = request

# The user provides us-east-1, but we will override this to
# endpoint.region_name, of 'us-west-1' in this case.
params = {'SourceRegion': 'us-west-2', 'DestinationRegion': 'us-east-1'}
handlers.copy_snapshot_encrypted(operation, {'body': params},
request_signer)
handlers.copy_snapshot_encrypted({'body': params},
request_signer,
endpoint)
# Always use the DestinationRegion from the endpoint, regardless of
# whatever value the user provides.
self.assertEqual(params['DestinationRegion'], 'us-west-1')
self.assertEqual(params['DestinationRegion'], actual_region)

def test_500_status_code_set_for_200_response(self):
http_response = mock.Mock()
Expand Down