Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from weaviate.collections import Collection
from weaviate.collections.classes.batch import ErrorReference
from weaviate.collections.classes.config import CollectionConfig, CollectionConfigSimple
from weaviate.collections.classes.filters import _Filters
from weaviate.collections.classes.internal import (
Object,
QueryReturnType,
Expand Down Expand Up @@ -191,6 +192,74 @@ def get_collection(self, name: str) -> Collection:
client = self.conn
return client.collections.get(name)

def delete_by_property(
    self,
    *,
    collection_names: list[str] | str,
    filter_criteria: _Filters,
    if_error: str | None = None,
    dry_run: bool = False,
    verbose: bool = False,
) -> list[str] | None:
    """
    Delete objects in one or more collections that match a provided Filter object.

    The maximum number of objects that can be deleted at once should be set through the
    environment variable `QUERY_MAXIMUM_RESULTS`.

    Collections are processed one after another. For each collection the delete request is
    attempted up to three times when the failure is either judged retryable by
    `check_http_error_is_retryable` or is one of `REQUESTS_EXCEPTIONS_TYPES`.

    :param collection_names: The name(s) of the collection(s) to delete from. A single name
        may be passed as a plain string.
    :param filter_criteria: A `Filter` object defining the filter criteria for deletion.
    :param if_error: Defines the action taken when deleting from a collection fails. With
        `continue` the error is logged, the collection is recorded as failed and processing
        moves on to the next collection. With any other value (including the default `None`)
        the error is re-raised immediately.
    :param dry_run: Use 'dry_run' to check how many objects would be deleted, without actually
        performing the deletion. The response of the dry run is logged.
    :param verbose: Set output to 'verbose' to see more details (ID and deletion status) for
        each deletion.
    :return: If `if_error="continue"`, returns the list of failed collection names (empty when
        nothing failed). Else, returns None.
    :raises Exception: Whatever error the deletion raised, when `if_error` is not `continue`.

    Example:
        >>> from weaviate.classes.query import Filter
        >>> my_filter = (
        ...     Filter.by_property("round").equal("Double Jeopardy!") &
        ...     Filter.by_property("points").less_than(600)
        ... )
        >>> hook.delete_by_property(
        ...     collection_names=["collection_a", "collection_b"],
        ...     filter_criteria=my_filter,
        ...     if_error="continue",
        ... )
    """
    # Accept a bare string for convenience and normalise it to a one-element list.
    if isinstance(collection_names, str):
        collection_names = [collection_names]

    failed_collection_list: list[str] = []
    for collection_name in collection_names:
        try:
            self.log.info("Attempting to delete objects from '%s'", collection_name)

            for attempt in Retrying(
                stop=stop_after_attempt(3),
                retry=(
                    retry_if_exception(check_http_error_is_retryable)
                    | retry_if_exception_type(REQUESTS_EXCEPTIONS_TYPES)
                ),
            ):
                with attempt:
                    self.log.info(attempt)
                    collection = self.get_collection(collection_name)
                    delete_many_return = collection.data.delete_many(
                        where=filter_criteria, verbose=verbose, dry_run=dry_run
                    )
                    if dry_run:
                        self.log.info(delete_many_return)
        except Exception as e:
            # Capture generic exception to avoid missing any error, but we could anticipate the
            # following errors:
            # 1. weaviate.exceptions.UnexpectedStatusCodeException
            # 2. weaviate.exceptions.WeaviateDeleteManyError
            if if_error != "continue":
                # Bare ``raise`` re-raises the active exception with its original traceback.
                raise
            self.log.error("Failed to delete objects from '%s': %s", collection_name, e)
            failed_collection_list.append(collection_name)

    return failed_collection_list if if_error == "continue" else None

def delete_collections(
self, collection_names: list[str] | str, if_error: str = "stop"
) -> list[str] | None:
Expand Down
69 changes: 69 additions & 0 deletions providers/weaviate/tests/unit/weaviate/hooks/test_weaviate.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,75 @@ def test_batch_data_retry(weaviate_hook):
)


@mock.patch("airflow.providers.weaviate.hooks.weaviate.WeaviateHook.get_conn")
def test_delete_by_property_retry(get_conn, weaviate_hook):
    """Check that ``delete_by_property`` keeps calling ``delete_many`` until it succeeds."""
    from weaviate.classes.query import Filter

    # An HTTP 429 error; the final assertion expects the hook to retry after it.
    throttled_response = requests.Response()
    throttled_response.status_code = 429
    throttled_error = requests.exceptions.HTTPError()
    throttled_error.response = throttled_response

    # Two failures followed by one successful call.
    outcomes = [throttled_error, throttled_error, None]

    fake_collection = MagicMock()
    weaviate_hook.get_collection = MagicMock(return_value=fake_collection)
    get_conn.return_value.collections.get.return_value = fake_collection
    fake_collection.data.delete_many.side_effect = outcomes

    name_filter = Filter.by_property("name").equal("John")
    weaviate_hook.delete_by_property(
        collection_names="collection_a",
        filter_criteria=name_filter,
        if_error="continue",
    )

    # One call per configured outcome: every failure was followed by another attempt.
    assert fake_collection.data.delete_many.call_count == len(outcomes)


@mock.patch("airflow.providers.weaviate.hooks.weaviate.WeaviateHook.get_collection")
def test_delete_by_property_get_exception(mock_get_collection, weaviate_hook):
    """Check both error modes of ``delete_by_property``: collect failures, or raise."""
    from weaviate.classes.query import Filter
    from weaviate.exceptions import WeaviateDeleteManyError

    name_filter = Filter.by_property("name").equal("John")

    # collection_b: the delete succeeds.
    healthy_collection = MagicMock()
    healthy_collection.data.delete_many.return_value = None

    # collection_c: the collection is found, but the delete itself fails.
    broken_collection = MagicMock()
    broken_collection.data.delete_many.return_value = None
    broken_collection.data.delete_many.side_effect = WeaviateDeleteManyError(
        "A delete many request to Weaviate fails in any way"
    )

    # collection_a: looking up the collection fails before any delete is attempted.
    lookup_failure = weaviate.UnexpectedStatusCodeException("something failed", requests.Response())
    mock_get_collection.side_effect = [lookup_failure, healthy_collection, broken_collection]

    # With if_error='continue' the failing collections are reported instead of raised.
    targets = ["collection_a", "collection_b", "collection_c"]
    failed = weaviate_hook.delete_by_property(
        collection_names=targets,
        filter_criteria=name_filter,
        if_error="continue",
    )
    assert failed == ["collection_a", "collection_c"]

    # With any other if_error value the error is raised to the caller.
    mock_get_collection.reset_mock()
    mock_get_collection.side_effect = weaviate.UnexpectedStatusCodeException(
        "something failed", requests.Response()
    )
    with pytest.raises(weaviate.UnexpectedStatusCodeException):
        weaviate_hook.delete_by_property(
            collection_names="collection_a",
            filter_criteria=name_filter,
            if_error="stop",
        )


@mock.patch("airflow.providers.weaviate.hooks.weaviate.WeaviateHook.get_conn")
def test_delete_collections(get_conn, weaviate_hook):
collection_names = ["collection_a", "collection_b"]
Expand Down