Skip to content

Commit f46f678

Browse files
authored
Merge pull request #345 from scrapinghub/reservoir-sampling
Reservoir Sampling
2 parents 4a06cf8 + cae34b5 commit f46f678

11 files changed

+479
-274
lines changed

exporters/module_loader.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ def load_stats_manager(self, options, metadata, **kwargs):
4949
return self._load_module(options, metadata, BaseStatsManager, **kwargs)
5050

5151
def load_write_buffer(self, options, metadata, **kwargs):
52-
from exporters.write_buffer import WriteBuffer
52+
from exporters.write_buffers.base import WriteBuffer
5353
return self._load_module(options, metadata, WriteBuffer, **kwargs)
5454

5555
def load_class(self, class_path):

exporters/write_buffer.py

-250
This file was deleted.

exporters/write_buffers/__init__.py

Whitespace-only changes.

exporters/write_buffers/base.py

+103
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import os
2+
3+
from exporters.utils import remove_if_exists
4+
from exporters.pipeline.base_pipeline_item import BasePipelineItem
5+
from exporters.writers.filebase_base_writer import FilebasedGroupingBufferFilesTracker
6+
7+
from .utils import hash_for_file
8+
from .grouping import GroupingBufferFilesTracker
9+
10+
11+
class WriteBuffer(BasePipelineItem):
12+
13+
group_files_tracker_class = GroupingBufferFilesTracker
14+
filebased_group_files_tracker_class = FilebasedGroupingBufferFilesTracker
15+
supported_options = {
16+
}
17+
18+
def __init__(self, options, metadata, *args, **kwargs):
19+
super(WriteBuffer, self).__init__(options, metadata, *args, **kwargs)
20+
self.check_options()
21+
self.files = []
22+
self.items_per_buffer_write = kwargs['items_per_buffer_write']
23+
self.size_per_buffer_write = kwargs['size_per_buffer_write']
24+
self.hash_algorithm = kwargs.get('hash_algorithm')
25+
self.items_group_files = kwargs['items_group_files_handler']
26+
self.compression_format = kwargs.get('compression_format', 'gz')
27+
self.is_new_buffer = True
28+
29+
def buffer(self, item):
30+
"""
31+
Receive an item and write it.
32+
"""
33+
key = self.get_key_from_item(item)
34+
if not self.grouping_info.is_first_file_item(key):
35+
self.items_group_files.add_item_separator_to_file(key)
36+
self.grouping_info.ensure_group_info(key)
37+
self.items_group_files.add_item_to_file(item, key)
38+
39+
def finish_buffer_write(self, key):
40+
self.items_group_files.end_group_file(key)
41+
42+
def pack_buffer(self, key):
43+
"""Prepare current buffer file for group of given key to be written
44+
(by gathering statistics).
45+
"""
46+
self.finish_buffer_write(key)
47+
file_path = self.items_group_files.get_current_buffer_file_for_group(key).path
48+
file_hash = None
49+
if self.hash_algorithm:
50+
file_hash = hash_for_file(file_path, self.hash_algorithm)
51+
52+
file_size = os.path.getsize(file_path)
53+
write_info = {
54+
'number_of_records': self.grouping_info[key]['buffered_items'],
55+
'file_path': file_path,
56+
'size': file_size,
57+
'file_hash': file_hash,
58+
}
59+
self.set_metadata_for_file(file_path, **write_info)
60+
return write_info
61+
62+
def add_new_buffer_for_group(self, key):
63+
self.items_group_files.create_new_group_file(key)
64+
65+
def clean_tmp_files(self, write_info):
66+
remove_if_exists(write_info.get('path'))
67+
remove_if_exists(write_info.get('file_path'))
68+
69+
def should_write_buffer(self, key):
70+
if self.size_per_buffer_write and os.path.getsize(
71+
self.grouping_info[key]['group_file'][-1].path) >= self.size_per_buffer_write:
72+
return True
73+
buffered_items = self.grouping_info[key].get('buffered_items', 0)
74+
return buffered_items >= self.items_per_buffer_write
75+
76+
def close(self):
77+
self.items_group_files.close()
78+
79+
def get_key_from_item(self, item):
80+
return tuple(item.group_membership)
81+
82+
@property
83+
def grouping_info(self):
84+
return self.items_group_files.grouping_info
85+
86+
def set_metadata(self, key, value, module='write_buffer'):
87+
super(WriteBuffer, self).set_metadata(key, value, module)
88+
89+
def get_metadata(self, key, module='write_buffer'):
90+
return super(WriteBuffer, self).get_metadata(key, module) or {}
91+
92+
def get_all_metadata(self, module='write_buffer'):
93+
return super(WriteBuffer, self).get_all_metadata(module)
94+
95+
def set_metadata_for_file(self, file_name, **kwargs):
96+
if file_name not in self.get_all_metadata():
97+
self.set_metadata(file_name, kwargs)
98+
else:
99+
self.get_metadata(file_name).update(**kwargs)
100+
101+
def get_metadata_for_file(self, file_name, key):
102+
file_meta = self.get_metadata(file_name)
103+
return file_meta.get(key) if file_meta else None

0 commit comments

Comments
 (0)