Skip to content

Commit d377801

Browse files
committed
feat: Parallel loading for each MediaWiki document batch!
1 parent 0948b55 commit d377801

File tree

1 file changed

+22
-4
lines changed

1 file changed

+22
-4
lines changed

hivemind_etl/mediawiki/etl.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import logging
22
import os
33
import shutil
4+
from concurrent.futures import ThreadPoolExecutor, as_completed
45

56
from llama_index.core import Document
67
from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline
@@ -100,10 +101,27 @@ def load(self, documents: list[Document]) -> None:
100101
ingestion_pipeline = CustomIngestionPipeline(
101102
self.community_id, collection_name=self.platform_id
102103
)
103-
# batch documents into chunks of 1000
104-
for i in range(0, len(documents), 1000):
105-
logging.info(f"Loading batch {i} of {len(documents)} documents into Qdrant!")
106-
ingestion_pipeline.run_pipeline(documents[i : i + 1000])
104+
105+
# Process batches in parallel using ThreadPoolExecutor
106+
batch_size = 1000
107+
batches = [documents[i:i + batch_size] for i in range(0, len(documents), batch_size)]
108+
109+
with ThreadPoolExecutor() as executor:
110+
# Submit all batch processing tasks
111+
future_to_batch = {
112+
executor.submit(ingestion_pipeline.run_pipeline, batch): i
113+
for i, batch in enumerate(batches)
114+
}
115+
116+
# Process completed batches and handle any errors
117+
for future in as_completed(future_to_batch):
118+
batch_idx = future_to_batch[future]
119+
try:
120+
future.result() # This will raise any exceptions that occurred
121+
logging.info(f"Successfully loaded batch {batch_idx} of {len(batches)} documents into Qdrant!")
122+
except Exception as e:
123+
logging.error(f"Error processing batch {batch_idx}: {e}")
124+
raise # Re-raise the exception to stop the process
107125

108126
logging.info(f"Loaded {len(documents)} documents into Qdrant!")
109127

0 commit comments

Comments (0)