Skip to content

Commit

Permalink
add support for signing cloudfront URLs
Browse files Browse the repository at this point in the history
  • Loading branch information
terencehonles committed Sep 5, 2018
1 parent e0a00fb commit 5b36b22
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 6 deletions.
77 changes: 74 additions & 3 deletions storages/backends/s3boto3.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import mimetypes
import os
import posixpath
import sys
import threading
import warnings

from datetime import datetime, timedelta
from gzip import GzipFile
from tempfile import SpooledTemporaryFile

Expand All @@ -28,11 +31,56 @@
from boto3 import __version__ as boto3_version
from botocore.client import Config
from botocore.exceptions import ClientError
from botocore.signers import CloudFrontSigner
except ImportError:
raise ImproperlyConfigured("Could not load Boto3's S3 bindings.\n"
"See https://github.com/boto/boto3")


# NOTE: these are defined as functions so both can be tested
def _use_cryptography_signer():
# https://cryptography.io as an RSA backend
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.serialization import (
load_pem_private_key
)

def _cloud_front_signer_from_pem(key_id, pem):
key = load_pem_private_key(
pem, password=None, backend=default_backend())

return CloudFrontSigner(
key_id, lambda x: key.sign(x, padding.PKCS1v15(), hashes.SHA1()))

return _cloud_front_signer_from_pem


def _use_rsa_signer():
# https://stuvel.eu/rsa as an RSA backend
import rsa

def _cloud_front_signer_from_pem(key_id, pem):
key = rsa.PrivateKey.load_pkcs1(pem)
return CloudFrontSigner(key_id, lambda x: rsa.sign(x, key, 'SHA-1'))

return _cloud_front_signer_from_pem


for _signer_factory in (_use_cryptography_signer, _use_rsa_signer):
try:
_cloud_front_signer_from_pem = _signer_factory()
break
except ImportError:
pass
else:
def _cloud_front_signer_from_pem(key_id, pem):
raise ImproperlyConfigured(
'An RSA backend is required for signing cloudfront URLs.\n'
'Supported backends are packages: cryptography and rsa.')


boto3_version_info = tuple([int(i) for i in boto3_version.split('.')])


Expand Down Expand Up @@ -201,6 +249,10 @@ class S3Boto3Storage(Storage):
location = setting('AWS_LOCATION', '')
encryption = setting('AWS_S3_ENCRYPTION', False)
custom_domain = setting('AWS_S3_CUSTOM_DOMAIN')
cloudfront_key_id = setting('AWS_CLOUDFRONT_KEY_ID')
cloudfront_keys = dict(
(key_id, _cloud_front_signer_from_pem(key_id, pem))
for key_id, pem in setting('AWS_CLOUDFRONT_KEYS', {}).items())
addressing_style = setting('AWS_S3_ADDRESSING_STYLE')
secure_urls = setting('AWS_S3_SECURE_URLS', True)
file_name_charset = setting('AWS_S3_FILE_NAME_CHARSET', 'utf-8')
Expand Down Expand Up @@ -287,6 +339,16 @@ def __init__(self, acl=None, bucket=None, **settings):
"set AWS_DEFAULT_ACL."
)

if self.cloudfront_key_id is None and self.cloudfront_keys:
if sys.version_info < (3, 6) and len(self.cloudfront_keys) > 1:
warnings.warn(
'Dictionary order is not guaranteed for Python < 3.6, '
'and the "first" of AWS_CLOUDFRONT_KEYS is not guaranteed. '
'Set AWS_CLOUDFRONT_KEY_ID to silence this warning.'
)

self.cloudfront_key_id = next(iter(self.cloudfront_keys.keys()))

def __getstate__(self):
state = self.__dict__.copy()
state.pop('_connections', None)
Expand Down Expand Up @@ -610,12 +672,21 @@ def url(self, name, parameters=None, expire=None):
# Preserve the trailing slash after normalizing the path.
# TODO: Handle force_http=not self.secure_urls like in s3boto
name = self._normalize_name(self._clean_name(name))
if self.custom_domain:
return "%s//%s/%s" % (self.url_protocol,
self.custom_domain, filepath_to_uri(name))
if expire is None:
expire = self.querystring_expire

if self.custom_domain:
url = "%s//%s/%s" % (
self.url_protocol, self.custom_domain, filepath_to_uri(name))

if self.cloudfront_key_id:
signer = self.cloudfront_keys[self.cloudfront_key_id]
expiration = datetime.utcnow() + timedelta(seconds=expire)

return signer.generate_presigned_url(url, date_less_than=expiration)

return url

params = parameters.copy() if parameters else {}
params['Bucket'] = self.bucket.name
params['Key'] = self._encode_name(name)
Expand Down
38 changes: 38 additions & 0 deletions tests/test_s3boto3.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import pickle
import threading
import warnings

from datetime import datetime
from textwrap import dedent
from unittest import skipIf

from botocore.exceptions import ClientError
Expand Down Expand Up @@ -505,6 +507,42 @@ def test_storage_url(self):
ExpiresIn=custom_expire
)

def test_storage_url_custom_domain_signed_urls(self):
key_id = 'test-key'
filename = 'file.txt'
pem = dedent(
'''\
-----BEGIN RSA PRIVATE KEY-----
MIICWwIBAAKBgQCXVuwcMk+JmVSKuQ1K4dZx4Z1dEcRQgTlqvhAyljIpttXlZh2/
fD3GkJCiqfwEmo+cdNK/LFzRj/CX8Wz1z1lH2USONpG6sAkotkatCbejiItDu5y6
janGJHfuWXu6B/o9gwZylU1gIsePY3lLNk+r9QhXUO4jXw6zLJftVwKPhQIDAQAB
AoGAbpkRV9HUmoQ5al+uPSkp5HOy4s8XHpYxdbaMc8ubwSxiyJCF8OhE5RXE/Xso
N90UUox1b0xmUKfWddPzgvgTD/Ub7D6Ukf+nVWDX60tWgNxICAUHptGL3tWweaAy
H+0+vZ0TzvTt9r00vW0FzO7F8X9/Rs1ntDRLtF3RCCxdq0kCQQDHFu+t811lCvy/
67rMEKGvNsNNSTrzOrNr3PqUrCnOrzKazjFVjsKv5VzI/U+rXGYKWJsMpuCFiHZ3
DILUC09TAkEAwpm2S6MN6pzn9eY6pmhOxZ+GQGGRUkKZfC1GDxaRSRb8sKTjptYw
WSemJSxiDzdj3Po2hF0lbhkpJgUq6xnCxwJAZgHHfn5CLSJrDD7Q7/vZi/foK3JJ
BRTfl3Wa4pAvv5meuRjKyEakVBGV79lyd5+ZHNX3Y40hXunjoO3FHrZIxwJAdRzu
waxahrRxQOKSr20c4wAzWnGddIUSO9I/VHs/al5EKsbBHrnOlQkwizSfuwqZtfZ7
csNf8FeCFRiNELoLJwJAZxWBE2+8J9VW9AQ0SE7j4FyM/B8FvRhF5PLAAsw/OxHO
SxiFP7Ptdac1tm5H5zOqaqSHWphI19HNNilXKmxuCA==
-----END RSA PRIVATE KEY-----'''
).encode('ascii')

url = 'https://mock.cloudfront.net/file.txt?Expires=3600&Signature=DbqVgh3FHtttQxof214tSAVE8Nqn3Q4Ii7eR3iykbOqAPbV89HC3EB~0CWxarpLNtbfosS5LxiP5EutriM7E8uR4Gm~UVY-PFUjPcwqdnmAiKJF0EVs7koJcMR8MKDStuWfFKVUPJ8H7ORYTOrixyHBV2NOrpI6SN5UX6ctNM50_&Key-Pair-Id=test-key' # noqa

self.storage.custom_domain = "mock.cloudfront.net"
self.storage.cloudfront_key_id = key_id

for pem_to_signer in (
s3boto3._use_cryptography_signer(),
s3boto3._use_rsa_signer()):
self.storage.cloudfront_keys = {key_id: pem_to_signer(key_id, pem)}

with mock.patch('storages.backends.s3boto3.datetime') as mock_datetime:
mock_datetime.utcnow.return_value = datetime.utcfromtimestamp(0)
self.assertEqual(self.storage.url(filename), url)

def test_generated_url_is_encoded(self):
self.storage.custom_domain = "mock.cloudfront.net"
filename = "whacky & filename.mp4"
Expand Down
8 changes: 5 additions & 3 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@ deps =
django21: Django>=2.1,<2.2
djangomaster: https://github.com/django/django/archive/master.tar.gz
py27: mock
apache-libcloud
pytest
pytest-cov
apache-libcloud
azure-storage-blob>=1.3.1
azure>=3.0.0
boto
boto3
cryptography
dropbox
google-cloud-storage
paramiko
azure>=3.0.0
azure-storage-blob>=1.3.1
rsa

[testenv:integration]
ignore_errors = True
Expand Down

0 comments on commit 5b36b22

Please sign in to comment.