Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions context_chat_backend/chain/ingest/injest.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def _process_sources(
'len(filtered_sources)': len(filtered_sources),
'filtered_sources': filtered_sources,
})
loaded_source_ids = [source.filename for source in existing_sources]

# update userIds for existing sources
# allow the userIds as additional users, not as the only users
Expand All @@ -138,23 +139,26 @@ def _process_sources(
if len(filtered_sources) == 0:
# no new sources to embed
logger.debug('Filtered all sources, nothing to embed')
return [], []
return loaded_source_ids, [] # pyright: ignore[reportReturnType]

logger.debug('Filtered sources:', extra={
'source_ids': [source.filename for source in filtered_sources]
})
# invalid/empty sources are filtered out here and not counted in loaded/retryable
indocuments = _sources_to_indocuments(config, filtered_sources)

logger.debug('Converted all sources to documents')

if len(indocuments) == 0:
# document(s) were empty, not an error
# filtered document(s) were invalid/empty, not an error
logger.debug('All documents were found empty after being processed')
return [], []
return loaded_source_ids, [] # pyright: ignore[reportReturnType]

added_sources, not_added_sources = vectordb.add_indocuments(indocuments)
added_source_ids, retry_source_ids = vectordb.add_indocuments(indocuments)
loaded_source_ids.extend(added_source_ids)
logger.debug('Added documents to vectordb')
return added_sources, not_added_sources

return loaded_source_ids, retry_source_ids # pyright: ignore[reportReturnType]


def _decode_latin_1(s: str) -> str:
Expand Down
11 changes: 6 additions & 5 deletions context_chat_backend/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ def _(sources: list[UploadFile]):
_indexing[source.filename] = source.size

try:
added_sources, not_added_sources = exec_in_proc(
loaded_sources, not_added_sources = exec_in_proc(
target=embed_sources,
args=(vectordb_loader, app.extra['CONFIG'], sources)
)
Expand All @@ -388,13 +388,14 @@ def _(sources: list[UploadFile]):
_indexing.pop(source.filename, None)
doc_parse_semaphore.release()

if len(added_sources) != len(sources):
if len(loaded_sources) != len(sources):
logger.debug('Some sources were not loaded', extra={
'Count of newly loaded sources': f'{len(added_sources)}/{len(sources)}',
'source_ids': added_sources,
'Count of loaded sources': f'{len(loaded_sources)}/{len(sources)}',
'source_ids': loaded_sources,
})

return JSONResponse({'loaded_sources': added_sources, 'sources_to_retry': not_added_sources})
# loaded sources include the existing sources that may only have their access updated
return JSONResponse({'loaded_sources': loaded_sources, 'sources_to_retry': not_added_sources})


class Query(BaseModel):
Expand Down
11 changes: 8 additions & 3 deletions context_chat_backend/vectordb/pgvector.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def get_users(self) -> list[str]:

def add_indocuments(self, indocuments: list[InDocument]) -> tuple[list[str], list[str]]:
added_sources = []
not_added_sources = []
retry_sources = []
batch_size = PG_BATCH_SIZE // 5

with self.session_maker() as session:
Expand All @@ -155,24 +155,29 @@ def add_indocuments(self, indocuments: list[InDocument]) -> tuple[list[str], lis

self.decl_update_access(indoc.userIds, indoc.source_id, session)
added_sources.append(indoc.source_id)
session.commit()
except SafeDbException as e:
# for when the source_id is not found. This here can be an error in the DB
# and the source should be retried later
logger.exception('Error adding documents to vectordb', exc_info=e, extra={
'source_id': indoc.source_id,
})
retry_sources.append(indoc.source_id)
continue
except EmbeddingException as e:
logger.exception('Error adding documents to vectordb', exc_info=e, extra={
'source_id': indoc.source_id,
})
not_added_sources.append(indoc.source_id)
retry_sources.append(indoc.source_id)
continue
except Exception as e:
logger.exception('Error adding documents to vectordb', exc_info=e, extra={
'source_id': indoc.source_id,
})
retry_sources.append(indoc.source_id)
continue

return added_sources, not_added_sources
return added_sources, retry_sources

@timed
def check_sources(self, sources: list[UploadFile]) -> tuple[list[str], list[str]]:
Expand Down
Loading