New Progress Bar, Backoff, Batching #165

Open · soldni wants to merge 16 commits into main
Conversation

@soldni (Member) commented May 23, 2024

This PR adds several improvements to BaseParallelProcessor:

  • Refactors the progress bar out of parallel.py
  • Adds a PoolWithDebug wrapper around multiprocessing.Pool that transparently disables multiprocessing when debugging (see the sketch below)
  • Uses the backoff library to implement backoff and retries in case of failure
  • Adds the ability to create parallel processors that work in batch mode (the tokenizer processor will be changed later to use this new functionality)
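For readers unfamiliar with the first two patterns, here is a minimal illustrative sketch, not the PR's actual code: the class name PoolWithDebug is from this PR, but the interface shown here is assumed, and the backoff decorator usage is the library's standard pattern.

```python
import multiprocessing
from typing import Any, Callable, Iterable, List

import backoff


class PoolWithDebug:
    """Sketch: run work in-process when debug=True so breakpoints work."""

    def __init__(self, processes: int = 1, debug: bool = False):
        self.debug = debug
        self._pool = None if debug else multiprocessing.Pool(processes=processes)

    def map(self, fn: Callable[[Any], Any], iterable: Iterable[Any]) -> List[Any]:
        if self.debug:
            # no subprocesses: pdb, breakpoints, and stack traces behave normally
            return [fn(item) for item in iterable]
        return self._pool.map(fn, iterable)

    def close(self) -> None:
        if self._pool is not None:
            self._pool.close()


# retries with exponential backoff on transient I/O errors; decorator and
# policy (backoff.expo, max_tries) are the backoff library's real API
@backoff.on_exception(backoff.expo, OSError, max_tries=5)
def read_file(path: str) -> bytes:
    with open(path, "rb") as f:
        return f.read()
```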

@soldni soldni requested review from undfined and kyleclo and removed request for undfined May 23, 2024 17:10
@undfined (Contributor) left a comment:

Looks nice! A few questions, mostly for my understanding. Also curious: can we run the BPP without a progress bar? It's small, but it has some perf impact in aggregate. It also seems not useful to report progress per "worker" when distributing compute.

```diff
@@ -2,4 +2,4 @@
 PATH=/home/vscode/.cargo/bin:$PATH
 cd dolma
-source /home/vscode/miniforge3/bin/activate && pip install cmake "maturin[patchelf]>=1.1,<2.0"
+source /home/vscode/miniforge3/bin/activate && pip install cmake "maturin>=1.5,<2.0"
```
Contributor commented:
🙏

```diff
@@ -30,6 +31,8 @@ dependencies = [
     "numpy",
     "necessary>=0.4.3",
     "charset-normalizer>=3.2.0",
+    "zstandard>=0.20.0",
+    "backoff>=2.0.0",
```
Contributor commented:
Is this version pin required? There have been two minor releases since 2.0; the latest is 2.2.1.

Comment on lines +261 to +263
def __radd__(self: BPP, other: BPP) -> BPP:
"""Combine two parallel processors into one."""
return other.__add__(self)
Contributor commented:
Can you describe when this is useful?
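An aside on the Python mechanics, for context: `__radd__` is consulted when the left operand's `__add__` returns NotImplemented, which is what keeps addition working regardless of operand order. A toy illustration with hypothetical classes, not from this PR:

```python
class Meters:
    def __init__(self, n): self.n = n
    def __add__(self, other):
        if isinstance(other, Meters):
            return Meters(self.n + other.n)
        return NotImplemented  # let the right operand try __radd__

class Feet:
    def __init__(self, n): self.n = n
    def __radd__(self, other):
        # reached because Meters.__add__ returned NotImplemented
        return Meters(other.n + self.n * 0.3048)

print((Meters(1) + Feet(10)).n)  # 4.048
```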

Comment on lines +278 to +280
"""Process multiple files. Naively calls process_single for each file, but can be overridden."""
for src_path, dst_path, single_kwargs in zip(source_paths, destination_paths, kwargs):
cls.process_single(source_path=src_path, destination_path=dst_path, queue=queue, **single_kwargs)
Contributor commented:
Maybe include an example of overriding this processing method?
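Something like the following, perhaps: a hypothetical sketch where the method name and signature are inferred from the snippet above and may not match the PR exactly. The idea is to amortize per-batch setup, e.g. loading a tokenizer once per batch instead of once per file.

```python
from queue import Queue
from typing import Any, Dict, List

from dolma.core.parallel import BaseParallelProcessor  # import path assumed


def load_tokenizer():
    """Hypothetical stand-in for an expensive, once-per-batch setup step."""
    return object()


class TokenizerBatchProcessor(BaseParallelProcessor):  # hypothetical subclass
    @classmethod
    def process_batch(
        cls,
        source_paths: List[str],
        destination_paths: List[str],
        queue: "Queue[Any]",
        kwargs: List[Dict[str, Any]],
    ) -> None:
        # do the expensive setup once for the whole batch...
        tokenizer = load_tokenizer()
        # ...then process each file as before, passing the shared resource down
        for src_path, dst_path, single_kwargs in zip(source_paths, destination_paths, kwargs):
            cls.process_single(
                source_path=src_path,
                destination_path=dst_path,
                queue=queue,
                tokenizer=tokenizer,
                **single_kwargs,
            )
```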

cls.get_logger().warning(message)

@classmethod
def _process_batch_and_save_status(
Contributor commented:
Is this actually saving status or just the metadata outcome?

len(all_process_kwargs),
)
# no need to be wasteful with processes: we only need as many cores as the number of batches
num_processes = min(self.num_processes, len(batches))
Contributor commented:
Can you have more batches than available procs/cores?

Args:
iterables (Iterable[T]): One or more iterables to group into batches.
batch_size (int): The size of each batch. Defaults to 1.
drop_last (bool): Whether to drop the last batch if it is smaller than `batch_size`. Defaults to False.
Contributor commented:
In what scenario would you want to drop the last batch?

Comment on lines +190 to +191
if not hasattr(self, "PROGRESS_BAR_CLS"):
self.PROGRESS_BAR_CLS = BaseProgressBar.from_increment_function(self)
Contributor commented:
Can this be run without a progress bar at all?

Comment on lines +89 to +127
class TestBatching(TestCase):
def test_batching(self):
a = [1, 2, 3, 4, 5]
b = [6, 7, 8, 9, 0]

output = list(batch_iterator(a, b, batch_size=2))
self.assertEqual(len(output), 3)
self.assertEqual(output[0], [(1, 2), (6, 7)])
self.assertEqual(output[1], [(3, 4), (8, 9)])
self.assertEqual(output[2], [(5,), (0,)])

def test_single_batching(self):
a = [1, 2, 3, 4, 5]

output = list(batch_iterator(a, batch_size=2))

self.assertEqual(len(output), 3)
self.assertEqual(output[0], [(1, 2)])
self.assertEqual(output[1], [(3, 4)])
self.assertEqual(output[2], [(5,)])

def test_longer_batch_than_slice(self):
a = list(range(3))
b = list(range(3, 6))
c = list(range(6, 9))

output = list(batch_iterator(a, b, c, batch_size=4))

self.assertEqual(len(output), 1)
self.assertEqual(output[0], [(0, 1, 2), (3, 4, 5), (6, 7, 8)])

def test_drop_last(self):
a = [1, 2, 3, 4, 5]
b = [6, 7, 8, 9, 0]

output = list(batch_iterator(a, b, batch_size=2, drop_last=True))
self.assertEqual(len(output), 2)
self.assertEqual(output[0], [(1, 2), (6, 7)])
self.assertEqual(output[1], [(3, 4), (8, 9)])

```diff
-dolma_tests_skip = os.environ.get(DOLMA_TESTS_SKIP_AWS_ENV_VAR)
-LOGGER.info(f"{DOLMA_TESTS_SKIP_AWS_ENV_VAR}: {dolma_tests_skip}")
-return (dolma_tests_skip or "false").lower() == "true"
+dolma_tests_skip = yaml.safe_load(os.environ.get(DOLMA_TESTS_SKIP_AWS_ENV_VAR) or "false")
```
Contributor commented:
safe_load is duplicative if you're casting to bool() regardless below, right? More a nit than anything; I guess it wouldn't hurt...
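For what it's worth, the difference shows up with non-lowercase or YAML-style spellings. A quick illustration, assuming PyYAML's YAML 1.1 boolean resolution:

```python
import yaml

# yaml.safe_load turns YAML boolean spellings into real bools, which the
# old .lower() == "true" comparison would have treated as False:
for raw in ("true", "True", "FALSE", "off", "1"):
    print(repr(raw), "->", repr(yaml.safe_load(raw)))
# 'true' -> True, 'True' -> True, 'FALSE' -> False, 'off' -> False, '1' -> 1
```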
