From b5a0f050ca1e61fa399517cbe4b1228f3fbb2cb8 Mon Sep 17 00:00:00 2001 From: "Terence D. Honles" Date: Wed, 5 Sep 2018 11:04:40 -0700 Subject: [PATCH] add support for signing cloudfront URLs --- storages/backends/s3boto3.py | 65 ++++++++++++++++++++++++++++++++++-- tests/test_s3boto3.py | 37 ++++++++++++++++++++ tox.ini | 8 +++-- 3 files changed, 104 insertions(+), 6 deletions(-) diff --git a/storages/backends/s3boto3.py b/storages/backends/s3boto3.py index 6a977537e..b8719a7fe 100644 --- a/storages/backends/s3boto3.py +++ b/storages/backends/s3boto3.py @@ -4,6 +4,7 @@ import posixpath import threading import warnings +from datetime import datetime, timedelta from gzip import GzipFile from tempfile import SpooledTemporaryFile @@ -28,11 +29,56 @@ from boto3 import __version__ as boto3_version from botocore.client import Config from botocore.exceptions import ClientError + from botocore.signers import CloudFrontSigner except ImportError: raise ImproperlyConfigured("Could not load Boto3's S3 bindings.\n" "See https://github.com/boto/boto3") +# NOTE: these are defined as functions so both can be tested +def _use_cryptography_signer(): + # https://cryptography.io as an RSA backend + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.asymmetric import padding + from cryptography.hazmat.primitives.serialization import ( + load_pem_private_key + ) + + def _cloud_front_signer_from_pem(key_id, pem): + key = load_pem_private_key( + pem, password=None, backend=default_backend()) + + return CloudFrontSigner( + key_id, lambda x: key.sign(x, padding.PKCS1v15(), hashes.SHA1())) + + return _cloud_front_signer_from_pem + + +def _use_rsa_signer(): + # https://stuvel.eu/rsa as an RSA backend + import rsa + + def _cloud_front_signer_from_pem(key_id, pem): + key = rsa.PrivateKey.load_pkcs1(pem) + return CloudFrontSigner(key_id, lambda x: rsa.sign(x, key, 'SHA-1')) + + return _cloud_front_signer_from_pem + + +for _signer_factory in (_use_cryptography_signer, _use_rsa_signer): + try: + _cloud_front_signer_from_pem = _signer_factory() + break + except ImportError: + pass +else: + def _cloud_front_signer_from_pem(key_id, pem): + raise ImproperlyConfigured( + 'An RSA backend is required for signing cloudfront URLs.\n' + 'Supported backends are packages: cryptography and rsa.') + + boto3_version_info = tuple([int(i) for i in boto3_version.split('.')]) @@ -201,6 +247,10 @@ class S3Boto3Storage(Storage): location = setting('AWS_LOCATION', '') encryption = setting('AWS_S3_ENCRYPTION', False) custom_domain = setting('AWS_S3_CUSTOM_DOMAIN') + cloudfront_key_id = setting('AWS_CLOUDFRONT_KEY_ID') + cloudfront_keys = dict( + (key_id, _cloud_front_signer_from_pem(key_id, pem)) + for key_id, pem in setting('AWS_CLOUDFRONT_KEYS', {}).items()) addressing_style = setting('AWS_S3_ADDRESSING_STYLE') secure_urls = setting('AWS_S3_SECURE_URLS', True) file_name_charset = setting('AWS_S3_FILE_NAME_CHARSET', 'utf-8') @@ -610,12 +660,21 @@ def url(self, name, parameters=None, expire=None): # Preserve the trailing slash after normalizing the path. # TODO: Handle force_http=not self.secure_urls like in s3boto name = self._normalize_name(self._clean_name(name)) - if self.custom_domain: - return "%s//%s/%s" % (self.url_protocol, - self.custom_domain, filepath_to_uri(name)) if expire is None: expire = self.querystring_expire + if self.custom_domain: + url = "%s//%s/%s" % ( + self.url_protocol, self.custom_domain, filepath_to_uri(name)) + + if self.cloudfront_key_id: + signer = self.cloudfront_keys[self.cloudfront_key_id] + expiration = datetime.utcnow() + timedelta(seconds=expire) + + return signer.generate_presigned_url(url, date_less_than=expiration) + + return url + params = parameters.copy() if parameters else {} params['Bucket'] = self.bucket.name params['Key'] = self._encode_name(name) diff --git a/tests/test_s3boto3.py b/tests/test_s3boto3.py index a4489fcfa..b8bfe1565 100644 --- a/tests/test_s3boto3.py +++ b/tests/test_s3boto3.py @@ -6,6 +6,7 @@ import threading import warnings from datetime import datetime +from textwrap import dedent from unittest import skipIf from botocore.exceptions import ClientError @@ -505,6 +506,42 @@ def test_storage_url(self): ExpiresIn=custom_expire ) + def test_storage_url_custom_domain_signed_urls(self): + key_id = 'test-key' + filename = 'file.txt' + pem = dedent( + '''\ + -----BEGIN RSA PRIVATE KEY----- + MIICWwIBAAKBgQCXVuwcMk+JmVSKuQ1K4dZx4Z1dEcRQgTlqvhAyljIpttXlZh2/ + fD3GkJCiqfwEmo+cdNK/LFzRj/CX8Wz1z1lH2USONpG6sAkotkatCbejiItDu5y6 + janGJHfuWXu6B/o9gwZylU1gIsePY3lLNk+r9QhXUO4jXw6zLJftVwKPhQIDAQAB + AoGAbpkRV9HUmoQ5al+uPSkp5HOy4s8XHpYxdbaMc8ubwSxiyJCF8OhE5RXE/Xso + N90UUox1b0xmUKfWddPzgvgTD/Ub7D6Ukf+nVWDX60tWgNxICAUHptGL3tWweaAy + H+0+vZ0TzvTt9r00vW0FzO7F8X9/Rs1ntDRLtF3RCCxdq0kCQQDHFu+t811lCvy/ + 67rMEKGvNsNNSTrzOrNr3PqUrCnOrzKazjFVjsKv5VzI/U+rXGYKWJsMpuCFiHZ3 + DILUC09TAkEAwpm2S6MN6pzn9eY6pmhOxZ+GQGGRUkKZfC1GDxaRSRb8sKTjptYw + WSemJSxiDzdj3Po2hF0lbhkpJgUq6xnCxwJAZgHHfn5CLSJrDD7Q7/vZi/foK3JJ + BRTfl3Wa4pAvv5meuRjKyEakVBGV79lyd5+ZHNX3Y40hXunjoO3FHrZIxwJAdRzu + waxahrRxQOKSr20c4wAzWnGddIUSO9I/VHs/al5EKsbBHrnOlQkwizSfuwqZtfZ7 + csNf8FeCFRiNELoLJwJAZxWBE2+8J9VW9AQ0SE7j4FyM/B8FvRhF5PLAAsw/OxHO + SxiFP7Ptdac1tm5H5zOqaqSHWphI19HNNilXKmxuCA== + -----END RSA PRIVATE KEY-----''' + ).encode('ascii') + + url = 'https://mock.cloudfront.net/file.txt?Expires=3600&Signature=DbqVgh3FHtttQxof214tSAVE8Nqn3Q4Ii7eR3iykbOqAPbV89HC3EB~0CWxarpLNtbfosS5LxiP5EutriM7E8uR4Gm~UVY-PFUjPcwqdnmAiKJF0EVs7koJcMR8MKDStuWfFKVUPJ8H7ORYTOrixyHBV2NOrpI6SN5UX6ctNM50_&Key-Pair-Id=test-key' # noqa + + self.storage.custom_domain = "mock.cloudfront.net" + self.storage.cloudfront_key_id = key_id + + for pem_to_signer in ( + s3boto3._use_cryptography_signer(), + s3boto3._use_rsa_signer()): + self.storage.cloudfront_keys = {key_id: pem_to_signer(key_id, pem)} + + with mock.patch('storages.backends.s3boto3.datetime') as mock_datetime: + mock_datetime.utcnow.return_value = datetime.utcfromtimestamp(0) + self.assertEqual(self.storage.url(filename), url) + def test_generated_url_is_encoded(self): self.storage.custom_domain = "mock.cloudfront.net" filename = "whacky & filename.mp4" diff --git a/tox.ini b/tox.ini index 9257bebca..f9fb1aa55 100644 --- a/tox.ini +++ b/tox.ini @@ -20,16 +20,18 @@ deps = django21: Django>=2.1,<2.2 djangomaster: https://github.com/django/django/archive/master.tar.gz py27: mock + apache-libcloud pytest pytest-cov - apache-libcloud + azure-storage-blob>=1.3.1 + azure>=3.0.0 boto boto3 + cryptography dropbox google-cloud-storage paramiko - azure>=3.0.0 - azure-storage-blob>=1.3.1 + rsa [testenv:integration] ignore_errors = True