Commit c4b4493

1. Add support for file bulk indexing
2. Optimize async logic

1 parent 25efc5a

24 files changed: +348 -326 lines

api/routers/knowledge.py

Lines changed: 6 additions & 11 deletions

@@ -30,8 +30,7 @@ class FileInfo(BaseModel):
     file_size: int
     content_type: str
 
-    class Config:
-        from_attributes = True
+    model_config = {"from_attributes": True}
 
 
 class FileListResponse(BaseModel):
@@ -44,7 +43,7 @@ class FileListResponse(BaseModel):
     response_model=str,
     status_code=status.HTTP_201_CREATED,
 )
-def upload_file(
+async def upload_file(
     file: UploadFile,
     user: Annotated[User | None, Depends(get_current_user)],
 ):
@@ -67,7 +66,7 @@ def upload_file(
     try:
         print(f"Uploading file: {file.filename} for owner_id: {user.id}")
         # Convert string UUID to UUID object
-        doc_id = knowledge_handler.upload_file(file, user.id)
+        doc_id = await knowledge_handler.upload_file(file, user.id)
         return doc_id
     except ValueError as e:
         raise HTTPException(
@@ -204,8 +203,6 @@ class IndexTriggerRequest(BaseModel):
 
 class IndexTriggerResponse(BaseModel):
     """Response model for index triggering results"""
-    total_files: int
-    status: str
     message: str
 
 class GraphExportRequest(BaseModel):
@@ -220,7 +217,7 @@ class GraphExportRequest(BaseModel):
     response_model=IndexTriggerResponse,
     status_code=status.HTTP_200_OK,
 )
-def trigger_indexing(
+async def trigger_indexing(
     request: IndexTriggerRequest,
     user: Annotated[User | None, Depends(get_current_user)],
 ):
@@ -247,12 +244,10 @@ def trigger_indexing(
         )
 
     try:
-        result = knowledge_handler.trigger_indexing(request.file_ids, user.id)
+        result = await knowledge_handler.trigger_indexing(request.file_ids, user.id)
 
         return IndexTriggerResponse(
-            total_files=result.get('total_files', 0),
-            status=result.get('status', 'indexing_started'),
-            message=result.get('message', 'Indexing started in background')
+            message=result
         )
 
     except HTTPException:
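
Two changes stand out in this router: the endpoints become `async def` so they can `await` the knowledge handler, and `IndexTriggerResponse` collapses to a single `message` field fed by the string the handler now returns. The `model_config` line is the Pydantic v2 replacement for the v1 nested `class Config`. A minimal sketch of that idiom, assuming pydantic>=2; the `OrmRow` stand-in is hypothetical:

```python
from pydantic import BaseModel


class FileInfo(BaseModel):
    file_size: int
    content_type: str

    # Pydantic v2: a model_config dict replaces the v1 nested `class Config`;
    # from_attributes=True permits validation from attribute-bearing objects.
    model_config = {"from_attributes": True}


class OrmRow:
    """Hypothetical stand-in for an ORM row object."""
    file_size = 1024
    content_type = "text/plain"


info = FileInfo.model_validate(OrmRow())
print(info.file_size, info.content_type)  # -> 1024 text/plain
```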

application/knowledge/module.py

Lines changed: 74 additions & 95 deletions

@@ -12,7 +12,7 @@
 import uuid
 import asyncio
 from fastapi.responses import Response
-from fastapi import File, UploadFile, HTTPException
+from fastapi import UploadFile, HTTPException
 from encapsulation.data_model.orm_models import FileMetadata, FileStatus
 
 class Knowledge(AbstractModule):
@@ -22,10 +22,9 @@ def __init__(self, config: 'KnowledgeConfig'):
         self.file_index = config.index_manager_config.build()
 
         # Semaphore to control concurrent indexing operations
-        max_concurrent_indexing = config.max_concurrent_indexing
-        self.indexing_semaphore = asyncio.Semaphore(max_concurrent_indexing)
+        self.indexing_semaphore = asyncio.Semaphore(config.max_concurrent_indexing)
 
-    def upload_file(self, file: UploadFile, user_id: uuid.UUID) -> str:
+    async def upload_file(self, file: UploadFile, user_id: uuid.UUID) -> str:
         try:
             doc_id = self.file_storage.upload_file(
                 filename=file.filename,
@@ -34,29 +33,17 @@ def upload_file(self, file: UploadFile, user_id: uuid.UUID) -> str:
                 content_type=file.content_type
             )
             # Start indexing in background (fire-and-forget)
-            self._start_background_indexing(doc_id)
+            # execute file indexing without waiting for it to complete
+            task = asyncio.create_task(self._index_file_background(doc_id))
+            # Add error callback to log any unhandled exceptions
+            task.add_done_callback(lambda t: logger.error(f"Background indexing task failed: {t.exception()}") if t.exception() else None)
             logger.info(f"File {file.filename} uploaded with ID {doc_id}, indexing started in background")
             return doc_id
 
         except Exception as e:
             logger.error(e)
             raise
 
-    def _start_background_indexing(self, doc_id: str):
-        """Start background indexing task safely"""
-        try:
-            # Try to get the current event loop
-            loop = asyncio.get_running_loop()
-            # If we're in an async context, create the task
-            loop.create_task(self._index_file_background(doc_id))
-        except RuntimeError:
-            # No event loop running, start a new one in a thread
-            import threading
-            def run_async():
-                asyncio.run(self._index_file_background(doc_id))
-            thread = threading.Thread(target=run_async, daemon=True)
-            thread.start()
-
     async def _index_file_background(self, doc_id: str):
         """Background task for indexing files with semaphore control"""
         async with self.indexing_semaphore:
@@ -178,7 +165,7 @@ def count_user_files(
             logger.error(f"Failed to count files for user {user_id}: {e}")
             raise HTTPException(status_code=500, detail=f"Failed to count files: {str(e)}")
 
-    def trigger_indexing(self, file_ids: List[str], user_id: uuid.UUID) -> Dict[str, Any]:
+    async def trigger_indexing(self, file_ids: List[str], user_id: uuid.UUID) -> str:
         """
         Trigger indexing for multiple files asynchronously.
 
@@ -187,87 +174,79 @@ def trigger_indexing(self, file_ids: List[str], user_id: uuid.UUID) -> Dict[str,
             user_id: UUID of the user requesting the indexing
 
         Returns:
-            Dictionary containing basic info about the triggered indexing
+            String containing basic info about the triggered indexing or error message
         """
-        try:
-            # Validate files and collect valid ones
-            valid_file_ids = []
-            invalid_files = []
-
-            for file_id in file_ids:
-                try:
-                    metadata = self.file_storage.get_file_metadata(file_id)
-                    if metadata is None:
-                        invalid_files.append(f"File not found: {file_id}")
-                        continue
-                    if metadata.owner_id != user_id:
-                        invalid_files.append(f"You are not allowed to access file: {file_id}")
-                        continue
-                    valid_file_ids.append(file_id)
-                except Exception as e:
-                    invalid_files.append(f"Error accessing file {file_id}: {str(e)}")
+        # Validate files and collect those eligible for indexing
+        # Only allow indexing of STORED or FAILED files
+        # Skip files that are INDEXED, or in intermediate states (PARSED, CHUNKED) indicating processing is in progress
+        valid_files = []
+        invalid_files = []
+        skipped_files = []
+
+        for file_id in file_ids:
+            try:
+                metadata = self.file_storage.get_file_metadata(file_id)
+                if not metadata:
+                    invalid_files.append(f"File not found or invalid: {file_id}")
                     continue
-
-            # If no valid files, return error
-            if not valid_file_ids:
-                raise HTTPException(
-                    status_code=400,
-                    detail=f"No valid files to index. Issues: {'; '.join(invalid_files)}"
-                )
-
-            logger.info(f"Triggering async indexing for {len(valid_file_ids)} valid files (out of {len(file_ids)} requested) for user {user_id}")
-
-            # Start background indexing task for valid files only
-            self._start_background_indexing_multiple_files(valid_file_ids, user_id)
-
-            # Return immediately with basic info
-            message = f"Indexing started for {len(valid_file_ids)} files in background"
+                if metadata.owner_id != user_id:
+                    invalid_files.append(f"You are not authorized to operate on this file: {file_id}")
+                    continue
+
+                # Only allow indexing for STORED or FAILED files
+                # Skip files that are already indexed or in intermediate processing states
+                if metadata.status == FileStatus.STORED or metadata.status == FileStatus.FAILED:
+                    valid_files.append(file_id)
+                else:
+                    skipped_files.append(file_id)
+            except Exception as e:
+                invalid_files.append(file_id)
+                logger.exception(f"Error accessing file {file_id}")
+                continue
+
+        # If all files are invalid or already indexed/in progress, directly return
+        if not valid_files:
+            message_parts = []
             if invalid_files:
-                message += f". Skipped {len(invalid_files)} invalid files: {'; '.join(invalid_files)}"
-
-            return {
-                "total_files": len(file_ids), # Return original count for consistency with test expectations
-                "status": "indexing_started",
-                "message": message
-            }
-
-        except HTTPException:
-            # Re-raise HTTP exceptions (400, 403)
-            raise
-        except Exception as e:
-            logger.error(f"Failed to trigger indexing for user {user_id}: {e}")
-            raise HTTPException(status_code=500, detail=f"Failed to trigger indexing: {str(e)}")
+                message_parts.append(f"Invalid files: {'; '.join(invalid_files)}")
+            if skipped_files:
+                message_parts.append(f"Skipped files (already indexed or in progress): {'; '.join(skipped_files)}")
+            message_parts.append("No files scheduled for indexing.")
+            return "\n".join(message_parts)
 
-    def _start_background_indexing_multiple_files(self, file_ids: List[str], user_id: uuid.UUID):
-        """Start background indexing task for multiple files safely"""
-        try:
-            # Try to get the current event loop
-            loop = asyncio.get_running_loop()
-            # If we're in an async context, create the task
-            loop.create_task(self._index_multiple_files_background(file_ids, user_id))
-        except RuntimeError:
-            # No event loop running, start a new one in a thread
-            import threading
-            def run_async():
-                asyncio.run(self._index_multiple_files_background(file_ids, user_id))
-            thread = threading.Thread(target=run_async, daemon=True)
-            thread.start()
+        logger.info(
+            f"Triggering indexing for files: {'; '.join(valid_files)}"
+        )
+
+        # Start background indexing task for files not indexed yet only
+        await self._index_multiple_files_background(valid_files, user_id)
+
+        # Return immediately with basic info
+        message_parts = [
+            f"Indexing started for files: {'; '.join(valid_files)}"
+        ]
+        if skipped_files:
+            message_parts.append(f"Skipped files (already indexed or in progress): {'; '.join(skipped_files)}")
+        if invalid_files:
+            message_parts.append(f"Invalid files: {'; '.join(invalid_files)}")
+
+        return "\n".join(message_parts)
 
     async def _index_multiple_files_background(self, file_ids: List[str], user_id: uuid.UUID):
         """Background task for indexing multiple files with semaphore control"""
-        async with self.indexing_semaphore:
-            try:
-                logger.info(f"Starting background indexing for {len(file_ids)} files for user {user_id} (semaphore acquired)")
-
-                # Use the IndexManager's process_multiple_files method
-                result = self.file_index.process_multiple_files(file_ids)
-
-                logger.info(f"Background indexing completed for user {user_id}: {result.get('successful_files', 0)} successful, {result.get('failed_files', 0)} failed out of {result.get('total_files', 0)} files")
-
-            except Exception as e:
-                logger.error(f"Background indexing failed for user {user_id}: {str(e)}")
-            finally:
-                logger.debug(f"Background indexing semaphore released for user {user_id} ({len(file_ids)} files)")
+
+        try:
+            logger.info(f"Starting background indexing for {len(file_ids)} files for user {user_id} (semaphore acquired)")
+
+            # Use the IndexManager's process_multiple_files method
+            result = await self.file_index.process_multiple_files(file_ids)
+
+            logger.info(f"Background indexing completed for user {user_id}: {result.get('successful_files', 0)} successful, {result.get('failed_files', 0)} failed out of {result.get('total_files', 0)} files")
+
+        except Exception as e:
+            logger.error(f"Background indexing failed for user {user_id}: {str(e)}")
+        finally:
+            logger.debug(f"Background indexing semaphore released for user {user_id} ({len(file_ids)} files)")
 
     def get_indexing_status(self) -> Dict[str, Any]:
         """

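This is the heart of the "optimize async logic" half of the commit: the event-loop-probing `_start_background_indexing*` helpers, which fell back to spawning daemon threads, are deleted, and `upload_file` schedules indexing with `asyncio.create_task` plus a done-callback. That is safe now because the caller is itself a coroutine running on FastAPI's event loop. A self-contained sketch of the fire-and-forget shape; the names are illustrative, not the module's real API:

```python
import asyncio
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("indexing-demo")

# Cap concurrent indexing jobs, as the module does with max_concurrent_indexing.
indexing_semaphore = asyncio.Semaphore(2)


async def index_file(doc_id: str) -> None:
    async with indexing_semaphore:
        await asyncio.sleep(0.1)  # placeholder for real indexing work
        logger.info("indexed %s", doc_id)


def log_failure(task: asyncio.Task) -> None:
    # Reading exception() also marks it retrieved, silencing the
    # "exception was never retrieved" warning on failed tasks.
    if not task.cancelled() and task.exception():
        logger.error("Background indexing task failed: %s", task.exception())


async def upload(doc_id: str) -> str:
    # Fire-and-forget: schedule the coroutine and return immediately.
    task = asyncio.create_task(index_file(doc_id))
    task.add_done_callback(log_failure)
    return doc_id


async def main() -> None:
    ids = await asyncio.gather(*(upload(f"doc-{i}") for i in range(5)))
    print("uploaded:", ids)
    await asyncio.sleep(0.5)  # demo only: let background tasks drain


asyncio.run(main())
```

One caveat the asyncio docs call out: `create_task` keeps no strong reference to the task, so long-running services usually also stash tasks in a set until they complete; a done-callback alone does not prevent garbage collection.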
config/core/file_management/parser_combinator_config.py

Lines changed: 2 additions & 0 deletions

@@ -50,5 +50,7 @@ class ParserCombinatorConfig(AbstractConfig):
     # Native parser for office documents and text files (optional)
     native_parser: Optional[NativeParserConfig] = None
 
+    concurrent_num: int = 20
+
     def build(self) -> ParserCombinator:
         return ParserCombinator(self)

config/json_configs/knowledge.json

Lines changed: 1 addition & 0 deletions

@@ -1,4 +1,5 @@
 {
+    "max_concurrent_indexing": 20,
     "file_storage_config": {
         "type": "file_storage",
         "file_db_config": {

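Both new knobs, `concurrent_num` in the parser config and `max_concurrent_indexing` here, are plain integers that end up sizing `asyncio.Semaphore` objects. A hedged sketch of how such a JSON value can flow into the limit; the loader below is illustrative and does not reproduce the repo's `AbstractConfig` machinery:

```python
import asyncio
import json
from dataclasses import dataclass


@dataclass
class KnowledgeConfigSketch:
    # Default mirrors the value added to knowledge.json above.
    max_concurrent_indexing: int = 20


def load_config(path: str) -> KnowledgeConfigSketch:
    with open(path) as f:
        raw = json.load(f)
    return KnowledgeConfigSketch(
        max_concurrent_indexing=raw.get("max_concurrent_indexing", 20)
    )


cfg = load_config("config/json_configs/knowledge.json")
# The knob sizes the semaphore, as Knowledge.__init__ does in the diff above.
semaphore = asyncio.Semaphore(cfg.max_concurrent_indexing)
```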
core/file_management/extractor/base.py

Lines changed: 15 additions & 33 deletions

@@ -18,6 +18,7 @@ class ExtractorBase(AbstractModule):
     def __init__(self, config):
         super().__init__(config)
         self.llm = config.llm_config.build()
+        self.extraction_semaphore = asyncio.Semaphore(config.max_concurrent)
 
     @abstractmethod
     async def extract(self, chunk: Chunk) -> GraphData:
@@ -33,45 +34,26 @@ async def extract(self, chunk: Chunk) -> GraphData:
 
     async def process_chunk(self, chunk: Chunk) -> Chunk:
         """process a single chunk"""
-        try:
-            graph_data = await self.extract(chunk)
-            chunk.graph = graph_data
-            return chunk
-        except Exception as e:
-            logger.error(f"Error processing chunk {chunk.id}: {e}", exc_info=True)
-            chunk.graph = GraphData() # return empty graph data
-            return chunk
+        async with self.extraction_semaphore:
+            try:
+                graph_data = await self.extract(chunk)
+                chunk.graph = graph_data
+                return chunk
+            except Exception as e:
+                logger.error(f"Error processing chunk {chunk.id}: {e}", exc_info=True)
+                chunk.graph = GraphData() # return empty graph data
+                return chunk
 
     async def extract_concurrent(self, chunks: List[Chunk]) -> List[Chunk]:
         """extract from multiple chunks concurrently"""
         if not chunks:
             return []
 
-        semaphore = asyncio.Semaphore(self.config.max_concurrent)
         logger.info(f"Starting concurrent extraction with max_concurrent={self.config.max_concurrent}")
 
-        async def process_with_semaphore(chunk: Chunk) -> Chunk:
-            async with semaphore:
-                return await self.process_chunk(chunk)
+        # process_chunk handles all exceptions internally, so we don't need return_exceptions=True
+        tasks = [self.process_chunk(chunk) for chunk in chunks]
+        return await asyncio.gather(*tasks)
 
-        return await asyncio.gather(*[process_with_semaphore(chunk) for chunk in chunks])
-
-    def __call__(self, chunks: List[Chunk]) -> List[Chunk]:
-        """sync interface that handles both sync and async contexts"""
-        try:
-            # Check if we're in an async context
-            asyncio.get_running_loop()
-            # If we're already in an event loop, create a new thread with its own event loop
-            import concurrent.futures
-
-            # Create a new thread with its own event loop for concurrent processing
-            def run_in_thread():
-                return asyncio.run(self.extract_concurrent(chunks))
-
-            with concurrent.futures.ThreadPoolExecutor() as executor:
-                future = executor.submit(run_in_thread)
-                return future.result()
-
-        except RuntimeError:
-            # No event loop running, safe to use asyncio.run
-            return asyncio.run(self.extract_concurrent(chunks))
+    async def __call__(self, chunks: List[Chunk]) -> List[Chunk]:
+        return await self.extract_concurrent(chunks)
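
The extractor refactor moves throttling off a local semaphore in `extract_concurrent` and onto the instance, acquired inside `process_chunk`, so every path that processes a chunk is rate-limited; and because `process_chunk` catches its own exceptions, `asyncio.gather` can run without `return_exceptions=True`. A self-contained sketch of that shape, with simplified stand-ins for `Chunk`, `GraphData`, and `extract`:

```python
import asyncio
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Chunk:
    id: int
    text: str
    graph: Optional[dict] = None  # simplified stand-in for GraphData


class Extractor:
    def __init__(self, max_concurrent: int = 20):
        # Instance-level semaphore: every process_chunk call is throttled.
        self.extraction_semaphore = asyncio.Semaphore(max_concurrent)

    async def extract(self, chunk: Chunk) -> dict:
        await asyncio.sleep(0.01)  # placeholder for an LLM call
        return {"entities": [chunk.text]}

    async def process_chunk(self, chunk: Chunk) -> Chunk:
        async with self.extraction_semaphore:
            try:
                chunk.graph = await self.extract(chunk)
            except Exception:
                chunk.graph = {}  # never raise: failed chunks get an empty graph
            return chunk

    async def __call__(self, chunks: List[Chunk]) -> List[Chunk]:
        # process_chunk handles its own errors, so no return_exceptions=True.
        return await asyncio.gather(*(self.process_chunk(c) for c in chunks))


chunks = [Chunk(i, f"chunk-{i}") for i in range(5)]
results = asyncio.run(Extractor(max_concurrent=2)(chunks))
print([c.graph for c in results])
```

Note the behavioral shift this implies: `__call__` is now a coroutine, so callers of the extractor must themselves be async, which lines up with the awaited `process_multiple_files` call in the knowledge module.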
