feat: Removed Hard Dependency on Unstructured.io #123
Changes from all commits
First changed file (document upload routes):

@@ -7,7 +7,7 @@
 from app.schemas import DocumentsCreate, DocumentUpdate, DocumentRead
 from app.users import current_active_user
 from app.utils.check_ownership import check_ownership
-from app.tasks.background_tasks import add_received_markdown_file_document, add_extension_received_document, add_received_file_document, add_crawled_url_document, add_youtube_video_document
+from app.tasks.background_tasks import add_received_markdown_file_document, add_extension_received_document, add_received_file_document_using_unstructured, add_crawled_url_document, add_youtube_video_document, add_received_file_document_using_llamacloud
 from app.config import config as app_config
 # Force asyncio to use standard event loop before unstructured imports
 import asyncio

@@ -101,8 +101,7 @@ async def create_documents(
             content = await file.read()
             with open(temp_path, "wb") as f:
                 f.write(content)

             # Process in background to avoid uvloop conflicts
             fastapi_background_tasks.add_task(
                 process_file_in_background_with_new_session,
                 temp_path,

@@ -191,36 +190,74 @@ async def process_file_in_background(
                 search_space_id
             )
         else:
-            # Use synchronous unstructured API to avoid event loop issues
-            from langchain_unstructured import UnstructuredLoader
-
-            # Process the file
-            loader = UnstructuredLoader(
-                file_path,
-                mode="elements",
-                post_processors=[],
-                languages=["eng"],
-                include_orig_elements=False,
-                include_metadata=False,
-                strategy="auto",
-            )
+            if app_config.ETL_SERVICE == "UNSTRUCTURED":
+                from langchain_unstructured import UnstructuredLoader
Comment on lines +194 to +195

🛠️ Refactor suggestion

Move imports to module level. Importing modules inside functions is an anti-pattern that affects performance and readability. Move all imports to the top of the file. Add these imports at the module level:

from langchain_unstructured import UnstructuredLoader
from llama_cloud_services import LlamaParse
from llama_cloud_services.parse.utils import ResultType

Then remove the redundant in-function imports.

Also applies to: 210-210, 224-225, 241-241

🪛 Pylint (3.3.7): [convention] 194-194: Import outside toplevel (langchain_unstructured.UnstructuredLoader) (C0415); [convention] 195-195: Trailing whitespace (C0303)
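For reference, one way to act on this suggestion without reintroducing a hard dependency on either backend is to guard the module-level imports on the configured ETL service. This is only a sketch, not code from the PR; it reuses the `app_config.ETL_SERVICE` values and import paths that appear elsewhere in this diff:

```python
# Sketch only: imports are at module level, but conditional on the configured
# backend, so the unselected ETL library does not need to be installed at all.
from app.config import config as app_config

if app_config.ETL_SERVICE == "UNSTRUCTURED":
    from langchain_unstructured import UnstructuredLoader
elif app_config.ETL_SERVICE == "LLAMACLOUD":
    from llama_cloud_services import LlamaParse
    from llama_cloud_services.parse.utils import ResultType
```

Plain top-of-file imports, as the comment suggests, are simpler but would require both packages to be importable, which cuts against the PR's goal of making Unstructured optional.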
+                # Process the file
+                loader = UnstructuredLoader(
+                    file_path,
+                    mode="elements",
+                    post_processors=[],
+                    languages=["eng"],
+                    include_orig_elements=False,
+                    include_metadata=False,
+                    strategy="auto",
+                )
+
-            docs = await loader.aload()
+                docs = await loader.aload()
+
-            # Clean up the temp file
-            import os
-            try:
-                os.unlink(file_path)
-            except:
-                pass
-
-            # Pass the documents to the existing background task
-            await add_received_file_document(
-                session,
-                filename,
-                docs,
-                search_space_id
-            )
+                # Clean up the temp file
+                import os
+                try:
+                    os.unlink(file_path)
+                except:
+                    pass
Comment on lines +213 to +214

Replace bare except with specific exception handling. A bare except silences every exception, including ones that should surface; catch only the errors expected when deleting the temp file.

-                except:
-                    pass
+                except (OSError, IOError):
+                    pass  # File might already be deleted

Also applies to: 244-245

🪛 Ruff (0.11.9): 211-214: Use contextlib.suppress(OSError) instead of try-except-pass (SIM105); 213-213: Do not use bare except (E722)
🪛 Pylint (3.3.7): [warning] 213-214: No exception type(s) specified (W0702)
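Ruff's SIM105 finding points at an alternative to the suggested exception tuple: wrap the cleanup in contextlib.suppress. A minimal sketch follows (the helper name is made up for illustration). Note that IOError is an alias of OSError in Python 3, so suppressing OSError covers the reviewer's (OSError, IOError) pair:

```python
import contextlib
import os


def _cleanup_temp_file(file_path: str) -> None:
    """Remove a temporary file, ignoring the case where it is already gone."""
    # Equivalent to try/except OSError/pass, but states the intent explicitly.
    with contextlib.suppress(OSError):
        os.unlink(file_path)
```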
+                # Pass the documents to the existing background task
+                await add_received_file_document_using_unstructured(
+                    session,
+                    filename,
+                    docs,
+                    search_space_id
+                )
+            elif app_config.ETL_SERVICE == "LLAMACLOUD":
+                from llama_cloud_services import LlamaParse
+                from llama_cloud_services.parse.utils import ResultType
+
+                # Create LlamaParse parser instance
+                parser = LlamaParse(
+                    api_key=app_config.LLAMA_CLOUD_API_KEY,
+                    num_workers=1,  # Use single worker for file processing
+                    verbose=True,
+                    language="en",
+                    result_type=ResultType.MD
+                )
+
+                # Parse the file asynchronously
+                result = await parser.aparse(file_path)
+
+                # Clean up the temp file
+                import os
+                try:
+                    os.unlink(file_path)
+                except:
+                    pass
+
+                # Get markdown documents from the result
+                markdown_documents = await result.aget_markdown_documents(split_by_page=False)
+
+                for doc in markdown_documents:
+                    # Extract text content from the markdown documents
+                    markdown_content = doc.text
+
+                    # Process the documents using our LlamaCloud background task
+                    await add_received_file_document_using_llamacloud(
+                        session,
+                        filename,
+                        llamacloud_markdown_document=markdown_content,
+                        search_space_id=search_space_id
+                    )
     except Exception as e:
         import logging
         logging.error(f"Error processing file in background: {str(e)}")

@@ -442,3 +479,5 @@ async def process_youtube_video_with_new_session(
     except Exception as e:
         import logging
         logging.error(f"Error processing YouTube video: {str(e)}")
+
+
Second changed file (app/tasks/background_tasks.py):

@@ -268,7 +268,6 @@ async def add_received_markdown_file_document(
             document_type=DocumentType.FILE,
             document_metadata={
                 "FILE_NAME": file_name,
-                "SAVED_AT": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
             },
             content=summary_content,
             embedding=summary_embedding,

@@ -289,7 +288,7 @@ async def add_received_markdown_file_document(
         raise RuntimeError(f"Failed to process file document: {str(e)}")


-async def add_received_file_document(
+async def add_received_file_document_using_unstructured(
     session: AsyncSession,
     file_name: str,
     unstructured_processed_elements: List[LangChainDocument],

@@ -336,7 +335,7 @@ async def add_received_file_document(
             document_type=DocumentType.FILE,
             document_metadata={
                 "FILE_NAME": file_name,
-                "SAVED_AT": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
+                "ETL_SERVICE": "UNSTRUCTURED",
             },
             content=summary_content,
             embedding=summary_embedding,

@@ -357,6 +356,83 @@
         raise RuntimeError(f"Failed to process file document: {str(e)}")


+async def add_received_file_document_using_llamacloud(
+    session: AsyncSession,
+    file_name: str,
+    llamacloud_markdown_document: str,
+    search_space_id: int,
+) -> Optional[Document]:
+    """
+    Process and store document content parsed by LlamaCloud.
+
+    Args:
+        session: Database session
+        file_name: Name of the processed file
+        llamacloud_markdown_documents: List of markdown content from LlamaCloud parsing
+        search_space_id: ID of the search space
Comment on lines +371 to +372

Fix docstring parameter description. The docstring incorrectly describes the parameter as a list when it's actually a single string.

-        llamacloud_markdown_documents: List of markdown content from LlamaCloud parsing
+        llamacloud_markdown_document: Markdown content from LlamaCloud parsing
+
+    Returns:
+        Document object if successful, None if failed
+    """
+    try:
+        # Combine all markdown documents into one
+        file_in_markdown = llamacloud_markdown_document
+
+        content_hash = generate_content_hash(file_in_markdown)
+
+        # Check if document with this content hash already exists
+        existing_doc_result = await session.execute(
+            select(Document).where(Document.content_hash == content_hash)
+        )
+        existing_document = existing_doc_result.scalars().first()
+
+        if existing_document:
+            logging.info(f"Document with content hash {content_hash} already exists. Skipping processing.")
+            return existing_document
+
+        # Generate summary
+        summary_chain = SUMMARY_PROMPT_TEMPLATE | config.long_context_llm_instance
+        summary_result = await summary_chain.ainvoke({"document": file_in_markdown})
+        summary_content = summary_result.content
+        summary_embedding = config.embedding_model_instance.embed(summary_content)
+
+        # Process chunks
+        chunks = [
+            Chunk(
+                content=chunk.text,
+                embedding=config.embedding_model_instance.embed(chunk.text),
+            )
+            for chunk in config.chunker_instance.chunk(file_in_markdown)
+        ]
+
+        # Create and store document
+        document = Document(
+            search_space_id=search_space_id,
+            title=file_name,
+            document_type=DocumentType.FILE,
+            document_metadata={
+                "FILE_NAME": file_name,
+                "ETL_SERVICE": "LLAMACLOUD",
+            },
+            content=summary_content,
+            embedding=summary_embedding,
+            chunks=chunks,
+            content_hash=content_hash,
+        )
+
+        session.add(document)
+        await session.commit()
+        await session.refresh(document)
+
+        return document
+    except SQLAlchemyError as db_error:
+        await session.rollback()
+        raise db_error
+    except Exception as e:
+        await session.rollback()
+        raise RuntimeError(f"Failed to process file document using LlamaCloud: {str(e)}")
Use explicit exception chaining. When re-raising inside an except block, add "from e" so the original traceback is preserved.

-        raise RuntimeError(f"Failed to process file document using LlamaCloud: {str(e)}")
+        raise RuntimeError(f"Failed to process file document using LlamaCloud: {str(e)}") from e

🪛 Ruff (0.11.9): 434-434: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling (B904)
🪛 Pylint (3.3.7): [warning] 434-434: Consider explicitly re-raising using 'raise RuntimeError(f'Failed to process file document using LlamaCloud: {str(e)}') from e' (W0707)
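A self-contained illustration of what "from e" buys (the function below is hypothetical, not from this PR): the original error is attached as __cause__, so tracebacks read "The above exception was the direct cause of the following exception" instead of the more confusing "During handling of the above exception, another exception occurred".

```python
# Illustrative only: wrap a low-level error in a domain-specific one while
# keeping the causal link via "from e".
def parse_upload(raw: bytes) -> str:
    try:
        return raw.decode("utf-8")
    except UnicodeDecodeError as e:
        # Sets RuntimeError.__cause__ to the original UnicodeDecodeError.
        raise RuntimeError("Failed to decode uploaded file") from e


try:
    parse_upload(b"\xff\xfe\x00bad")
except RuntimeError as err:
    assert isinstance(err.__cause__, UnicodeDecodeError)
```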
+
+
 async def add_youtube_video_document(
     session: AsyncSession, url: str, search_space_id: int
 ):
🛠️ Refactor suggestion

Add validation and error handling for ETL service configuration.

The conditional loading logic is correct, but consider adding validation to ensure robustness:

 # ETL Service
 ETL_SERVICE = os.getenv("ETL_SERVICE")
-if ETL_SERVICE == "UNSTRUCTURED":
+if ETL_SERVICE == "UNSTRUCTURED":
     # Unstructured API Key
     UNSTRUCTURED_API_KEY = os.getenv("UNSTRUCTURED_API_KEY")
-
-elif ETL_SERVICE == "LLAMACLOUD":
+    if not UNSTRUCTURED_API_KEY:
+        raise ValueError("UNSTRUCTURED_API_KEY is required when ETL_SERVICE is set to 'UNSTRUCTURED'")
+
+elif ETL_SERVICE == "LLAMACLOUD":
     # LlamaCloud API Key
     LLAMA_CLOUD_API_KEY = os.getenv("LLAMA_CLOUD_API_KEY")
-
-
+    if not LLAMA_CLOUD_API_KEY:
+        raise ValueError("LLAMA_CLOUD_API_KEY is required when ETL_SERVICE is set to 'LLAMACLOUD'")
+
+elif ETL_SERVICE is not None:
+    raise ValueError(f"Invalid ETL_SERVICE value: '{ETL_SERVICE}'. Must be 'UNSTRUCTURED' or 'LLAMACLOUD'")
+else:
+    raise ValueError("ETL_SERVICE environment variable is required")

🪛 Pylint (3.3.7): [convention] 101-101: Trailing whitespace (C0303); [convention] 105-105: Trailing whitespace (C0303); [convention] 109-109: Trailing whitespace (C0303)
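A compact variant of the same fail-fast idea, sketched with a lookup table; the names ALLOWED_ETL_SERVICES, API_KEY_ENV_BY_SERVICE, and ETL_API_KEY are illustrative and do not come from the PR or the repo's config module:

```python
import os

ALLOWED_ETL_SERVICES = {"UNSTRUCTURED", "LLAMACLOUD"}
API_KEY_ENV_BY_SERVICE = {
    "UNSTRUCTURED": "UNSTRUCTURED_API_KEY",
    "LLAMACLOUD": "LLAMA_CLOUD_API_KEY",
}

ETL_SERVICE = os.getenv("ETL_SERVICE")
if ETL_SERVICE not in ALLOWED_ETL_SERVICES:
    raise ValueError(
        f"Invalid ETL_SERVICE value: {ETL_SERVICE!r}. "
        f"Must be one of {sorted(ALLOWED_ETL_SERVICES)}"
    )

# Look up the API key required by the selected backend and fail fast if missing.
_key_env = API_KEY_ENV_BY_SERVICE[ETL_SERVICE]
ETL_API_KEY = os.getenv(_key_env)
if not ETL_API_KEY:
    raise ValueError(f"{_key_env} is required when ETL_SERVICE={ETL_SERVICE}")
```

Keeping the service-to-env-var mapping in one table means a third ETL backend later only touches that table rather than another if/elif branch.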