|
26 | 26 |
|
27 | 27 | """ |
28 | 28 |
|
29 | | -import gevent.monkey |
30 | | -gevent.monkey.patch_all() |
31 | | - |
32 | | -import copy |
33 | 29 | import logging |
34 | | -import math |
35 | 30 | import os |
36 | | -import tempfile |
37 | 31 |
|
38 | 32 | from . import driver |
39 | 33 | from . import lru |
|
42 | 36 | logger = logging.getLogger(__name__) |
43 | 37 |
|
44 | 38 |
|
45 | | -class ParallelKey(object): |
46 | | - |
47 | | - """This class implements parallel transfer on a key to improve speed.""" |
48 | | - |
49 | | - CONCURRENCY = 5 |
50 | | - |
51 | | - def __init__(self, key): |
52 | | - logger.info('ParallelKey: {0}; size={1}'.format(key, key.size)) |
53 | | - self._boto_key = key |
54 | | - self._cursor = 0 |
55 | | - self._max_completed_byte = 0 |
56 | | - self._max_completed_index = 0 |
57 | | - self._tmpfile = tempfile.NamedTemporaryFile(mode='rb') |
58 | | - self._completed = [0] * self.CONCURRENCY |
59 | | - self._spawn_jobs() |
60 | | - |
61 | | - def __del__(self): |
62 | | - self._tmpfile.close() |
63 | | - |
64 | | - def _generate_bytes_ranges(self, num_parts): |
65 | | - size = self._boto_key.size |
66 | | - chunk_size = int(math.ceil(1.0 * size / num_parts)) |
67 | | - for i in range(num_parts): |
68 | | - yield (i, chunk_size * i, min(chunk_size * (i + 1) - 1, size - 1)) |
69 | | - |
70 | | - def _fetch_part(self, fname, index, min_cur, max_cur): |
71 | | - boto_key = copy.copy(self._boto_key) |
72 | | - with open(fname, 'wb') as f: |
73 | | - f.seek(min_cur) |
74 | | - brange = 'bytes={0}-{1}'.format(min_cur, max_cur) |
75 | | - boto_key.get_contents_to_file(f, headers={'Range': brange}) |
76 | | - boto_key.close() |
77 | | - self._completed[index] = (index, max_cur) |
78 | | - self._refresh_max_completed_byte() |
79 | | - |
80 | | - def _spawn_jobs(self): |
81 | | - bytes_ranges = self._generate_bytes_ranges(self.CONCURRENCY) |
82 | | - for i, min_cur, max_cur in bytes_ranges: |
83 | | - gevent.spawn(self._fetch_part, self._tmpfile.name, |
84 | | - i, min_cur, max_cur) |
85 | | - |
86 | | - def _refresh_max_completed_byte(self): |
87 | | - for v in self._completed[self._max_completed_index:]: |
88 | | - if v == 0: |
89 | | - return |
90 | | - self._max_completed_index = v[0] |
91 | | - self._max_completed_byte = v[1] |
92 | | - if self._max_completed_index >= len(self._completed) - 1: |
93 | | - percent = round( |
94 | | - (100.0 * self._cursor) / self._boto_key.size, 1) |
95 | | - logger.info('ParallelKey: {0}; buffering complete at {1}% of ' |
96 | | - 'the total transfer; now serving straight from ' |
97 | | - 'the tempfile'.format(self._boto_key, percent)) |
98 | | - |
99 | | - def read(self, size): |
100 | | - if self._cursor >= self._boto_key.size: |
101 | | - # Read completed |
102 | | - return '' |
103 | | - sz = size |
104 | | - if self._max_completed_index < len(self._completed) - 1: |
105 | | - # Not all data arrived yet |
106 | | - if self._cursor + size > self._max_completed_byte: |
107 | | - while self._cursor >= self._max_completed_byte: |
108 | | - # We're waiting for more data to arrive |
109 | | - gevent.sleep(0.2) |
110 | | - if self._cursor + sz > self._max_completed_byte: |
111 | | - sz = self._max_completed_byte - self._cursor |
112 | | - # Use a low-level read to avoid any buffering (makes sure we don't |
113 | | - # read more than `sz' bytes). |
114 | | - buf = os.read(self._tmpfile.file.fileno(), sz) |
115 | | - self._cursor += len(buf) |
116 | | - if not buf: |
117 | | - message = ('ParallelKey: {0}; got en empty read on the buffer! ' |
118 | | - 'cursor={1}, size={2}; Transfer interrupted.'.format( |
119 | | - self._boto_key, self._cursor, self._boto_key.size)) |
120 | | - logging.error(message) |
121 | | - raise RuntimeError(message) |
122 | | - return buf |
123 | | - |
124 | | - |
125 | 39 | class Base(driver.Base): |
126 | 40 |
|
127 | 41 | supports_bytes_range = True |
@@ -174,11 +88,6 @@ def stream_read(self, path, bytes_range=None): |
174 | 88 | key = self._boto_bucket.lookup(path, headers=headers) |
175 | 89 | if not key: |
176 | 90 | raise FileNotFoundError('%s is not there' % path) |
177 | | - if not bytes_range and key.size > 1024 * 1024: |
178 | | - # Use the parallel key only if the key size is > 1MB |
179 | | - # And if bytes_range is not enabled (since ParallelKey is already |
180 | | - # using bytes range) |
181 | | - key = ParallelKey(key) |
182 | 91 | while True: |
183 | 92 | buf = key.read(self.buffer_size) |
184 | 93 | if not buf: |
|
0 commit comments