-
Notifications
You must be signed in to change notification settings - Fork 108
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
New Progress Bar, Backoff, Batching #165
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks nice! A few questions, mostly for my understanding and curious if we can run the BPP without a progress bar? It's small but has some perf impact in aggregate. Also, seems not useful to report per "worker" when distributing compute.
@@ -2,4 +2,4 @@ | |||
|
|||
PATH=/home/vscode/.cargo/bin:$PATH | |||
cd dolma | |||
source /home/vscode/miniforge3/bin/activate && pip install cmake "maturin[patchelf]>=1.1,<2.0" | |||
source /home/vscode/miniforge3/bin/activate && pip install cmake "maturin>=1.5,<2.0" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🙏
@@ -30,6 +31,8 @@ dependencies = [ | |||
"numpy", | |||
"necessary>=0.4.3", | |||
"charset-normalizer>=3.2.0", | |||
"zstandard>=0.20.0", | |||
"backoff>=2.0.0", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this version required? There's 2 minor versions since this 2.0 release "2.2.1"
def __radd__(self: BPP, other: BPP) -> BPP: | ||
"""Combine two parallel processors into one.""" | ||
return other.__add__(self) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you describe when this is useful?
"""Process multiple files. Naively calls process_single for each file, but can be overridden.""" | ||
for src_path, dst_path, single_kwargs in zip(source_paths, destination_paths, kwargs): | ||
cls.process_single(source_path=src_path, destination_path=dst_path, queue=queue, **single_kwargs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe include an example of overloading this processing method?
cls.get_logger().warning(message) | ||
|
||
@classmethod | ||
def _process_batch_and_save_status( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this actually saving status or just the metadata outcome?
len(all_process_kwargs), | ||
) | ||
# no need to be wasteful with processes: we only need as many cores a the number of batches | ||
num_processes = min(self.num_processes, len(batches)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you have more batches than available procs/cores?
Args: | ||
iterables (Iterable[T]): One or more iterables to group into batches. | ||
batch_size (int): The size of each batch. Defaults to 1. | ||
drop_last (bool): Whether to drop the last batch if it is smaller than `batch_size`. Defaults to False. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What scenario would you want to drop the last batch?
if not hasattr(self, "PROGRESS_BAR_CLS"): | ||
self.PROGRESS_BAR_CLS = BaseProgressBar.from_increment_function(self) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be run without a progress bar at all?
class TestBatching(TestCase): | ||
def test_batching(self): | ||
a = [1, 2, 3, 4, 5] | ||
b = [6, 7, 8, 9, 0] | ||
|
||
output = list(batch_iterator(a, b, batch_size=2)) | ||
self.assertEqual(len(output), 3) | ||
self.assertEqual(output[0], [(1, 2), (6, 7)]) | ||
self.assertEqual(output[1], [(3, 4), (8, 9)]) | ||
self.assertEqual(output[2], [(5,), (0,)]) | ||
|
||
def test_single_batching(self): | ||
a = [1, 2, 3, 4, 5] | ||
|
||
output = list(batch_iterator(a, batch_size=2)) | ||
|
||
self.assertEqual(len(output), 3) | ||
self.assertEqual(output[0], [(1, 2)]) | ||
self.assertEqual(output[1], [(3, 4)]) | ||
self.assertEqual(output[2], [(5,)]) | ||
|
||
def test_longer_batch_than_slice(self): | ||
a = list(range(3)) | ||
b = list(range(3, 6)) | ||
c = list(range(6, 9)) | ||
|
||
output = list(batch_iterator(a, b, c, batch_size=4)) | ||
|
||
self.assertEqual(len(output), 1) | ||
self.assertEqual(output[0], [(0, 1, 2), (3, 4, 5), (6, 7, 8)]) | ||
|
||
def test_drop_last(self): | ||
a = [1, 2, 3, 4, 5] | ||
b = [6, 7, 8, 9, 0] | ||
|
||
output = list(batch_iterator(a, b, batch_size=2, drop_last=True)) | ||
self.assertEqual(len(output), 2) | ||
self.assertEqual(output[0], [(1, 2), (6, 7)]) | ||
self.assertEqual(output[1], [(3, 4), (8, 9)]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
⭐
dolma_tests_skip = os.environ.get(DOLMA_TESTS_SKIP_AWS_ENV_VAR) | ||
LOGGER.info(f"{DOLMA_TESTS_SKIP_AWS_ENV_VAR}: {dolma_tests_skip}") | ||
return (dolma_tests_skip or "false").lower() == "true" | ||
dolma_tests_skip = yaml.safe_load(os.environ.get(DOLMA_TESTS_SKIP_AWS_ENV_VAR) or "false") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
safe_load is duplicative if casting to bool() regardless below right? More a nit than anything, I guess it wouldn't hurt...
This PR adds three nice features to
BaseParallelProcessor
:parallel.py
PoolWithDebug
wrapper aroundmultiprocessing.Pool
that transparently disables multiprocessing when debuggingbackoff
library to implement backoff and retries in case of failure