
Conversation

@Askir (Contributor) commented Jun 10, 2025

This doesn't make use of multiple CPU cores, but it does allow other vectorizers to continue running concurrently. If we also push down e.g. a processing time limit, it would let us stop somewhat gracefully (although you can't really kill a running future).

@Askir Askir temporarily deployed to internal-contributors June 10, 2025 15:25 — with GitHub Actions Inactive
@Askir Askir marked this pull request as ready for review June 10, 2025 16:08
@Askir Askir requested a review from a team as a code owner June 10, 2025 16:08
@smoya smoya requested a review from Copilot June 11, 2025 09:52
Copilot AI left a comment
Pull Request Overview

This PR offloads document parsing to a thread pool to improve concurrency across vectorizers by making parsing methods asynchronous and running CPU-bound work off the event loop.

  • Changed the parsing.parse interface to async and awaited it in the embedding generator.
  • Introduced a global ThreadPoolExecutor and updated parse_doc implementations to use run_in_executor.
  • Deferred parsing imports and wrapped blocking logic in synchronous helper methods (a sketch of the pattern follows below).
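
A minimal sketch of the pattern the overview describes, assuming a module-level pool and a synchronous helper; the class and helper names here are illustrative, not necessarily the PR's:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any

# Shared module-level pool (the _PARSING_EXECUTOR name appears in the diff below).
_PARSING_EXECUTOR = ThreadPoolExecutor(max_workers=4, thread_name_prefix="parsing")

class ParsingAuto:
    def _parse_sync(self, payload: str) -> str:
        # CPU-bound parsing work runs here, off the event loop.
        return payload.strip()

    async def parse(self, _1: dict[str, Any], payload: str) -> str:
        loop = asyncio.get_running_loop()
        # Offload the blocking helper to the pool so other vectorizers
        # keep making progress while this document is parsed.
        return await loop.run_in_executor(_PARSING_EXECUTOR, self._parse_sync, payload)
```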

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

File | Description
---- | -----------
projects/pgai/pgai/vectorizer/vectorizer.py | Await the now-async parsing.parse call in _generate_embeddings.
projects/pgai/pgai/vectorizer/parsing.py | Converted parse methods to async, added _PARSING_EXECUTOR, and offloaded blocking work to threads.
Comments suppressed due to low confidence (2)

projects/pgai/pgai/vectorizer/parsing.py:20

  • Please add or update unit tests to cover the new async parse methods and validate that blocking logic is correctly executed in the thread pool (a hedged test sketch follows below).
```python
async def parse(self, _1: dict[str, Any], payload: str | LoadedDocument) -> str:
```
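
A hedged sketch of such a test, assuming pytest-asyncio is available; it probes the shared pool rather than a specific parse method:

```python
import asyncio
import threading

import pytest

from pgai.vectorizer.parsing import _PARSING_EXECUTOR

@pytest.mark.asyncio
async def test_work_runs_on_parsing_thread():
    # Work submitted through the shared pool should run on a thread
    # carrying the "parsing" name prefix configured on the executor.
    loop = asyncio.get_running_loop()
    name = await loop.run_in_executor(
        _PARSING_EXECUTOR, lambda: threading.current_thread().name
    )
    assert name.startswith("parsing")
```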

projects/pgai/pgai/vectorizer/vectorizer.py:1191

  • Since parse is now async, confirm all other callers of parsing.parse have been updated to await this method and update any related documentation to reflect this breaking change.
```python
payload = await self.vectorizer.config.parsing.parse(item, payload)
```

```python
from pgai.vectorizer.loading import LoadedDocument

# Thread pool for CPU-intensive parsing operations
_PARSING_EXECUTOR = ThreadPoolExecutor(max_workers=4, thread_name_prefix="parsing")
```
Copilot AI commented Jun 11, 2025

[nitpick] Hardcoding max_workers=4 may not scale across environments; consider making this configurable or using os.cpu_count() to align with available cores.

Suggested change:

```diff
-_PARSING_EXECUTOR = ThreadPoolExecutor(max_workers=4, thread_name_prefix="parsing")
+max_workers = int(os.getenv("PARSING_MAX_WORKERS", os.cpu_count() or 4))
+_PARSING_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="parsing")
```


@smoya (Contributor) commented Jun 11, 2025

That's actually a good suggestion. Good bot.

@Askir (Contributor, Author) replied:

The os.cpu_count() doesn't make a lot of sense since Python is single-threaded. This just configures how many documents can be parsed in parallel. But I'll make it configurable.

@alejandrodnm (Contributor) commented Jun 18, 2025

If the parsing is CPU-bound, why not use a ProcessPoolExecutor instead?
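
For reference, a minimal sketch of the ProcessPoolExecutor variant being asked about (names are illustrative; the sync helper must be a top-level function because process pools pickle their work items):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

_PARSING_EXECUTOR = ProcessPoolExecutor(max_workers=4)

def _parse_sync(payload: str) -> str:
    # Top-level on purpose: the callable and its arguments are pickled
    # and shipped to a worker process.
    return payload.strip()

async def parse(payload: str) -> str:
    loop = asyncio.get_running_loop()
    # Runs in a separate process, so CPU-bound work can use another core.
    return await loop.run_in_executor(_PARSING_EXECUTOR, _parse_sync, payload)
```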

A contributor commented:

NIT: Python is not single-threaded; it uses a lock (the GIL) to keep threads from executing in parallel. If you have IO-bound tasks (reading files, network requests), the GIL is released, so threading gives concurrency benefits.
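
A small illustration of that point (hypothetical, not from the PR): blocking IO releases the GIL, so the waits overlap even though only one thread executes Python bytecode at a time:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_io(_: int) -> None:
    # time.sleep releases the GIL, like a blocking read or network call.
    time.sleep(0.5)

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(fake_io, range(4)))
# Prints roughly 0.5s rather than 2s, because the four sleeps overlap.
print(f"elapsed: {time.perf_counter() - start:.2f}s")
```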

@Askir (Contributor, Author) replied:

Okay, so more precisely: Python won't utilize more than one core to execute that parsing, so I think the argument still stands. The cloud lambda also runs with only one core, so multiprocessing instead of threading is just additional overhead. I think.

A member replied:

The cloud lambda is configured with 2560 MiB of RAM, which according to this S/O post should correspond to two cores.

@Askir Askir temporarily deployed to internal-contributors June 17, 2025 15:47 — with GitHub Actions Inactive
@Askir Askir force-pushed the jascha/use-threadpool-executor branch from 5e54e3b to a997ed6 Compare June 17, 2025 16:04
@Askir Askir temporarily deployed to internal-contributors June 17, 2025 16:04 — with GitHub Actions Inactive
@Askir Askir requested a review from smoya June 17, 2025 16:13

```python
# Thread pool for CPU-intensive parsing operations
max_workers = int(os.getenv("PARSING_MAX_WORKERS", 4))
_PARSING_EXECUTOR = ThreadPoolExecutor(
    max_workers=max_workers, thread_name_prefix="parsing"
)
```
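
A hedged usage note: because the pool is created at import time, the variable has to be set before pgai.vectorizer.parsing is first imported, e.g.:

```python
import os

# Must happen before the first import of pgai.vectorizer.parsing,
# since _PARSING_EXECUTOR is created when the module is imported.
os.environ["PARSING_MAX_WORKERS"] = "8"

from pgai.vectorizer import parsing  # pool now uses 8 worker threads
```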
@alejandrodnm (Contributor) commented:

I mentioned here, but why not use a ProcessPoolExecutor?
