Skip to content

Commit

Permalink
Support blob streaming for file-like objects
Browse files Browse the repository at this point in the history
  • Loading branch information
linar-jether committed Jul 14, 2016
1 parent f2fae7b commit 43464b7
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 13 deletions.
17 changes: 13 additions & 4 deletions gcloud/storage/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import copy
import hashlib
from io import BytesIO
from io import UnsupportedOperation
import json
import mimetypes
import os
Expand Down Expand Up @@ -491,10 +492,11 @@ def upload_from_file(self, file_obj, rewind=False, size=None,
total_bytes = size
if total_bytes is None:
if hasattr(file_obj, 'fileno'):
total_bytes = os.fstat(file_obj.fileno()).st_size
else:
raise ValueError('total bytes could not be determined. Please '
'pass an explicit size.')
try:
total_bytes = os.fstat(file_obj.fileno()).st_size
except (OSError, UnsupportedOperation):
pass # Assuming fd is not an actual file (maybe socket).

headers = {
'Accept': 'application/json',
'Accept-Encoding': 'gzip, deflate',
Expand All @@ -510,6 +512,13 @@ def upload_from_file(self, file_obj, rewind=False, size=None,
if self.chunk_size is not None:
upload.chunksize = self.chunk_size

if total_bytes is None:
upload.strategy = RESUMABLE_UPLOAD
elif total_bytes is None:
raise ValueError('total bytes could not be determined. Please '
'pass an explicit size, or supply a chunk size '
'for a streaming transfer.')

url_builder = _UrlBuilder(bucket_name=self.bucket.name,
object_name=self.name)
upload_config = _UploadConfig()
Expand Down
96 changes: 96 additions & 0 deletions gcloud/storage/test_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,102 @@ def _upload_from_file_simple_test_helper(self, properties=None,
self.assertEqual(headers['Content-Length'], '6')
self.assertEqual(headers['Content-Type'], expected_content_type)

def test_upload_from_file_stream(self):
    """Upload from a stream whose size cannot be determined up front.

    The file object's ``fileno()`` raises ``UnsupportedOperation`` and no
    explicit ``size`` is passed, so ``upload_from_file`` has no total
    byte count.  With ``chunk_size`` set, the test expects three HTTP
    requests: one resumable-session POST followed by two chunk PUTs.
    """
    from six.moves.http_client import OK
    from six.moves.urllib.parse import parse_qsl
    from six.moves.urllib.parse import urlsplit
    from gcloud.streaming import http_wrapper

    BLOB_NAME = 'blob-name'
    UPLOAD_URL = 'http://example.com/upload/name/key'
    DATA = b'ABCDE'
    loc_response = {'status': OK, 'location': UPLOAD_URL}
    chunk1_response = {'status': http_wrapper.RESUME_INCOMPLETE,
                       'range': 'bytes 0-4'}
    chunk2_response = {'status': OK}
    # Need valid JSON on last response, since resumable.
    connection = _Connection(
        (loc_response, b''),
        (chunk1_response, b''),
        (chunk2_response, b'{}'),
    )
    client = _Client(connection)
    bucket = _Bucket(client)
    blob = self._makeOne(BLOB_NAME, bucket=bucket)
    blob._CHUNK_SIZE_MULTIPLE = 1
    blob.chunk_size = 5

    from gcloud.streaming.test_transfer import _Stream
    file_obj = _Stream(DATA)

    # Mock stream closes at end of data, like a socket might
    def is_stream_closed(stream):
        if stream.tell() < len(DATA):
            return stream._closed
        else:
            # ``close()`` is called for its side effect; always report True.
            return stream.close() or True

    def fileno_mock():
        from io import UnsupportedOperation
        raise UnsupportedOperation()

    def pop_headers(request):
        # Pop (not just read) the headers so that the remainder of the
        # request dict can be compared wholesale by the caller.
        return dict(
            [(x.title(), str(y)) for x, y in request.pop('headers').items()])

    file_obj.fileno = fileno_mock

    # The property is installed on the shared ``_Stream`` class, so it must
    # be removed even when the upload raises; otherwise it would leak into
    # every later test that uses ``_Stream``.
    _Stream.closed = property(is_stream_closed)
    try:
        blob.upload_from_file(file_obj)
    finally:
        # Remove the temp property
        delattr(_Stream, "closed")

    rq = connection.http._requested
    self.assertEqual(len(rq), 3)

    # Requested[0]
    headers = pop_headers(rq[0])
    self.assertEqual(headers['Content-Length'], '0')
    self.assertEqual(headers['X-Upload-Content-Type'],
                     'application/octet-stream')

    uri = rq[0].pop('uri')
    scheme, netloc, path, qs, _ = urlsplit(uri)
    self.assertEqual(scheme, 'http')
    self.assertEqual(netloc, 'example.com')
    self.assertEqual(path, '/b/name/o')
    self.assertEqual(dict(parse_qsl(qs)),
                     {'uploadType': 'resumable', 'name': BLOB_NAME})
    self.assertEqual(rq[0], {
        'method': 'POST',
        'body': '',
        'connection_type': None,
        'redirections': 5,
    })

    # Requested[1]
    headers = pop_headers(rq[1])
    self.assertEqual(headers['Content-Range'], 'bytes 0-4/*')
    self.assertEqual(rq[1], {
        'method': 'PUT',
        'uri': UPLOAD_URL,
        'body': DATA[:5],
        'connection_type': None,
        'redirections': 5,
    })

    # Requested[2]
    # NOTE(review): DATA is only 5 bytes long, so ``DATA[5:]`` is ``b''``
    # while the asserted range below is ``bytes 5-9/*``.  These two
    # expectations look inconsistent -- confirm the intended chunk size
    # and Content-Range for the final chunk against the transfer code.
    headers = pop_headers(rq[2])
    self.assertEqual(headers['Content-Range'], 'bytes 5-9/*')
    self.assertEqual(rq[2], {
        'method': 'PUT',
        'uri': UPLOAD_URL,
        'body': DATA[5:],
        'connection_type': None,
        'redirections': 5,
    })

def test_upload_from_file_simple(self):
self._upload_from_file_simple_test_helper(
expected_content_type='application/octet-stream')
Expand Down
7 changes: 6 additions & 1 deletion gcloud/streaming/buffered_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ def __init__(self, stream, start, size):
self._stream = stream
self._start_pos = start
self._buffer_pos = 0
self._buffered_data = self._stream.read(size)

if not hasattr(self._stream, 'closed') or not self._stream.closed:
self._buffered_data = self._stream.read(size)
else:
self._buffered_data = b''

self._stream_at_end = len(self._buffered_data) < size
self._end_pos = self._start_pos + len(self._buffered_data)

Expand Down
18 changes: 10 additions & 8 deletions gcloud/streaming/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1038,14 +1038,16 @@ def stream_file(self, use_chunks=True):
'Failed to transfer all bytes in chunk, upload paused at '
'byte %d' % self.progress)
if self.complete and hasattr(self.stream, 'seek'):
current_pos = self.stream.tell()
self.stream.seek(0, os.SEEK_END)
end_pos = self.stream.tell()
self.stream.seek(current_pos)
if current_pos != end_pos:
raise TransferInvalidError(
'Upload complete with %s additional bytes left in stream' %
(int(end_pos) - int(current_pos)))
if not hasattr(self.stream, 'seekable') or self.stream.seekable():
current_pos = self.stream.tell()
self.stream.seek(0, os.SEEK_END)
end_pos = self.stream.tell()
self.stream.seek(current_pos)
if current_pos != end_pos:
raise TransferInvalidError(
'Upload complete with %s '
'additional bytes left in stream' %
(int(end_pos) - int(current_pos)))
return response

def _send_media_request(self, request, end):
Expand Down

0 comments on commit 43464b7

Please sign in to comment.