Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support blob streaming for file-like objects #1914

Merged
merged 3 commits into from
Jul 14, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions gcloud/storage/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import copy
import hashlib
from io import BytesIO
from io import UnsupportedOperation
import json
import mimetypes
import os
Expand Down Expand Up @@ -491,10 +492,11 @@ def upload_from_file(self, file_obj, rewind=False, size=None,
total_bytes = size
if total_bytes is None:
if hasattr(file_obj, 'fileno'):
total_bytes = os.fstat(file_obj.fileno()).st_size
else:
raise ValueError('total bytes could not be determined. Please '
'pass an explicit size.')
try:
total_bytes = os.fstat(file_obj.fileno()).st_size
except (OSError, UnsupportedOperation):
pass # Assuming fd is not an actual file (maybe socket).

headers = {
'Accept': 'application/json',
'Accept-Encoding': 'gzip, deflate',
Expand All @@ -510,6 +512,13 @@ def upload_from_file(self, file_obj, rewind=False, size=None,
if self.chunk_size is not None:
upload.chunksize = self.chunk_size

if total_bytes is None:
upload.strategy = RESUMABLE_UPLOAD
elif total_bytes is None:
raise ValueError('total bytes could not be determined. Please '
'pass an explicit size, or supply a chunk size '
'for a streaming transfer.')

url_builder = _UrlBuilder(bucket_name=self.bucket.name,
object_name=self.name)
upload_config = _UploadConfig()
Expand Down
96 changes: 96 additions & 0 deletions gcloud/storage/test_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,102 @@ def _upload_from_file_simple_test_helper(self, properties=None,
self.assertEqual(headers['Content-Length'], '6')
self.assertEqual(headers['Content-Type'], expected_content_type)

def test_upload_from_file_stream(self):
    """Upload from a stream whose total size cannot be determined.

    No ``size`` is passed and the stream's ``fileno()`` raises
    ``UnsupportedOperation``, so the blob has no byte count to work with.
    The test expects exactly three requests: a POST with
    ``uploadType=resumable``, a PUT carrying the data with an open-ended
    ``Content-Range`` (``bytes 0-4/*``), and a final empty PUT that
    reports the total (``bytes */5``).
    """
    from io import UnsupportedOperation
    from six.moves.http_client import OK
    from six.moves.urllib.parse import parse_qsl
    from six.moves.urllib.parse import urlsplit
    from gcloud.streaming import http_wrapper
    from gcloud.streaming.test_transfer import _Stream

    BLOB_NAME = 'blob-name'
    UPLOAD_URL = 'http://example.com/upload/name/key'
    DATA = b'ABCDE'
    loc_response = {'status': OK, 'location': UPLOAD_URL}
    chunk1_response = {'status': http_wrapper.RESUME_INCOMPLETE,
                       'range': 'bytes 0-4'}
    chunk2_response = {'status': OK}
    # Need valid JSON on last response, since resumable.
    connection = _Connection(
        (loc_response, b''),
        (chunk1_response, b''),
        (chunk2_response, b'{}'),
    )
    client = _Client(connection)
    bucket = _Bucket(client)
    blob = self._makeOne(BLOB_NAME, bucket=bucket)
    # A multiple of 1 lets the chunk size equal the 5-byte payload.
    blob._CHUNK_SIZE_MULTIPLE = 1
    blob.chunk_size = 5

    file_obj = _Stream(DATA)

    # Mock stream closes at end of data, like a socket might.
    def is_stream_closed(stream):
        if stream.tell() < len(DATA):
            return stream._closed
        # All data consumed: close the stream and report it as closed.
        return stream.close() or True

    def fileno_mock():
        # Behave like a file-like object with no underlying descriptor.
        raise UnsupportedOperation()

    file_obj.fileno = fileno_mock

    # ``closed`` is patched onto the class because a property only works
    # as a class attribute. The patch is undone in ``finally`` so that a
    # failing upload cannot leak it into other tests that use ``_Stream``.
    _Stream.closed = property(is_stream_closed)
    try:
        blob.upload_from_file(file_obj)
    finally:
        # Remove the temp property
        delattr(_Stream, 'closed')

    rq = connection.http._requested
    self.assertEqual(len(rq), 3)

    # Requested[0]: starts the resumable upload.
    headers = dict(
        [(x.title(), str(y)) for x, y in rq[0].pop('headers').items()])
    self.assertEqual(headers['Content-Length'], '0')
    self.assertEqual(headers['X-Upload-Content-Type'],
                     'application/octet-stream')

    uri = rq[0].pop('uri')
    scheme, netloc, path, qs, _ = urlsplit(uri)
    self.assertEqual(scheme, 'http')
    self.assertEqual(netloc, 'example.com')
    self.assertEqual(path, '/b/name/o')
    self.assertEqual(dict(parse_qsl(qs)),
                     {'uploadType': 'resumable', 'name': BLOB_NAME})
    self.assertEqual(rq[0], {
        'method': 'POST',
        'body': '',
        'connection_type': None,
        'redirections': 5,
    })

    # Requested[1]: the data chunk, total size still unknown ('*').
    headers = dict(
        [(x.title(), str(y)) for x, y in rq[1].pop('headers').items()])
    self.assertEqual(headers['Content-Range'], 'bytes 0-4/*')
    self.assertEqual(rq[1], {
        'method': 'PUT',
        'uri': UPLOAD_URL,
        'body': DATA[:5],
        'connection_type': None,
        'redirections': 5,
    })

    # Requested[2]: empty body (DATA[5:] is b'') reporting the total.
    headers = dict(
        [(x.title(), str(y)) for x, y in rq[2].pop('headers').items()])
    self.assertEqual(headers['Content-Range'], 'bytes */5')
    self.assertEqual(rq[2], {
        'method': 'PUT',
        'uri': UPLOAD_URL,
        'body': DATA[5:],
        'connection_type': None,
        'redirections': 5,
    })

def test_upload_from_file_simple(self):
self._upload_from_file_simple_test_helper(
expected_content_type='application/octet-stream')
Expand Down
7 changes: 6 additions & 1 deletion gcloud/streaming/buffered_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ def __init__(self, stream, start, size):
self._stream = stream
self._start_pos = start
self._buffer_pos = 0
self._buffered_data = self._stream.read(size)

if not hasattr(self._stream, 'closed') or not self._stream.closed:
self._buffered_data = self._stream.read(size)
else:
self._buffered_data = b''

self._stream_at_end = len(self._buffered_data) < size
self._end_pos = self._start_pos + len(self._buffered_data)

Expand Down
18 changes: 10 additions & 8 deletions gcloud/streaming/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1038,14 +1038,16 @@ def stream_file(self, use_chunks=True):
'Failed to transfer all bytes in chunk, upload paused at '
'byte %d' % self.progress)
if self.complete and hasattr(self.stream, 'seek'):
current_pos = self.stream.tell()
self.stream.seek(0, os.SEEK_END)
end_pos = self.stream.tell()
self.stream.seek(current_pos)
if current_pos != end_pos:
raise TransferInvalidError(
'Upload complete with %s additional bytes left in stream' %
(int(end_pos) - int(current_pos)))
if not hasattr(self.stream, 'seekable') or self.stream.seekable():
current_pos = self.stream.tell()
self.stream.seek(0, os.SEEK_END)
end_pos = self.stream.tell()
self.stream.seek(current_pos)
if current_pos != end_pos:
raise TransferInvalidError(
'Upload complete with %s '
'additional bytes left in stream' %
(int(end_pos) - int(current_pos)))
return response

def _send_media_request(self, request, end):
Expand Down