This repository was archived by the owner on Sep 12, 2018. It is now read-only.

Commit 3bb1dee

Merge pull request #961 from Altiscale/chaiken_altiscale_issue_956
OPS-5907: remove ParallelKey class
2 parents 593a6c2 + cf1b018

File tree

1 file changed: +0 −88 lines changed
  • depends/docker-registry-core/docker_registry/core


depends/docker-registry-core/docker_registry/core/boto.py

Lines changed: 0 additions & 88 deletions
@@ -29,11 +29,8 @@
 import gevent.monkey
 gevent.monkey.patch_all()
 
-import copy
 import logging
-import math
 import os
-import tempfile
 
 from . import driver
 from . import lru
@@ -42,86 +39,6 @@
 logger = logging.getLogger(__name__)
 
 
-class ParallelKey(object):
-
-    """This class implements parallel transfer on a key to improve speed."""
-
-    CONCURRENCY = 5
-
-    def __init__(self, key):
-        logger.info('ParallelKey: {0}; size={1}'.format(key, key.size))
-        self._boto_key = key
-        self._cursor = 0
-        self._max_completed_byte = 0
-        self._max_completed_index = 0
-        self._tmpfile = tempfile.NamedTemporaryFile(mode='rb')
-        self._completed = [0] * self.CONCURRENCY
-        self._spawn_jobs()
-
-    def __del__(self):
-        self._tmpfile.close()
-
-    def _generate_bytes_ranges(self, num_parts):
-        size = self._boto_key.size
-        chunk_size = int(math.ceil(1.0 * size / num_parts))
-        for i in range(num_parts):
-            yield (i, chunk_size * i, min(chunk_size * (i + 1) - 1, size - 1))
-
-    def _fetch_part(self, fname, index, min_cur, max_cur):
-        boto_key = copy.copy(self._boto_key)
-        with open(fname, 'wb') as f:
-            f.seek(min_cur)
-            brange = 'bytes={0}-{1}'.format(min_cur, max_cur)
-            boto_key.get_contents_to_file(f, headers={'Range': brange})
-            boto_key.close()
-        self._completed[index] = (index, max_cur)
-        self._refresh_max_completed_byte()
-
-    def _spawn_jobs(self):
-        bytes_ranges = self._generate_bytes_ranges(self.CONCURRENCY)
-        for i, min_cur, max_cur in bytes_ranges:
-            gevent.spawn(self._fetch_part, self._tmpfile.name,
-                         i, min_cur, max_cur)
-
-    def _refresh_max_completed_byte(self):
-        for v in self._completed[self._max_completed_index:]:
-            if v == 0:
-                return
-            self._max_completed_index = v[0]
-            self._max_completed_byte = v[1]
-            if self._max_completed_index >= len(self._completed) - 1:
-                percent = round(
-                    (100.0 * self._cursor) / self._boto_key.size, 1)
-                logger.info('ParallelKey: {0}; buffering complete at {1}% of '
-                            'the total transfer; now serving straight from '
-                            'the tempfile'.format(self._boto_key, percent))
-
-    def read(self, size):
-        if self._cursor >= self._boto_key.size:
-            # Read completed
-            return ''
-        sz = size
-        if self._max_completed_index < len(self._completed) - 1:
-            # Not all data arrived yet
-            if self._cursor + size > self._max_completed_byte:
-                while self._cursor >= self._max_completed_byte:
-                    # We're waiting for more data to arrive
-                    gevent.sleep(0.2)
-                if self._cursor + sz > self._max_completed_byte:
-                    sz = self._max_completed_byte - self._cursor
-        # Use a low-level read to avoid any buffering (makes sure we don't
-        # read more than `sz' bytes).
-        buf = os.read(self._tmpfile.file.fileno(), sz)
-        self._cursor += len(buf)
-        if not buf:
-            message = ('ParallelKey: {0}; got en empty read on the buffer! '
-                       'cursor={1}, size={2}; Transfer interrupted.'.format(
-                           self._boto_key, self._cursor, self._boto_key.size))
-            logging.error(message)
-            raise RuntimeError(message)
-        return buf
-
-
 class Base(driver.Base):
 
     supports_bytes_range = True
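For context, the class deleted above split an S3 key into CONCURRENCY contiguous byte ranges and fetched them concurrently into a shared temp file with gevent. A minimal standalone sketch of just the range-splitting step; the function name and sizes here are illustrative (the real logic lived on ParallelKey and read the size from the boto key):

import math

def generate_bytes_ranges(size, num_parts):
    # Split `size` bytes into `num_parts` contiguous (index, start, end) ranges,
    # end offsets inclusive, matching the 'bytes=start-end' Range headers the
    # removed _fetch_part passed to boto.
    chunk_size = int(math.ceil(1.0 * size / num_parts))
    for i in range(num_parts):
        yield (i, chunk_size * i, min(chunk_size * (i + 1) - 1, size - 1))

# Example: a 1000-byte object split across 5 workers yields
# (0, 0, 199), (1, 200, 399), (2, 400, 599), (3, 600, 799), (4, 800, 999).
print(list(generate_bytes_ranges(1000, 5)))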
@@ -174,11 +91,6 @@ def stream_read(self, path, bytes_range=None):
         key = self._boto_bucket.lookup(path, headers=headers)
         if not key:
             raise FileNotFoundError('%s is not there' % path)
-        if not bytes_range and key.size > 1024 * 1024:
-            # Use the parallel key only if the key size is > 1MB
-            # And if bytes_range is not enabled (since ParallelKey is already
-            # using bytes range)
-            key = ParallelKey(key)
         while True:
             buf = key.read(self.buffer_size)
             if not buf:
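With ParallelKey gone, stream_read falls back to a single sequential or ranged boto read. A minimal sketch of that remaining path, written outside the driver class with placeholder names (bucket, path, buffer_size) and assuming a boto (v2) bucket object; it is not the project's actual driver code:

def stream_key(bucket, path, buffer_size=4096, bytes_range=None):
    # Optional ranged read: boto forwards a standard HTTP Range header,
    # the same mechanism the removed ParallelKey used for each chunk.
    headers = None
    if bytes_range:
        headers = {'Range': 'bytes={0}-{1}'.format(*bytes_range)}
    key = bucket.lookup(path, headers=headers)
    if not key:
        # The real driver raises its own FileNotFoundError exception type.
        raise IOError('%s is not there' % path)
    while True:
        buf = key.read(buffer_size)
        if not buf:
            break
        yield buf
    key.close()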

0 commit comments
