feat: improve azureai search deleting #14693

Merged · 3 commits · Jul 12, 2024

Changes from all commits
Vector store module (llama-index-vector-stores-azureaisearch):

```diff
@@ -22,6 +22,7 @@
     VectorStoreQuery,
     VectorStoreQueryMode,
     VectorStoreQueryResult,
+    FilterOperator,
 )
 from llama_index.core.vector_stores.utils import (
     legacy_metadata_dict_to_node,
```
```diff
@@ -827,20 +828,26 @@ def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
         """
         # Locate documents to delete
         filter = f'{self._field_mapping["doc_id"]} eq \'{ref_doc_id}\''
-        results = self._search_client.search(search_text="*", filter=filter)
+        batch_size = 1000
 
-        logger.debug(f"Searching with filter {filter}")
-
-        docs_to_delete = []
-        for result in results:
-            doc = {}
-            doc["id"] = result[self._field_mapping["id"]]
-            logger.debug(f"Found document to delete: {doc}")
-            docs_to_delete.append(doc)
-
-        if len(docs_to_delete) > 0:
-            logger.debug(f"Deleting {len(docs_to_delete)} documents")
-            self._search_client.delete_documents(docs_to_delete)
+        while True:
+            results = self._search_client.search(
+                search_text="*",
+                filter=filter,
+                top=batch_size,
+            )
+
+            logger.debug(f"Searching with filter {filter}")
+
+            docs_to_delete = [
+                {"id": result[self._field_mapping["id"]]} for result in results
+            ]
+
+            if docs_to_delete:
+                logger.debug(f"Deleting {len(docs_to_delete)} documents")
+                self._search_client.delete_documents(docs_to_delete)
+            else:
+                break
```
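The rewritten `delete` pages through matches instead of relying on a single query: Azure AI Search returns a bounded page of results per request (50 by default, at most 1000 via `top`), so the old one-shot search could miss documents when a `ref_doc_id` had many chunks. A minimal standalone sketch of the same loop, assuming a configured `azure.search.documents.SearchClient` against an index whose key field is literally named `"id"`; the helper name `delete_matching` is ours, not part of the PR:

```python
from azure.search.documents import SearchClient


def delete_matching(client: SearchClient, odata_filter: str, batch_size: int = 1000) -> None:
    """Delete every document matching `odata_filter`, one page at a time."""
    while True:
        # Wildcard search bounded by `top`, mirroring the PR's loop.
        results = client.search(search_text="*", filter=odata_filter, top=batch_size)
        batch = [{"id": r["id"]} for r in results]
        if not batch:
            break  # the filter no longer matches anything
        client.delete_documents(batch)
```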

```diff
@@ -849,22 +856,126 @@ async def adelete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
         """
         # Locate documents to delete
         filter = f'{self._field_mapping["doc_id"]} eq \'{ref_doc_id}\''
+        batch_size = 1000
+
+        while True:
+            results = await self._async_search_client.search(
+                search_text="*",
+                filter=filter,
+                top=batch_size,
+            )
 
-        results = await self._async_search_client.search(search_text="*", filter=filter)
-
-        logger.debug(f"Searching with filter {filter}")
+            logger.debug(f"Searching with filter {filter}")
 
-        docs_to_delete = []
-
-        for result in results:
-            doc = {}
-            doc["id"] = result[self._field_mapping["id"]]
-            logger.debug(f"Found document to delete: {doc}")
-            docs_to_delete.append(doc)
+            docs_to_delete = [
+                {"id": result[self._field_mapping["id"]]} async for result in results
+            ]
 
-        if len(docs_to_delete) > 0:
-            logger.debug(f"Deleting {len(docs_to_delete)} documents")
-            await self._search_client.delete_documents(docs_to_delete)
+            if docs_to_delete:
+                logger.debug(f"Deleting {len(docs_to_delete)} documents")
+                await self._async_search_client.delete_documents(docs_to_delete)
+            else:
+                break
```
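Beyond batching, the async variant fixes a client mix-up: the old code awaited `delete_documents` on the sync `self._search_client`, while the new loop consistently uses `self._async_search_client` (and consumes results with an async comprehension, since the async client yields an async iterator). A hedged usage sketch; `vector_store` is an assumed, already-constructed `AzureAISearchVectorStore`, and the document id is hypothetical:

```python
import asyncio


async def main() -> None:
    # Removes every indexed chunk whose doc_id equals "source-doc-123".
    await vector_store.adelete("source-doc-123")


asyncio.run(main())
```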
```diff
+    def delete_nodes(
+        self,
+        node_ids: Optional[List[str]] = None,
+        filters: Optional[MetadataFilters] = None,
+        **delete_kwargs: Any,
+    ) -> None:
+        """
+        Delete documents from the AI Search Index.
+        """
+        if node_ids is None and filters is None:
+            raise ValueError("Either node_ids or filters must be provided")
+
+        filter = self._build_filter_delete_query(node_ids, filters)
+
+        batch_size = 1000
+
+        while True:
+            results = self._search_client.search(
+                search_text="*",
+                filter=filter,
+                top=batch_size,
+            )
+
+            logger.debug(f"Searching with filter {filter}")
+
+            docs_to_delete = [
+                {"id": result[self._field_mapping["id"]]} for result in results
+            ]
+
+            if docs_to_delete:
+                logger.debug(f"Deleting {len(docs_to_delete)} documents")
+                self._search_client.delete_documents(docs_to_delete)
+            else:
+                break
```
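`delete_nodes` is new API in this PR: it deletes by explicit node ids, by metadata filters, and raises if neither is given. An illustrative call, assuming the same pre-built `vector_store` and the `llama_index.core.vector_stores.types` import path current at the time of this PR; the id values are made up:

```python
from llama_index.core.vector_stores.types import (
    FilterOperator,
    MetadataFilter,
    MetadataFilters,
)

# Delete two specific nodes by id.
vector_store.delete_nodes(node_ids=["node-1", "node-2"])

# Delete all nodes of two source documents; the doc_id/IN combination is the
# fast path that _build_filter_delete_query compiles to a search.in(...) clause.
doc_filter = MetadataFilters(
    filters=[
        MetadataFilter(
            key="doc_id", value=["doc-a", "doc-b"], operator=FilterOperator.IN
        )
    ]
)
vector_store.delete_nodes(filters=doc_filter)
```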

```diff
+    async def adelete_nodes(
+        self,
+        node_ids: Optional[List[str]] = None,
+        filters: Optional[MetadataFilters] = None,
+        **delete_kwargs: Any,
+    ) -> None:
+        """
+        Delete documents from the AI Search Index.
+        """
+        if node_ids is None and filters is None:
+            raise ValueError("Either node_ids or filters must be provided")
+
+        filter = self._build_filter_delete_query(node_ids, filters)
+
+        batch_size = 1000
+
+        while True:
+            results = await self._async_search_client.search(
+                search_text="*",
+                filter=filter,
+                top=batch_size,
+            )
+
+            logger.debug(f"Searching with filter {filter}")
+
+            docs_to_delete = [
+                {"id": result[self._field_mapping["id"]]} async for result in results
+            ]
+
+            if docs_to_delete:
+                logger.debug(f"Deleting {len(docs_to_delete)} documents")
+                await self._async_search_client.delete_documents(docs_to_delete)
+            else:
+                break
```
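The async twin shares the argument contract, including the guard clause. A small sketch exercising both the error path and a valid call, under the same `vector_store` assumption:

```python
import asyncio


async def cleanup() -> None:
    try:
        await vector_store.adelete_nodes()  # neither node_ids nor filters given
    except ValueError as exc:
        print(exc)  # "Either node_ids or filters must be provided"

    await vector_store.adelete_nodes(node_ids=["node-1", "node-2"])


asyncio.run(cleanup())
```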

```diff
+    def _build_filter_delete_query(
+        self,
+        node_ids: Optional[List[str]] = None,
+        filters: Optional[MetadataFilters] = None,
+    ) -> str:
+        """Build the OData filter query for the deletion process."""
+        if node_ids:
+            return " or ".join(
+                [
+                    f'{self._field_mapping["id"]} eq \'{node_id}\''
+                    for node_id in node_ids
+                ]
+            )
+
+        if filters and filters.filters:
+            # Find the filter with key doc_ids
+            doc_ids_filter = next(
+                (f for f in filters.filters if f.key == "doc_id"), None
+            )
+            if doc_ids_filter and doc_ids_filter.operator == FilterOperator.IN:
+                # use search.in to filter on multiple values
+                doc_ids_str = ",".join(doc_ids_filter.value)
+                return f"search.in({self._field_mapping['doc_id']}, '{doc_ids_str}', ',')"
+
+            return self._create_odata_filter(filters)
+
+        raise ValueError("Invalid filter configuration")
+
     def _create_odata_filter(self, metadata_filters: MetadataFilters) -> str:
         """Generate an OData filter string using supplied metadata filters."""
```
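For concreteness, the two fast paths of `_build_filter_delete_query` produce OData strings like the following, reconstructed by hand under the assumption that the `"id"` and `"doc_id"` logical fields map to same-named index fields; any other filter shape falls through to `_create_odata_filter`:

```python
node_ids = ["n1", "n2"]
by_id = " or ".join(f"id eq '{n}'" for n in node_ids)
print(by_id)  # id eq 'n1' or id eq 'n2'

doc_ids = ["d1", "d2"]
by_doc_id = f"search.in(doc_id, '{','.join(doc_ids)}', ',')"
print(by_doc_id)  # search.in(doc_id, 'd1,d2', ',')
```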
pyproject.toml (llama-index-vector-stores-azureaisearch):

```diff
@@ -28,7 +28,7 @@ exclude = ["**/BUILD"]
 license = "MIT"
 name = "llama-index-vector-stores-azureaisearch"
 readme = "README.md"
-version = "0.1.9"
+version = "0.1.10"
 
 [tool.poetry.dependencies]
 python = ">=3.8.1,<4.0"
```