Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Vector Database Service to allow stages to read and write to VDBs #1225

Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
5d7b3cb
Added milvus vdb prototype impl
bsuryadevara Sep 26, 2023
4807f3d
Added milvus vdb prototype impl
bsuryadevara Sep 26, 2023
b1f94fb
Added llamaindex and langchain prototypes
bsuryadevara Sep 27, 2023
d912645
doc updates
bsuryadevara Sep 27, 2023
4ecd37f
updates to milvus vd service
bsuryadevara Sep 30, 2023
c18125a
updated search and upsert functions
bsuryadevara Oct 2, 2023
a6ef60e
Added write_to_vector_db stage
bsuryadevara Oct 3, 2023
7389542
Added tests to get started
bsuryadevara Oct 3, 2023
3a31cee
Added tests to get started
bsuryadevara Oct 3, 2023
4cfba55
Added MilvusClient extension class to support missing functions
bsuryadevara Oct 4, 2023
b83f517
Added tests for Milvus vector database serivce
bsuryadevara Oct 4, 2023
b7fee57
Added tests for Milvus vector database service
bsuryadevara Oct 4, 2023
cde18b2
Added tests for Milvus vector database service
bsuryadevara Oct 4, 2023
c9316c0
Added milvus lite to pipeline tests
bsuryadevara Oct 9, 2023
36f1f18
Added tests with milvus lite
bsuryadevara Oct 11, 2023
2f24cc2
Updated Milvus VDB tests
bsuryadevara Oct 11, 2023
9670c97
Merge remote-tracking branch 'upstream/branch-23.11' into 1177-fea-ad…
bsuryadevara Oct 11, 2023
e4b8a02
Updated Milvus VDB tests
bsuryadevara Oct 11, 2023
a5e742e
Added tests with milvus lite
bsuryadevara Oct 11, 2023
3d0e01b
Renamed a file
bsuryadevara Oct 11, 2023
cd52a5f
Feedback changes
bsuryadevara Oct 12, 2023
5ce3402
Feedback changes
bsuryadevara Oct 12, 2023
9e6989a
Removed register stage decorator
bsuryadevara Oct 12, 2023
cf327b5
Ignore pymilvus in the docs
bsuryadevara Oct 13, 2023
a6a6f43
Update variable names
bsuryadevara Oct 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docker/conda/environments/cuda11.8_dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,5 @@ dependencies:
# Add additional dev dependencies here
- databricks-connect
- pytest-kafka==0.6.0
- pymilvus==2.3.1
- milvus
13 changes: 13 additions & 0 deletions morpheus/service/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
263 changes: 263 additions & 0 deletions morpheus/service/milvus_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import typing

import pymilvus

logger = logging.getLogger(__name__)

# Milvus data type mapping dictionary
MILVUS_DATA_TYPE_MAP = {
"int8": pymilvus.DataType.INT8,
"int16": pymilvus.DataType.INT16,
"int32": pymilvus.DataType.INT32,
"int64": pymilvus.DataType.INT64,
"bool": pymilvus.DataType.BOOL,
"float": pymilvus.DataType.FLOAT,
"double": pymilvus.DataType.DOUBLE,
"binary_vector": pymilvus.DataType.BINARY_VECTOR,
"float_vector": pymilvus.DataType.FLOAT_VECTOR,
"string": pymilvus.DataType.STRING,
"varchar": pymilvus.DataType.VARCHAR,
"json": pymilvus.DataType.JSON,
}


def handle_exceptions(method_name: str, error_message: str) -> typing.Callable:
"""
Decorator function to handle exceptions and log errors.

Parameters
----------
method_name : str
Name of the method being decorated.
error_message : str
Error message to log in case of an exception.

Returns
-------
typing.Callable
Decorated function.
"""

def decorator(method):

def wrapper(*args, **kwargs):
try:
return method(*args, **kwargs)
except Exception as ex:
logger.error("%s - Failed to execute %s.", error_message, method_name)
raise ex from ex

return wrapper

return decorator


class MilvusClient(pymilvus.MilvusClient):
"""
Extension of the `pymilvus.MilvusClient` class with custom functions.

Parameters
----------
uri : str
URI for connecting to Milvus server.
user : str
User name for authentication.
password : str
Password for authentication.
db_name : str
Name of the Milvus database.
token : str
Token for authentication.
**kwargs : dict
Additional keyword arguments for the MilvusClient constructor.
"""

def __init__(self, uri: str, user: str, password: str, db_name: str, token: str, **kwargs: dict[str, typing.Any]):
super().__init__(uri=uri, user=user, password=password, db_name=db_name, token=token, **kwargs)

@handle_exceptions("has_collection", "Error checking collection existence")
def has_collection(self, collection_name: str) -> bool:
"""
Check if a collection exists in the database.

Parameters
----------
collection_name : str
Name of the collection to check.

Returns
-------
bool
True if the collection exists, False otherwise.
"""
conn = self._get_connection()
return conn.has_collection(collection_name)

@handle_exceptions("create_partition", "Error creating partition")
def create_partition(self, collection_name: str, partition_name: str, timeout: float = 1.0) -> None:
"""
Create a partition within a collection.

Parameters
----------
collection_name : str
Name of the collection.
partition_name : str
Name of the partition to create.
timeout : float, optional
Timeout for the operation in seconds (default is 1.0).
"""
conn = self._get_connection()
conn.create_partition(collection_name=collection_name, partition_name=partition_name, timeout=timeout)

@handle_exceptions("load_collection", "Error loading collection")
def load_collection(self, collection_name: str) -> None:
"""
Load a collection into memory.

Parameters
----------
collection_name : str
Name of the collection to load.
"""
conn = self._get_connection()
conn.load_collection(collection_name=collection_name)

@handle_exceptions("release_collection", "Error releasing collection")
def release_collection(self, collection_name: str) -> None:
"""
Release a loaded collection from memory.

Parameters
----------
collection_name : str
Name of the collection to release.
"""
conn = self._get_connection()
conn.release_collection(collection_name=collection_name)

@handle_exceptions("upsert", "Error upserting collection entities")
def upsert(self, collection_name: str, entities: list, **kwargs: dict[str, typing.Any]) -> typing.Any:
"""
Upsert entities into a collection.

Parameters
----------
collection_name : str
Name of the collection to upsert into.
entities : list
List of entities to upsert.
**kwargs : dict
Additional keyword arguments for the upsert operation.

Returns
-------
typing.Any
Result of the upsert operation.
"""
conn = self._get_connection()
return conn.upsert(collection_name=collection_name, entities=entities, **kwargs)

@handle_exceptions("delete_by_expr", "Error deleting collection entities")
def delete_by_expr(self, collection_name: str, expression: str, **kwargs: dict[str, typing.Any]) -> None:
"""
Delete entities from a collection using an expression.

Parameters
----------
collection_name : str
Name of the collection to delete from.
expression : str
Deletion expression.
**kwargs : dict
Additional keyword arguments for the delete operation.
"""
conn = self._get_connection()
conn.delete(collection_name=collection_name, expression=expression, **kwargs)

@handle_exceptions("has_partition", "Error checking partition existence")
def has_partition(self, collection_name: str, partition_name: str) -> bool:
"""
Check if a partition exists within a collection.

Parameters
----------
collection_name : str
Name of the collection.
partition_name : str
Name of the partition to check.

Returns
-------
bool
True if the partition exists, False otherwise.
"""
conn = self._get_connection()
return conn.has_partition(collection_name=collection_name, partition_name=partition_name)

@handle_exceptions("drop_partition", "Error dropping partition")
def drop_partition(self, collection_name: str, partition_name: str) -> None:
"""
Drop a partition from a collection.

Parameters
----------
collection_name : str
Name of the collection.
partition_name : str
Name of the partition to drop.
"""
conn = self._get_connection()
conn.drop(collection_name=collection_name, partition_name=partition_name)

@handle_exceptions("drop_index", "Error dropping index")
def drop_index(self, collection_name: str, field_name: str, index_name: str) -> None:
"""
Drop an index from a collection.

Parameters
----------
collection_name : str
Name of the collection.
field_name : str
Name of the field associated with the index.
index_name : str
Name of the index to drop.
"""
conn = self._get_connection()
conn.drop_index(collection_name=collection_name, field_name=field_name, index_name=index_name)

@handle_exceptions("get_collection", "Error getting collection object")
def get_collection(self, collection_name: str, **kwargs: dict[str, typing.Any]) -> pymilvus.Collection:
"""
Returns `pymilvus.Collection` object associated with the given collection name.

Parameters
----------
collection_name : str
Name of the collection to delete from.
**kwargs : dict
Additional keyword arguments to get Collection instance.

Returns
-------
pymilvus.Collection
Returns pymilvus Collection instance.
"""
collection = pymilvus.Collection(name=collection_name, using=self._using, **kwargs)
return collection
Loading
Loading