Skip to content

Commit f0a1157

Browse files
committed
Merge pull request #654 from tseaver/15-add_storage_batch
#15: Add `storage.batch.Batch`
2 parents d43c4a0 + c828bda commit f0a1157

File tree

4 files changed

+632
-5
lines changed

4 files changed

+632
-5
lines changed

gcloud/storage/batch.py

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
# Copyright 2014 Google Inc. All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""Batch updates / deletes of storage buckets / blobs.
15+
16+
See: https://cloud.google.com/storage/docs/json_api/v1/how-tos/batch
17+
"""
18+
from email.encoders import encode_noop
19+
from email.generator import Generator
20+
from email.mime.application import MIMEApplication
21+
from email.mime.multipart import MIMEMultipart
22+
from email.parser import Parser
23+
import io
24+
import json
25+
26+
import six
27+
28+
from gcloud._localstack import _LocalStack
29+
from gcloud.storage.connection import Connection
30+
31+
32+
_BATCHES = _LocalStack()
33+
34+
35+
class MIMEApplicationHTTP(MIMEApplication):
36+
"""MIME type for ``application/http``.
37+
38+
Constructs payload from headers and body
39+
40+
:type headers: dict
41+
:param headers: HTTP headers
42+
43+
:type body: text or None
44+
:param body: HTTP payload
45+
"""
46+
def __init__(self, method, uri, headers, body):
47+
if isinstance(body, dict):
48+
body = json.dumps(body)
49+
headers['Content-Type'] = 'application/json'
50+
headers['Content-Length'] = len(body)
51+
if body is None:
52+
body = ''
53+
lines = ['%s %s HTTP/1.1' % (method, uri)]
54+
lines.extend(['%s: %s' % (key, value)
55+
for key, value in sorted(headers.items())])
56+
lines.append('')
57+
lines.append(body)
58+
payload = '\r\n'.join(lines)
59+
if six.PY2: # pragma: NO COVER Python2
60+
# Sigh. email.message.Message is an old-style class, so we
61+
# cannot use 'super()'.
62+
MIMEApplication.__init__(self, payload, 'http', encode_noop)
63+
else: # pragma: NO COVER Python3
64+
super_init = super(MIMEApplicationHTTP, self).__init__
65+
super_init(payload, 'http', encode_noop)
66+
67+
68+
class NoContent(object):
69+
"""Emulate an HTTP '204 No Content' response."""
70+
status = 204
71+
72+
73+
class Batch(Connection):
74+
"""Proxy an underlying connection, batching up change operations.
75+
76+
:type connection: :class:`gcloud.storage.connection.Connection`
77+
:param connection: the connection for which the batch proxies.
78+
"""
79+
_MAX_BATCH_SIZE = 1000
80+
81+
def __init__(self, connection):
82+
super(Batch, self).__init__(project=connection.project)
83+
self._connection = connection
84+
self._requests = []
85+
self._responses = []
86+
87+
def _do_request(self, method, url, headers, data):
88+
"""Override Connection: defer actual HTTP request.
89+
90+
Only allow up to ``_MAX_BATCH_SIZE`` requests to be deferred.
91+
92+
:type method: string
93+
:param method: The HTTP method to use in the request.
94+
95+
:type url: string
96+
:param url: The URL to send the request to.
97+
98+
:type headers: dict
99+
:param headers: A dictionary of HTTP headers to send with the request.
100+
101+
:type data: string
102+
:param data: The data to send as the body of the request.
103+
104+
:rtype: tuple of ``response`` (a dictionary of sorts)
105+
and ``content`` (a string).
106+
:returns: The HTTP response object and the content of the response.
107+
"""
108+
if method == 'GET':
109+
_req = self._connection.http.request
110+
return _req(method=method, uri=url, headers=headers, body=data)
111+
112+
if len(self._requests) >= self._MAX_BATCH_SIZE:
113+
raise ValueError("Too many deferred requests (max %d)" %
114+
self._MAX_BATCH_SIZE)
115+
self._requests.append((method, url, headers, data))
116+
return NoContent(), ''
117+
118+
def finish(self):
119+
"""Submit a single `multipart/mixed` request w/ deferred requests.
120+
121+
:rtype: list of tuples
122+
:returns: one ``(status, reason, payload)`` tuple per deferred request.
123+
:raises: ValueError if no requests have been deferred.
124+
"""
125+
if len(self._requests) == 0:
126+
raise ValueError("No deferred requests")
127+
128+
multi = MIMEMultipart()
129+
130+
for method, uri, headers, body in self._requests:
131+
subrequest = MIMEApplicationHTTP(method, uri, headers, body)
132+
multi.attach(subrequest)
133+
134+
# The `email` package expects to deal with "native" strings
135+
if six.PY3: # pragma: NO COVER Python3
136+
buf = io.StringIO()
137+
else: # pragma: NO COVER Python2
138+
buf = io.BytesIO()
139+
generator = Generator(buf, False, 0)
140+
generator.flatten(multi)
141+
payload = buf.getvalue()
142+
143+
# Strip off redundant header text
144+
_, body = payload.split('\n\n', 1)
145+
headers = dict(multi._headers)
146+
147+
url = '%s/batch' % self.API_BASE_URL
148+
149+
_req = self._connection._make_request
150+
response, content = _req('POST', url, data=body, headers=headers)
151+
self._responses = list(_unpack_batch_response(response, content))
152+
return self._responses
153+
154+
def __enter__(self):
155+
_BATCHES.push(self)
156+
return self
157+
158+
def __exit__(self, exc_type, exc_val, exc_tb):
159+
try:
160+
if exc_type is None:
161+
self.finish()
162+
finally:
163+
_BATCHES.pop()
164+
165+
166+
def _unpack_batch_response(response, content):
167+
"""Convert response, content -> [(status, reason, payload)]."""
168+
parser = Parser()
169+
faux_message = ('Content-Type: %s\nMIME-Version: 1.0\n\n%s' %
170+
(response['content-type'], content))
171+
172+
message = parser.parsestr(faux_message)
173+
174+
if not isinstance(message._payload, list):
175+
raise ValueError('Bad response: not multi-part')
176+
177+
for subrequest in message._payload:
178+
status_line, rest = subrequest._payload.split('\n', 1)
179+
_, status, reason = status_line.split(' ', 2)
180+
message = parser.parsestr(rest)
181+
payload = message._payload
182+
ctype = message['Content-Type']
183+
if ctype and ctype.startswith('application/json'):
184+
payload = json.loads(payload)
185+
yield status, reason, payload

gcloud/storage/connection.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,8 @@ def _make_request(self, method, url, data=None, content_type=None,
149149
150150
:rtype: tuple of ``response`` (a dictionary of sorts)
151151
and ``content`` (a string).
152-
:returns: The HTTP response object and the content of the response.
152+
:returns: The HTTP response object and the content of the response,
153+
returned by :meth:`_do_request`.
153154
"""
154155
headers = headers or {}
155156
headers['Accept-Encoding'] = 'gzip'
@@ -166,6 +167,30 @@ def _make_request(self, method, url, data=None, content_type=None,
166167

167168
headers['User-Agent'] = self.USER_AGENT
168169

170+
return self._do_request(method, url, headers, data)
171+
172+
def _do_request(self, method, url, headers, data):
173+
"""Low-level helper: perform the actual API request over HTTP.
174+
175+
Allows :class:`gcloud.storage.batch.Batch` to override, deferring
176+
the request.
177+
178+
:type method: string
179+
:param method: The HTTP method to use in the request.
180+
181+
:type url: string
182+
:param url: The URL to send the request to.
183+
184+
:type headers: dict
185+
:param headers: A dictionary of HTTP headers to send with the request.
186+
187+
:type data: string
188+
:param data: The data to send as the body of the request.
189+
190+
:rtype: tuple of ``response`` (a dictionary of sorts)
191+
and ``content`` (a string).
192+
:returns: The HTTP response object and the content of the response.
193+
"""
169194
return self.http.request(uri=url, method=method, headers=headers,
170195
body=data)
171196

0 commit comments

Comments
 (0)