Skip to content

Commit

Permalink
Add create_resumable_upload_session (#2989)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jon Wayne Parrott authored Feb 9, 2017
1 parent db0dc85 commit 9619004
Show file tree
Hide file tree
Showing 2 changed files with 276 additions and 38 deletions.
241 changes: 203 additions & 38 deletions storage/google/cloud/storage/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,110 @@ def download_as_string(self, client=None):
self.download_to_file(string_buffer, client=client)
return string_buffer.getvalue()

def _create_upload(
self, client, file_obj=None, size=None, content_type=None,
chunk_size=None, strategy=None, extra_headers=None):
"""Helper for upload methods.
Creates a :class:`google.cloud.core.streaming.Upload` object to handle
the details of uploading a file to Cloud Storage.
:type client: :class:`~google.cloud.storage.client.Client` or
``NoneType``
:param client: Optional. The client to use. If not passed, falls back
to the ``client`` stored on the blob's bucket.
:type file_obj: file
:param file_obj: A file handle open for reading.
:type size: int
:param size: The size of the upload, in bytes.
:type content_type: str
:param content_type: Optional type of content being uploaded.
:type chunk_size: int
:param chunk_size: The size of each chunk when doing resumable and
media uploads.
:type strategy: str
:param strategy: Either
:attr:`google.cloud.core.streaming.transfer.SIMPLE_UPLOAD` or
:attr:`google.cloud.core.streaming.transfer.RESUMABLE_UPLOAD`.
:type extra_headers: dict
:param extra_headers: Additional headers to be sent with the upload
initiation request.
:rtype: Tuple[google.cloud.core.streaming.Upload,
google.cloud.core.streaming.Request,
google.cloud.core.streaming.Response]
:returns: The Upload object, the upload HTTP request, and the upload
initiation response.
"""

client = self._require_client(client)

# Use ``_base_connection`` rather ``_connection`` since the current
# connection may be a batch. A batch wraps a client's connection,
# but does not store the ``http`` object. The rest (API_BASE_URL and
# build_api_url) are also defined on the Batch class, but we just
# use the wrapped connection since it has all three (http,
# API_BASE_URL and build_api_url).
connection = client._base_connection

content_type = (content_type or self._properties.get('contentType') or
'application/octet-stream')

headers = {
'Accept': 'application/json',
'Accept-Encoding': 'gzip, deflate',
'User-Agent': connection.USER_AGENT,
}

if extra_headers:
headers.update(extra_headers)

headers.update(_get_encryption_headers(self._encryption_key))

# Use apitools' Upload functionality
upload = Upload(
file_obj, content_type, total_size=size, auto_transfer=False)

if chunk_size is not None:
upload.chunksize = chunk_size

if strategy is not None:
upload.strategy = RESUMABLE_UPLOAD

url_builder = _UrlBuilder(
bucket_name=self.bucket.name,
object_name=self.name)
upload_config = _UploadConfig()

# Temporary URL until strategy is determined.
base_url = connection.API_BASE_URL + '/upload'
upload_url = connection.build_api_url(
api_base_url=base_url,
path=self.bucket.path + '/o')

# Configure the upload request parameters.
request = Request(upload_url, 'POST', headers)
upload.configure_request(upload_config, request, url_builder)

# Configure final URL
query_params = url_builder.query_params
base_url = connection.API_BASE_URL + '/upload'
request.url = connection.build_api_url(
api_base_url=base_url,
path=self.bucket.path + '/o',
query_params=query_params)

# Start the upload session
response = upload.initialize_upload(request, connection.http)

return upload, request, response

@staticmethod
def _check_response_error(request, http_response):
"""Helper for :meth:`upload_from_file`."""
Expand All @@ -390,7 +494,6 @@ def _check_response_error(request, http_response):
raise make_exception(faux_response, http_response.content,
error_info=request.url)

# pylint: disable=too-many-locals
def upload_from_file(self, file_obj, rewind=False, size=None,
content_type=None, num_retries=6, client=None):
"""Upload the contents of this blob from a file-like object.
Expand Down Expand Up @@ -459,8 +562,6 @@ def upload_from_file(self, file_obj, rewind=False, size=None,
# use the wrapped connection since it has all three (http,
# API_BASE_URL and build_api_url).
connection = client._base_connection
content_type = (content_type or self._properties.get('contentType') or
'application/octet-stream')

# Rewind the file if desired.
if rewind:
Expand All @@ -475,52 +576,28 @@ def upload_from_file(self, file_obj, rewind=False, size=None,
except (OSError, UnsupportedOperation):
pass # Assuming fd is not an actual file (maybe socket).

headers = {
'Accept': 'application/json',
'Accept-Encoding': 'gzip, deflate',
'User-Agent': connection.USER_AGENT,
}

headers.update(_get_encryption_headers(self._encryption_key))

upload = Upload(file_obj, content_type, total_bytes,
auto_transfer=False)

chunk_size = None
strategy = None
if self.chunk_size is not None:
upload.chunksize = self.chunk_size
chunk_size = self.chunk_size

if total_bytes is None:
upload.strategy = RESUMABLE_UPLOAD
strategy = RESUMABLE_UPLOAD
elif total_bytes is None:
raise ValueError('total bytes could not be determined. Please '
'pass an explicit size, or supply a chunk size '
'for a streaming transfer.')

url_builder = _UrlBuilder(bucket_name=self.bucket.name,
object_name=self.name)
upload_config = _UploadConfig()

# Temporary URL, until we know simple vs. resumable.
base_url = connection.API_BASE_URL + '/upload'
upload_url = connection.build_api_url(api_base_url=base_url,
path=self.bucket.path + '/o')

# Use apitools 'Upload' facility.
request = Request(upload_url, 'POST', headers)

upload.configure_request(upload_config, request, url_builder)
query_params = url_builder.query_params
base_url = connection.API_BASE_URL + '/upload'
request.url = connection.build_api_url(api_base_url=base_url,
path=self.bucket.path + '/o',
query_params=query_params)
upload.initialize_upload(request, connection.http)
upload, request, _ = self._create_upload(
client, file_obj=file_obj, size=total_bytes,
content_type=content_type, chunk_size=chunk_size,
strategy=strategy)

if upload.strategy == RESUMABLE_UPLOAD:
http_response = upload.stream_file(use_chunks=True)
else:
http_response = make_api_request(connection.http, request,
retries=num_retries)
http_response = make_api_request(
connection.http, request, retries=num_retries)

self._check_response_error(request, http_response)
response_content = http_response.content
Expand All @@ -529,7 +606,6 @@ def upload_from_file(self, file_obj, rewind=False, size=None,
six.string_types): # pragma: NO COVER Python3
response_content = response_content.decode('utf-8')
self._set_properties(json.loads(response_content))
# pylint: enable=too-many-locals

def upload_from_filename(self, filename, content_type=None, client=None):
"""Upload this blob's contents from the content of a named file.
Expand Down Expand Up @@ -604,6 +680,95 @@ def upload_from_string(self, data, content_type='text/plain', client=None):
file_obj=string_buffer, rewind=True, size=len(data),
content_type=content_type, client=client)

def create_resumable_upload_session(
self,
content_type=None,
size=None,
origin=None,
client=None):
"""Create a resumable upload session.
Resumable upload sessions allow you to start an upload session from
one client and complete the session in another. This method is called
by the initiator to set the metadata and limits. The initiator then
passes the session URL to the client that will upload the binary data.
The client performs a PUT request on the session URL to complete the
upload. This process allows untrusted clients to upload to an
access-controlled bucket. For more details, see the
`documentation on signed URLs`_.
.. _documentation on signed URLs: https://cloud.google.com/storage\
/docs/access-control/signed-urls#signing-resumable
The content type of the upload will either be
- The value passed in to the function (if any)
- The value stored on the current blob
- The default value of 'application/octet-stream'
.. note::
The effect of uploading to an existing blob depends on the
"versioning" and "lifecycle" policies defined on the blob's
bucket. In the absence of those policies, upload will
overwrite any existing contents.
See the `object versioning
<https://cloud.google.com/storage/docs/object-versioning>`_ and
`lifecycle <https://cloud.google.com/storage/docs/lifecycle>`_
API documents for details.
If :attr:`encryption_key` is set, the blob will be `encrypted`_.
.. _encrypted: https://cloud.google.com/storage/docs/\
encryption#customer-supplied
:type size: int
:param size: Optional, the maximum number of bytes that can be
uploaded using this session. If the size is not known when creating
the session, this should be left blank.
:type content_type: str
:param content_type: Optional type of content being uploaded. This can
be used to restrict the allowed file type that can be uploaded
to the size.
:type origin: str
:param origin: Optional origin. If set, the upload can only be
completed by a user-agent that uploads from the given origin. This
can be useful when passing the session to a web client.
:type client: :class:`~google.cloud.storage.client.Client` or
``NoneType``
:param client: Optional. The client to use. If not passed, falls back
to the ``client`` stored on the blob's bucket.
:rtype: str
:returns: The resumable upload session URL. The upload can be
completed by making an HTTP PUT request with the file's contents.
:raises: :class:`google.cloud.exceptions.GoogleCloudError`
if the session creation response returns an error status.
"""

extra_headers = {}

if origin is not None:
# This header is specifically for client-side uploads, it
# determines the origins allowed for CORS.
extra_headers['Origin'] = origin

_, _, start_response = self._create_upload(
client,
size=size,
content_type=content_type,
strategy=RESUMABLE_UPLOAD,
extra_headers=extra_headers)

# The location header contains the session URL. This can be used
# to continue the upload.
resumable_upload_session_url = start_response.info['location']

return resumable_upload_session_url

def make_public(self, client=None):
"""Make this blob public giving all users read access.
Expand Down
73 changes: 73 additions & 0 deletions storage/unit_tests/test_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,79 @@ def test_upload_from_string_text_w_key(self):
self.assertEqual(headers['Content-Type'], 'text/plain')
self.assertEqual(rq[0]['body'], ENCODED)

def test_create_resumable_upload_session(self):
from six.moves.http_client import OK
from six.moves.urllib.parse import parse_qsl
from six.moves.urllib.parse import urlsplit

BLOB_NAME = 'blob-name'
UPLOAD_URL = 'http://example.com/upload/name/key'
loc_response = {'status': OK, 'location': UPLOAD_URL}
connection = _Connection(
(loc_response, '{}'),
)
client = _Client(connection)
bucket = _Bucket(client=client)
blob = self._make_one(BLOB_NAME, bucket=bucket)

resumable_url = blob.create_resumable_upload_session()

self.assertEqual(resumable_url, UPLOAD_URL)

rq = connection.http._requested
self.assertEqual(len(rq), 1)
self.assertEqual(rq[0]['method'], 'POST')

uri = rq[0]['uri']
scheme, netloc, path, qs, _ = urlsplit(uri)
self.assertEqual(scheme, 'http')
self.assertEqual(netloc, 'example.com')
self.assertEqual(path, '/b/name/o')
self.assertEqual(dict(parse_qsl(qs)),
{'uploadType': 'resumable', 'name': BLOB_NAME})
headers = {
key.title(): str(value) for key, value in rq[0]['headers'].items()}
self.assertEqual(headers['Content-Length'], '0')
self.assertEqual(
headers['X-Upload-Content-Type'], 'application/octet-stream')

def test_create_resumable_upload_session_args(self):
from six.moves.http_client import OK

BLOB_NAME = 'blob-name'
UPLOAD_URL = 'http://example.com/upload/name/key'
CONTENT_TYPE = 'text/plain'
SIZE = 1024
ORIGIN = 'http://google.com'

loc_response = {'status': OK, 'location': UPLOAD_URL}
connection = _Connection(
(loc_response, '{}'),
)
client = _Client(connection)
bucket = _Bucket(client=client)
blob = self._make_one(BLOB_NAME, bucket=bucket)

resumable_url = blob.create_resumable_upload_session(
content_type=CONTENT_TYPE,
size=SIZE,
origin=ORIGIN)

self.assertEqual(resumable_url, UPLOAD_URL)

rq = connection.http._requested
self.assertEqual(len(rq), 1)
self.assertEqual(rq[0]['method'], 'POST')

headers = {
key.title(): str(value) for key, value in rq[0]['headers'].items()}
self.assertEqual(headers['Content-Length'], '0')
self.assertEqual(headers['X-Upload-Content-Length'], str(SIZE))
self.assertEqual(
headers['X-Upload-Content-Type'], 'text/plain')
self.assertEqual(
headers['Origin'], ORIGIN)

def test_make_public(self):
from six.moves.http_client import OK
from google.cloud.storage.acl import _ACLEntity
Expand Down

0 comments on commit 9619004

Please sign in to comment.