Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Vector Database Service to allow stages to read and write to VDBs #1225

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
5d7b3cb
Added milvus vdb prototype impl
bsuryadevara Sep 26, 2023
4807f3d
Added milvus vdb prototype impl
bsuryadevara Sep 26, 2023
b1f94fb
Added llamaindex and langchain prototypes
bsuryadevara Sep 27, 2023
d912645
doc updates
bsuryadevara Sep 27, 2023
4ecd37f
updates to milvus vd service
bsuryadevara Sep 30, 2023
c18125a
updated search and upsert functions
bsuryadevara Oct 2, 2023
a6ef60e
Added write_to_vector_db stage
bsuryadevara Oct 3, 2023
7389542
Added tests to get started
bsuryadevara Oct 3, 2023
3a31cee
Added tests to get started
bsuryadevara Oct 3, 2023
4cfba55
Added MilvusClient extension class to support missing functions
bsuryadevara Oct 4, 2023
b83f517
Added tests for Milvus vector database serivce
bsuryadevara Oct 4, 2023
b7fee57
Added tests for Milvus vector database service
bsuryadevara Oct 4, 2023
cde18b2
Added tests for Milvus vector database service
bsuryadevara Oct 4, 2023
c9316c0
Added milvus lite to pipeline tests
bsuryadevara Oct 9, 2023
36f1f18
Added tests with milvus lite
bsuryadevara Oct 11, 2023
2f24cc2
Updated Milvus VDB tests
bsuryadevara Oct 11, 2023
9670c97
Merge remote-tracking branch 'upstream/branch-23.11' into 1177-fea-ad…
bsuryadevara Oct 11, 2023
e4b8a02
Updated Milvus VDB tests
bsuryadevara Oct 11, 2023
a5e742e
Added tests with milvus lite
bsuryadevara Oct 11, 2023
3d0e01b
Renamed a file
bsuryadevara Oct 11, 2023
cd52a5f
Feedback changes
bsuryadevara Oct 12, 2023
5ce3402
Feedback changes
bsuryadevara Oct 12, 2023
9e6989a
Removed register stage decorator
bsuryadevara Oct 12, 2023
cf327b5
Ignore pymilvus in the docs
bsuryadevara Oct 13, 2023
a6a6f43
Update variable names
bsuryadevara Oct 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Feedback changes
  • Loading branch information
bsuryadevara committed Oct 12, 2023
commit cd52a5f69efe5e36cd974fbc06cf543ff6dc8e8d
17 changes: 12 additions & 5 deletions morpheus/service/milvus_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import typing

import pymilvus
from pymilvus.orm.mutation import MutationResult

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -152,7 +153,7 @@ def release_collection(self, collection_name: str) -> None:
conn.release_collection(collection_name=collection_name)

@handle_exceptions("upsert", "Error upserting collection entities")
def upsert(self, collection_name: str, entities: list, **kwargs: dict[str, typing.Any]) -> typing.Any:
def upsert(self, collection_name: str, entities: list, **kwargs: dict[str, typing.Any]) -> MutationResult:
"""
Upsert entities into a collection.

Expand All @@ -167,14 +168,14 @@ def upsert(self, collection_name: str, entities: list, **kwargs: dict[str, typin

Returns
-------
typing.Any
MutationResult
Result of the upsert operation.
"""
conn = self._get_connection()
return conn.upsert(collection_name=collection_name, entities=entities, **kwargs)

@handle_exceptions("delete_by_expr", "Error deleting collection entities")
def delete_by_expr(self, collection_name: str, expression: str, **kwargs: dict[str, typing.Any]) -> None:
def delete_by_expr(self, collection_name: str, expression: str, **kwargs: dict[str, typing.Any]) -> MutationResult:
"""
Delete entities from a collection using an expression.

Expand All @@ -186,9 +187,14 @@ def delete_by_expr(self, collection_name: str, expression: str, **kwargs: dict[s
Deletion expression.
**kwargs : dict
Additional keyword arguments for the delete operation.

Returns
-------
MutationResult
Returns result of delete operation.
"""
conn = self._get_connection()
conn.delete(collection_name=collection_name, expression=expression, **kwargs)
return conn.delete(collection_name=collection_name, expression=expression, **kwargs)

@handle_exceptions("has_partition", "Error checking partition existence")
def has_partition(self, collection_name: str, partition_name: str) -> bool:
Expand Down Expand Up @@ -223,7 +229,7 @@ def drop_partition(self, collection_name: str, partition_name: str) -> None:
Name of the partition to drop.
"""
conn = self._get_connection()
conn.drop(collection_name=collection_name, partition_name=partition_name)
conn.drop_partition(collection_name=collection_name, partition_name=partition_name)

@handle_exceptions("drop_index", "Error dropping index")
def drop_index(self, collection_name: str, field_name: str, index_name: str) -> None:
Expand Down Expand Up @@ -260,4 +266,5 @@ def get_collection(self, collection_name: str, **kwargs: dict[str, typing.Any])
Returns pymilvus Collection instance.
"""
collection = pymilvus.Collection(name=collection_name, using=self._using, **kwargs)

return collection
173 changes: 81 additions & 92 deletions morpheus/service/milvus_vector_db_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import pandas as pd
import pymilvus
from pymilvus.orm.mutation import MutationResult

import cudf

Expand Down Expand Up @@ -76,53 +77,8 @@ class MilvusVectorDBService(VectorDBService):
"""

_collection_locks = {}
_cleanup_interval = 3600

@classmethod
def get_collection_lock(cls, name: str) -> threading.Lock:
"""
Get a lock fora given collection name.

Parameters
----------
name : str
Name of the collection for which to acquire the lock.

Returns
-------
threading.Lock
A thread lock specific to the given collection name.
"""
if name not in cls._collection_locks:
cls._collection_locks[name] = {"lock": threading.Lock(), "last_used": time.time()}
else:
cls._collection_locks[name]["last_used"] = time.time()
return cls._collection_locks[name]["lock"]

@classmethod
def initiate_cleanup_thread(cls):
"""
Start a cleanup thread for managing collection locks. This method starts a daemon thread that periodically
removes collection locks that have exceeded the cleanup interval.
"""
cls.cleanup_thread = threading.Thread(target=cls.cleanup, daemon=True)
cls.cleanup_thread.start()

@classmethod
def cleanup(cls):
"""
Cleanup thread for managing collection locks. This method runs as a thread and periodically removes
collection locks that have exceeded the cleanup interval. It helps to free up locks associated with
collections that are no longer in use.
"""
while True:
current_time = time.time()
for name, lock_info in cls._collection_locks.copy().items():
last_used = lock_info["last_used"]
if current_time - last_used >= cls._cleanup_interval:
logger.debug("Cleaning up lock for collection: %s", name)
del cls._collection_locks[name]
time.sleep(cls._cleanup_interval)
_cleanup_interval = 600 # 10mins
_last_cleanup_time = time.time()

def __init__(self,
uri: str,
Expand Down Expand Up @@ -240,16 +196,16 @@ def create(self, name: str, overwrite: bool = False, **kwargs: dict[str, typing.
self._client.create_partition(collection_name=name, partition_name=part["name"], timeout=timeout)

@with_collection_lock
def insert(self, name: str, data: typing.Union[list[list], list[dict], dict], **kwargs: dict[str,
typing.Any]) -> dict:
def insert(self, name: str, data: list[list] | list[dict], **kwargs: dict[str,
typing.Any]) -> dict[str, typing.Any]:
"""
Insert a collection specific data in the Milvus vector database.

Parameters
----------
name : str
Name of the collection to be inserted.
data : typing.Union[list[list], list[dict], dict]
data : list[list] | list[dict]
Data to be inserted in the collection.
**kwargs : dict[str, typing.Any]
Additional keyword arguments containing collection configuration.
Expand All @@ -267,10 +223,8 @@ def insert(self, name: str, data: typing.Union[list[list], list[dict], dict], **

return self._collection_insert(name, data, **kwargs)

def _collection_insert(self,
name: str,
data: typing.Union[list[list], list[dict], dict],
**kwargs: dict[str, typing.Any]) -> dict:
def _collection_insert(self, name: str, data: list[list] | list[dict],
**kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]:

if not self.has_store_object(name):
raise RuntimeError(f"Collection {name} doesn't exist.")
Expand Down Expand Up @@ -301,18 +255,16 @@ def _collection_insert(self,
return result_dict

@with_collection_lock
def insert_dataframe(self,
name: str,
df: typing.Union[cudf.DataFrame, pd.DataFrame],
**kwargs: dict[str, typing.Any]) -> dict:
def insert_dataframe(self, name: str, df: cudf.DataFrame | pd.DataFrame,
**kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]:
"""
Converts dataframe to rows and insert to a collection in the Milvus vector database.

Parameters
----------
name : str
Name of the collection to be inserted.
df : typing.Union[cudf.DataFrame, pd.DataFrame]
df : cudf.DataFrame | pd.DataFrame
Dataframe to be inserted in the collection.
**kwargs : dict[str, typing.Any]
Additional keyword arguments containing collection configuration.
Expand All @@ -338,7 +290,7 @@ def insert_dataframe(self,
return self._collection_insert(name, dict_of_rows, **kwargs)

@with_collection_lock
def search(self, name: str, query: typing.Union[str, dict] = None, **kwargs: dict[str, typing.Any]) -> typing.Any:
def search(self, name: str, query: str = None, **kwargs: dict[str, typing.Any]) -> typing.Any:
"""
Search for data in a collection in the Milvus vector database.

Expand All @@ -348,9 +300,8 @@ def search(self, name: str, query: typing.Union[str, dict] = None, **kwargs: dic
----------
name : str
Name of the collection to search within.
query : Union[str, dict], optional
The search query, which can be a JSON-like string or a dictionary,
by default None.
query : str, optional
The search query, which can be a filter expression, by default None.
**kwargs : dict
Additional keyword arguments for the search operation.

Expand Down Expand Up @@ -392,22 +343,22 @@ def search(self, name: str, query: typing.Union[str, dict] = None, **kwargs: dic
self._client.release_collection(collection_name=name)

@with_collection_lock
def update(self, name: str, data: list[dict], **kwargs: dict[str, typing.Any]) -> dict:
def update(self, name: str, data: list[typing.Any], **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]:
"""
Update data in the vector database.

Parameters
----------
name : str
Name of the resource.
data : list[dict]
Data to be updated in the resource.
data : list[typing.Any]
Data to be updated in the collection.
**kwargs : dict[str, typing.Any]
Extra keyword arguments specific to upsert operation.

Returns
-------
dict
dict[str, typing.Any]
Returns result of the updated operation stats.
"""

Expand All @@ -416,28 +367,18 @@ def update(self, name: str, data: list[dict], **kwargs: dict[str, typing.Any]) -

result = self._client.upsert(collection_name=name, entities=data, **kwargs)

result_dict = {
"insert_count": result.insert_count,
"delete_count": result.delete_count,
"upsert_count": result.upsert_count,
"timestamp": result.timestamp,
"succ_count": result.succ_count,
"err_count": result.err_count
}

return result_dict
return self._convert_mutation_result_to_dict(result=result)

@with_collection_lock
def delete_by_keys(self, name: str, keys: typing.Union[int, str, list], **kwargs: dict[str,
typing.Any]) -> typing.Any:
def delete_by_keys(self, name: str, keys: int | str | list, **kwargs: dict[str, typing.Any]) -> typing.Any:
"""
Delete vectors by keys from the resource.

Parameters
----------
name : str
Name of the resource.
keys : typing.Union[int, str, list]
keys : int | str | list
Primary keys to delete vectors.
**kwargs : dict[str, typing.Any]
Extra keyword arguments specific to the vector database implementation.
Expand All @@ -448,51 +389,52 @@ def delete_by_keys(self, name: str, keys: typing.Union[int, str, list], **kwargs
Returns result of the given keys that are delete from the collection.
"""

response = self._client.delete(collection_name=name, pks=keys, **kwargs)
result = self._client.delete(collection_name=name, pks=keys, **kwargs)

return response
return result

@with_collection_lock
def delete(self, name: str, expr: typing.Union[str, dict], **kwargs: dict[str, typing.Any]) -> typing.Any:
def delete(self, name: str, expr: str, **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]:
"""
Delete vectors from the resource using expressions.

Parameters
----------
name : str
Name of the resource.
expr : typing.Union[str, dict]
expr : str
Delete expression.
**kwargs : dict[str, typing.Any]
Extra keyword arguments specific to the vector database implementation.

Returns
-------
typing.Any
dict[str, typing.Any]
Returns result of the given keys that are delete from the collection.
"""

return self._client.delete_by_expr(collection_name=name, expression=expr, **kwargs)
result = self._client.delete_by_expr(collection_name=name, expression=expr, **kwargs)

return self._convert_mutation_result_to_dict(result=result)

@with_collection_lock
def retrieve_by_keys(self, name: str, keys: typing.Union[int, str, list], **kwargs: dict[str,
typing.Any]) -> list[dict]:
def retrieve_by_keys(self, name: str, keys: int | str | list, **kwargs: dict[str, typing.Any]) -> list[typing.Any]:
"""
Retrieve the inserted vectors using their primary keys from the Collection.

Parameters
----------
name : str
Name of the collection.
keys : typing.Union[int, str, list]
keys : int | str | list
Primary keys to get vectors for. Depending on pk_field type it can be int or str
or a list of either.
**kwargs : dict[str, typing.Any]
Additional keyword arguments for the retrieval operation.

Returns
-------
list[dict]
list[typing.Any]
Returns result rows of the given keys from the collection.
"""

Expand Down Expand Up @@ -568,7 +510,9 @@ def drop(self, name: str, **kwargs: dict[str, typing.Any]) -> None:
resource = kwargs.get("resource", "collection")
if resource == "collection":
self._client.drop_collection(collection_name=name)
elif resource == "partition" and "partition_name" in kwargs:
elif resource == "partition":
if "partition_name" not in kwargs:
raise ValueError("Mandatory argument 'partition_name' is required when resource='partition'")
partition_name = kwargs["partition_name"]
if self._client.has_partition(collection_name=name, partition_name=partition_name):
self._client.drop_partition(collection_name=name, partition_name=partition_name)
Expand All @@ -578,7 +522,8 @@ def drop(self, name: str, **kwargs: dict[str, typing.Any]) -> None:
field_name=kwargs["field_name"],
index_name=kwargs["index_name"])
else:
raise ValueError("Mandatory arguments 'field_name' and 'index_name' are missing")
raise ValueError(
"Mandatory arguments 'field_name' and 'index_name' are required when resource='index'")

def describe(self, name: str, **kwargs: dict[str, typing.Any]) -> dict:
"""
Expand Down Expand Up @@ -607,3 +552,47 @@ def close(self) -> None:

"""
self._client.close()

def _convert_mutation_result_to_dict(self, result: MutationResult) -> dict[str, typing.Any]:
result_dict = {
"insert_count": result.insert_count,
"delete_count": result.delete_count,
"upsert_count": result.upsert_count,
"timestamp": result.timestamp,
"succ_count": result.succ_count,
"err_count": result.err_count
}
return result_dict

@classmethod
def get_collection_lock(cls, name: str) -> threading.Lock:
"""
Get a lock for a given collection name.

Parameters
----------
name : str
Name of the collection for which to acquire the lock.

Returns
-------
threading.Lock
A thread lock specific to the given collection name.
"""

current_time = time.time()

if name not in cls._collection_locks:
cls._collection_locks[name] = {"lock": threading.Lock(), "last_used": current_time}
else:
cls._collection_locks[name]["last_used"] = current_time

if (current_time - cls._last_cleanup_time) >= cls._cleanup_interval:
for lock_name, lock_info in cls._collection_locks.copy().items():
last_used = lock_info["last_used"]
if current_time - last_used >= cls._cleanup_interval:
logger.debug("Cleaning up lock for collection: %s", lock_name)
del cls._collection_locks[lock_name]
cls._last_cleanup_time = current_time

return cls._collection_locks[name]["lock"]
Loading