feat: add batch delete documents support to API, SDK, CLI, and tests #151

Open · wants to merge 3 commits into main
40 changes: 38 additions & 2 deletions core/api.py
@@ -35,6 +35,7 @@
from core.models.prompts import validate_prompt_overrides_with_http_exception
from core.models.request import (
AgentQueryRequest,
BatchDeleteRequest,
BatchIngestResponse,
CompletionQueryRequest,
CreateGraphRequest,
@@ -1058,6 +1059,42 @@ async def delete_document(document_id: str, auth: AuthContext = Depends(verify_token)):
raise HTTPException(status_code=403, detail=str(e))


MAX_BATCH_DELETE = 100


@app.post("/documents/batch_delete")
@telemetry.track(operation_type="batch_delete_documents", metadata_resolver=telemetry.batch_document_delete_metadata)
async def batch_delete_documents(request: BatchDeleteRequest, auth: AuthContext = Depends(verify_token)):
"""
Batch delete documents by their IDs.

Args:
request: BatchDeleteRequest containing the list of document IDs to delete.
auth: Authentication context.

Returns:
Dict: status plus deleted/failed counts and the corresponding document IDs
"""
document_ids = request.document_ids
if not document_ids:
raise HTTPException(status_code=400, detail="No document IDs provided for deletion")
if len(document_ids) > MAX_BATCH_DELETE:
raise HTTPException(status_code=400, detail=f"Batch size exceeds maximum limit of {MAX_BATCH_DELETE}")

try:
    deleted, failed = await document_service.delete_documents(document_ids, auth)
    return {
        "status": "partial_success" if failed else "success",
        "deleted_count": len(deleted),
        "failed_count": len(failed),
        "deleted_ids": deleted,
        "failed_ids": failed,
}
except Exception as e:
logger.error(f"Batch deletion failed: {e}")
raise HTTPException(status_code=500, detail="Batch deletion failed")


@app.get("/documents/filename/{filename}", response_model=Document)
async def get_document_by_filename(
filename: str,
@@ -2050,8 +2087,7 @@ async def set_folder_rule(
except Exception as rule_apply_error:
last_error = rule_apply_error
logger.warning(
f"Metadata extraction attempt {retry_count + 1} failed: "
f"{rule_apply_error}"
f"Metadata extraction attempt {retry_count + 1} failed: {rule_apply_error}"
)
if retry_count == max_retries - 1: # Last attempt
logger.error(f"All {max_retries} metadata extraction attempts failed")
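For context, the new endpoint can be exercised directly over HTTP. A minimal sketch of the request/response shape, assuming a local server; `MORPHIK_URL` and `TOKEN` are placeholders, not part of this PR:

import requests

MORPHIK_URL = "http://localhost:8000"  # placeholder base URL
TOKEN = "my-jwt"  # placeholder bearer token

resp = requests.post(
    f"{MORPHIK_URL}/documents/batch_delete",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={"document_ids": ["doc_abc", "doc_missing"]},
)
print(resp.json())
# With one bad ID the handler above reports a partial success, e.g.:
# {"status": "partial_success", "deleted_count": 1, "failed_count": 1,
#  "deleted_ids": ["doc_abc"], "failed_ids": ["doc_missing"]}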
9 changes: 9 additions & 0 deletions core/models/request.py
@@ -143,3 +143,12 @@ class AgentQueryRequest(BaseModel):
"""Request model for agent queries"""

query: str = Field(..., description="Natural language query for the Morphik agent")


class BatchDeleteRequest(BaseModel):
"""Request model for delete batch documents"""

document_ids: List[str] = Field(
...,
description="List of document IDs to be deleted. Must be a list of strings.",
)
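As a quick illustration of what the model accepts and rejects (standard Pydantic validation; this snippet is not part of the diff):

from pydantic import ValidationError

from core.models.request import BatchDeleteRequest

req = BatchDeleteRequest(document_ids=["doc_1", "doc_2"])
assert req.document_ids == ["doc_1", "doc_2"]

try:
    BatchDeleteRequest(document_ids="doc_1")  # a bare string is rejected: the field must be a list of strings
except ValidationError as err:
    print(err)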
35 changes: 32 additions & 3 deletions core/services/document_service.py
@@ -244,7 +244,7 @@ async def retrieve_chunks(
chunks = await self.reranker.rerank(query, chunks)
chunks.sort(key=lambda x: x.score, reverse=True)
chunks = chunks[:k]
logger.debug(f"Reranked {k*10} chunks and selected the top {k}")
logger.debug(f"Reranked {k * 10} chunks and selected the top {k}")

# Combine multiple chunk sources if needed
chunks = await self._combine_multi_and_regular_chunks(
@@ -852,7 +852,7 @@ async def ingest_file_content(
if folder_name:
try:
await self._ensure_folder_exists(folder_name, doc.external_id, auth)
logger.debug(f"Ensured folder '{folder_name}' exists " f"and contains document {doc.external_id}")
logger.debug(f"Ensured folder '{folder_name}' exists and contains document {doc.external_id}")
except Exception as e:
logger.error(
f"Error during _ensure_folder_exists for doc {doc.external_id}"
@@ -1210,7 +1210,7 @@ async def store_document_with_retry():
current_retry_delay *= 2
else:
logger.error(
f"All database connection attempts failed " f"after {max_retries} retries: {error_msg}"
f"All database connection attempts failed after {max_retries} retries: {error_msg}"
)
raise Exception("Failed to store document metadata after multiple retries")
else:
@@ -2036,6 +2036,35 @@ def _update_metadata_and_version(

history.append(entry)

async def delete_documents(
self,
document_ids: List[str],
auth: AuthContext,
) -> tuple[list[str], list[str]]:
"""
Batch delete documents and their associated data.

Args:
document_ids: List of document IDs to delete.
auth: Authentication context.

Returns:
tuple: A tuple containing two lists:
- List of successfully deleted document IDs.
- List of failed document IDs.
"""
deleted = []
failed = []
for doc_id in document_ids:
    try:
        was_deleted = await self.delete_document(doc_id, auth)
        if was_deleted:
            deleted.append(doc_id)
        else:
            failed.append(doc_id)
    except Exception as e:
        logger.warning(f"Failed to delete document {doc_id}: {e}")
        failed.append(doc_id)
return deleted, failed

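The loop above deletes sequentially, which keeps per-document error isolation simple. If throughput ever becomes a concern, a bounded-concurrency variant would be straightforward; a sketch (the helper name and semaphore limit are illustrative, not part of this PR):

import asyncio
from typing import List, Tuple

async def delete_documents_concurrently(service, document_ids: List[str], auth, limit: int = 10) -> Tuple[List[str], List[str]]:
    # Delete documents in parallel, at most `limit` in flight at a time.
    semaphore = asyncio.Semaphore(limit)

    async def _delete_one(doc_id: str) -> Tuple[str, bool]:
        async with semaphore:
            try:
                return doc_id, bool(await service.delete_document(doc_id, auth))
            except Exception:
                return doc_id, False

    results = await asyncio.gather(*(_delete_one(d) for d in document_ids))
    deleted = [doc_id for doc_id, ok in results if ok]
    failed = [doc_id for doc_id, ok in results if not ok]
    return deleted, failed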
# ------------------------------------------------------------------
# Helper – choose bucket per app (isolation)
# ------------------------------------------------------------------
14 changes: 13 additions & 1 deletion core/services/telemetry.py
@@ -295,7 +295,7 @@ def export(self, spans):
# Use exponential backoff
delay = self.retry_delay * (2 ** (retries - 1))
self.logger.warning(
f"Honeycomb trace export attempt {retries} failed: {str(e)}. " f"Retrying in {delay}s..."
f"Honeycomb trace export attempt {retries} failed: {str(e)}. Retrying in {delay}s..."
)
time.sleep(delay)
else:
@@ -809,6 +809,18 @@ def _setup_metadata_extractors(self):
]
)

self.batch_document_delete_metadata = MetadataExtractor(
[
MetadataField(
"document_count",
"request",
transform=lambda req: len(req.document_ids) if req and hasattr(req, "document_ids") else 0,
),
MetadataField("folder_name", "request"),
MetadataField("end_user_id", "request"),
]
)

def track(self, operation_type: Optional[str] = None, metadata_resolver: Optional[Callable] = None):
"""
Decorator for tracking API operations with telemetry.
5 changes: 5 additions & 0 deletions sdks/python/morphik/async_.py
@@ -2540,3 +2540,8 @@ async def wait_for_graph_completion(
raise RuntimeError(graph.error or "Graph processing failed")
await asyncio.sleep(check_interval_seconds)
raise TimeoutError("Timed out waiting for graph completion")

async def batch_delete_documents(self, document_ids: List[str]) -> tuple[list[str], list[str]]:
    """Delete multiple documents by their IDs (async). Returns (deleted_ids, failed_ids)."""
    response = await self._request("POST", "documents/batch_delete", data={"document_ids": document_ids})
    return response["deleted_ids"], response["failed_ids"]
5 changes: 5 additions & 0 deletions sdks/python/morphik/sync.py
@@ -2735,3 +2735,8 @@ def wait_for_graph_completion(
raise RuntimeError(graph.error or "Graph processing failed")
time.sleep(check_interval_seconds)
raise TimeoutError("Timed out waiting for graph completion")

def batch_delete_documents(self, document_ids: List[str]) -> tuple[list[str], list[str]]:
    """Delete multiple documents by their IDs. Returns (deleted_ids, failed_ids)."""
    response = self._request("POST", "documents/batch_delete", data={"document_ids": document_ids})
    return response["deleted_ids"], response["failed_ids"]
39 changes: 39 additions & 0 deletions sdks/python/morphik/tests/test_async.py
@@ -382,3 +382,42 @@ async def test_query_with_dict_schema(self, db):

finally:
await db.delete_document(doc.external_id)

@pytest.mark.asyncio
async def test_batch_delete_documents(self, db):
"""Test batch deleting multiple documents"""
# Given
doc1 = await db.ingest_text(
content="This is batch delete document 1",
filename=f"batch_del_1_{uuid.uuid4().hex[:6]}.txt",
metadata={"test_case": "batch_delete"},
)
doc2 = await db.ingest_text(
content="This is batch delete document 2",
filename=f"batch_del_2_{uuid.uuid4().hex[:6]}.txt",
metadata={"test_case": "batch_delete"},
)

# Then
assert doc1.external_id and doc2.external_id

# When
success, failed = await db.batch_delete_documents([doc1.external_id, doc2.external_id])

# Then
assert len(success) == 2
assert len(failed) == 0

@pytest.mark.asyncio
async def test_batch_delete_with_invalid_id(self, db):
    """Test batch delete with a mix of valid and invalid IDs"""
doc = await db.ingest_text(
content="This is batch delete document 1",
filename=f"batch_del_1_{uuid.uuid4().hex[:6]}.txt",
metadata={"test_case": "batch_delete"},
)

# Include a fake ID
success, failed = await db.batch_delete_documents([doc.external_id, "nonexistent_id_123"])

assert doc.external_id in success
assert "nonexistent_id_123" in failed
38 changes: 38 additions & 0 deletions sdks/python/morphik/tests/test_sync.py
@@ -369,3 +369,41 @@ def test_query_with_dict_schema(self, db):

finally:
db.delete_document(doc.external_id)

def test_batch_delete_documents(self, db):
"""Test batch deleting multiple documents (sync)"""
# Given
doc1 = db.ingest_text(
content="First document to test batch delete",
filename=f"batch_delete_1_{uuid.uuid4().hex[:6]}.txt",
metadata={"test_id": "sync_batch_delete_test"},
)
doc2 = db.ingest_text(
content="Second document to test batch delete",
filename=f"batch_delete_2_{uuid.uuid4().hex[:6]}.txt",
metadata={"test_id": "sync_batch_delete_test"},
)

# Then
assert doc1.external_id is not None
assert doc2.external_id is not None

# When
success, failed = db.batch_delete_documents([doc1.external_id, doc2.external_id])

# Then
assert len(success) == 2
assert len(failed) == 0

def test_batch_delete_with_invalid_id(self, db):
    """Test batch delete with a mix of valid and invalid IDs (sync)"""
doc = db.ingest_text(
content="Valid document",
filename=f"batch_delete_valid_{uuid.uuid4().hex[:6]}.txt",
metadata={"test_id": "partial_batch_delete_test"},
)

# Include a fake ID
success, failed = db.batch_delete_documents([doc.external_id, "nonexistent_id_123"])

assert doc.external_id in success
assert "nonexistent_id_123" in failed
13 changes: 13 additions & 0 deletions shell.py
@@ -811,6 +811,18 @@ def list_graphs(self) -> list:
graphs = self._client.list_graphs()
return [graph.model_dump() for graph in graphs] if graphs else []

def batch_delete_documents(self, document_ids: List[str]) -> tuple:
"""
Delete multiple documents in a single batch.

Args:
document_ids: List of document external_ids to delete

Returns:
tuple: (deleted_ids, failed_ids) as returned by the client
"""
return self._client.batch_delete_documents(document_ids)

def close(self):
"""Close the client connection"""
self._client.close()
@@ -899,6 +911,7 @@ def query(self, query: str, max_tokens: int = None, temperature: float = None) -
print(" db.create_graph('knowledge_graph', filters={'category': 'research'})")
print(" db.query('How does X relate to Y?', graph_name='knowledge_graph', include_paths=True)")
print("Type help(db) for documentation.")
print(" db.batch_delete_documents(['doc_id1', 'doc_id2'])")

# Start the shell
shell.interact(banner="")