chore: parse in threadpool instead of on event loop #802
base: main
Conversation
Pull Request Overview
This PR offloads document parsing to a thread pool to improve concurrency across vectorizers by making parsing methods asynchronous and running CPU-bound work off the event loop.
- Changed the parsing.parse interface to async and awaited it in the embedding generator.
- Introduced a global ThreadPoolExecutor and updated parse_doc implementations to use run_in_executor.
- Deferred parsing imports and wrapped blocking logic in synchronous helper methods.
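Roughly, the pattern described above looks like the following minimal sketch. The class and helper names are simplified placeholders, not the exact pgai code:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any

# Shared pool so parsing runs off the event loop (the real module-level
# executor in parsing.py is named _PARSING_EXECUTOR).
_PARSING_EXECUTOR = ThreadPoolExecutor(max_workers=4, thread_name_prefix="parsing")


class DocumentParser:
    async def parse(self, config: dict[str, Any], payload: str) -> str:
        # Offload the CPU-bound work to the thread pool and await the result.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(_PARSING_EXECUTOR, self._parse_sync, payload)

    def _parse_sync(self, payload: str) -> str:
        # Synchronous helper holding the blocking parsing logic
        # (stand-in for the deferred-import parse_doc implementations).
        return payload.strip()
```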
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
File | Description
---|---
projects/pgai/pgai/vectorizer/vectorizer/vectorizer.py | Await the now-async parsing.parse call in _generate_embeddings.
projects/pgai/pgai/vectorizer/parsing.py | Converted parse methods to async, added _PARSING_EXECUTOR, and offloaded blocking work to threads.
Comments suppressed due to low confidence (2)
projects/pgai/pgai/vectorizer/parsing.py:20
- Please add or update unit tests to cover the new async parse methods and validate that the blocking logic is correctly executed in the thread pool.
async def parse(self, _1: dict[str, Any], payload: str | LoadedDocument) -> str:
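One way to cover this would be a pytest-asyncio test along these lines. It builds on the illustrative DocumentParser sketch above, so the class and helper names are placeholders rather than the real pgai parser classes:

```python
import threading

import pytest


class _RecordingParser(DocumentParser):  # DocumentParser from the sketch above
    def _parse_sync(self, payload: str) -> str:
        # Record which thread actually executes the blocking work.
        self.worker_thread = threading.get_ident()
        return super()._parse_sync(payload)


@pytest.mark.asyncio
async def test_parse_offloads_to_thread_pool():
    parser = _RecordingParser()

    result = await parser.parse({}, "  raw document text  ")

    assert result == "raw document text"
    # The blocking helper must not run on the event-loop thread.
    assert parser.worker_thread != threading.get_ident()
```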
projects/pgai/pgai/vectorizer/vectorizer/vectorizer.py:1191
- Since parse is now async, confirm all other callers of parsing.parse have been updated to await this method, and update any related documentation to reflect this breaking change.
payload = await self.vectorizer.config.parsing.parse(item, payload)
from pgai.vectorizer.loading import LoadedDocument

# Thread pool for CPU-intensive parsing operations
_PARSING_EXECUTOR = ThreadPoolExecutor(max_workers=4, thread_name_prefix="parsing")
[nitpick] Hardcoding max_workers=4 may not scale across environments; consider making this configurable or using os.cpu_count() to align with available cores.

Suggested change:
max_workers = int(os.getenv("PARSING_MAX_WORKERS", os.cpu_count() or 4))
_PARSING_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="parsing")
That's actually a good suggestion. Good bot.
The os.cpu_count() suggestion doesn't make a lot of sense since Python is single-threaded; this just configures how many documents can be parsed in parallel. But I'll make it configurable.
If the parsing is CPU-bound, why not use a ProcessPoolExecutor instead?
NIT: Python is not single-threaded; it uses a lock (the GIL) to keep threads from executing in parallel. If you have IO-bound tasks (reading files, network requests), the GIL is released, so threads do give concurrency benefits.
Okay, so more precisely: Python won't utilize more than one core to execute that parsing, so I think the argument still stands. The cloud lambda also runs with only one core, so multiprocessing instead of threading is just additional overhead. I think.
The cloud lambda is configured with 2560 MiB of RAM, which according to this Stack Overflow post should correspond to two cores.
5e54e3b to a997ed6 (compare)
# Thread pool for CPU-intensive parsing operations
max_workers = int(os.getenv("PARSING_MAX_WORKERS", 4))
_PARSING_EXECUTOR = ThreadPoolExecutor(
I mentioned here, but why not use a ProcessPoolExecutor?
This doesn't make use of multiple CPU cores, but it does allow other vectorizers to continue running concurrently. If we push down e.g. processing time, it would also allow us to stop somewhat gracefully (although you can't really kill a running future).
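For comparison, switching to a process pool would mostly mean swapping the executor passed to run_in_executor. A hedged sketch, assuming the parsing helper and payload are picklable (which the real pgai code may not guarantee), with placeholder names:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

# Sketch only: a process pool sidesteps the GIL for CPU-bound parsing, but
# arguments and return values must be picklable, and worker processes add
# memory and startup overhead on a small Lambda-sized host.
_PARSING_EXECUTOR = ProcessPoolExecutor(max_workers=2)


def _parse_sync(payload: str) -> str:
    # Module-level function so it can be pickled and sent to a worker process.
    return payload.strip()


async def parse(payload: str) -> str:
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_PARSING_EXECUTOR, _parse_sync, payload)
```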