4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
## [1.2.28]

* **fix: Update milvus to work with latest pymilvus**

## [1.2.27]

* **feat: Add AWS IAM authentication and full async support to OpenSearch connector**
2 changes: 1 addition & 1 deletion unstructured_ingest/__version__.py
@@ -1 +1 @@
__version__ = "1.2.27" # pragma: no cover
__version__ = "1.2.28" # pragma: no cover
107 changes: 54 additions & 53 deletions unstructured_ingest/processes/connectors/milvus.py
@@ -172,23 +172,24 @@ class MilvusUploader(Uploader):
upload_config: MilvusUploaderConfig
connector_type: str = CONNECTOR_TYPE

def has_dynamic_fields_enabled(self) -> bool:
def _has_dynamic_fields_enabled(self, client: "MilvusClient") -> bool:
"""Check if the target collection has dynamic fields enabled."""
try:
with self.get_client() as client:
collection_info = client.describe_collection(self.upload_config.collection_name)

# Check if dynamic field is enabled
# The schema info should contain enable_dynamic_field or enableDynamicField
schema_info = collection_info.get(
"enable_dynamic_field",
collection_info.get("enableDynamicField", False),
)
return bool(schema_info)
collection_info = client.describe_collection(self.upload_config.collection_name)
schema_info = collection_info.get(
"enable_dynamic_field",
collection_info.get("enableDynamicField", False),
)
return bool(schema_info)
except Exception as e:
logger.warning(f"Could not determine if collection has dynamic fields enabled: {e}")
return False

def has_dynamic_fields_enabled(self) -> bool:
"""Check if the target collection has dynamic fields enabled."""
with self.get_client() as client:
return self._has_dynamic_fields_enabled(client)

@DestinationConnectionError.wrap
def precheck(self):
from pymilvus import MilvusException
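
The refactored check above takes an already-open `MilvusClient`, so `run_data` can reuse one connection while the public wrapper keeps opening its own. A minimal sketch of the underlying pymilvus call, assuming a local Milvus instance and a collection named `elements` (both assumptions, not taken from this PR):

```python
from pymilvus import MilvusClient

# Assumption: local Milvus on the default port and a collection named "elements".
client = MilvusClient(uri="http://localhost:19530")

# describe_collection returns a dict; newer pymilvus exposes the flag as
# "enable_dynamic_field", while older responses may use "enableDynamicField".
info = client.describe_collection("elements")
dynamic_enabled = bool(
    info.get("enable_dynamic_field", info.get("enableDynamicField", False))
)
print(f"dynamic fields enabled: {dynamic_enabled}")

client.close()
```
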
@@ -212,33 +213,31 @@ def get_client(self) -> Generator["MilvusClient", None, None]:
client.using_database(db_name=db_name)
yield client

def delete_by_record_id(self, file_data: FileData) -> None:
def _delete_by_record_id(self, client: "MilvusClient", file_data: FileData) -> None:
"""Delete records by record ID using provided client."""
logger.info(
f"deleting any content with metadata {RECORD_ID_LABEL}={file_data.identifier} "
f"from milvus collection {self.upload_config.collection_name}"
)
with self.get_client() as client:
delete_filter = f'{self.upload_config.record_id_key} == "{file_data.identifier}"'
resp = client.delete(
collection_name=self.upload_config.collection_name, filter=delete_filter
)
logger.info(
"deleted {} records from milvus collection {}".format(
resp["delete_count"], self.upload_config.collection_name
)
delete_filter = f'{self.upload_config.record_id_key} == "{file_data.identifier}"'
resp = client.delete(
collection_name=self.upload_config.collection_name, filter=delete_filter
)
logger.info(
"deleted {} records from milvus collection {}".format(
resp["delete_count"], self.upload_config.collection_name
)
)

@requires_dependencies(["pymilvus"], extras="milvus")
def _prepare_data_for_insert(self, data: list[dict]) -> list[dict]:
"""
Conforms the provided data to the schema of the target Milvus collection.
- If dynamic fields are enabled, it ensures JSON-stringified fields are decoded.
- If dynamic fields are disabled, it filters out any fields not present in the schema.
"""
def delete_by_record_id(self, file_data: FileData) -> None:
"""Delete records by record ID."""
with self.get_client() as client:
self._delete_by_record_id(client, file_data)

dynamic_fields_enabled = self.has_dynamic_fields_enabled()
def _prepare_data_for_insert(self, client: "MilvusClient", data: list[dict]) -> list[dict]:
"""Conform data to the schema of the target Milvus collection."""
dynamic_fields_enabled = self._has_dynamic_fields_enabled(client)

# If dynamic fields are enabled, 'languages' field needs to be a list
if dynamic_fields_enabled:
logger.debug("Dynamic fields enabled, ensuring 'languages' field is a list.")
prepared_data = []
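
On the delete path, the same single-client pattern reduces to one `delete` call with a string filter on the record-id field. A rough sketch against pymilvus directly, with the collection name, field name, and record id all assumed for illustration:

```python
from pymilvus import MilvusClient

client = MilvusClient(uri="http://localhost:19530")  # assumption: local Milvus

# Remove every entity tagged with the record id of the file being re-ingested.
# Collection and field names here are illustrative, not taken from the PR config.
resp = client.delete(
    collection_name="elements",
    filter='record_id == "example-file-id"',
)
# Recent pymilvus versions return a dict that includes "delete_count".
print(resp)

client.close()
```
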
@@ -255,51 +254,53 @@ def _prepare_data_for_insert(self, data: list[dict]) -> list[dict]:
prepared_data.append(new_item)
return prepared_data

# If dynamic fields are not enabled, we need to filter out the metadata fields
# to avoid insertion errors for fields not defined in the schema
with self.get_client() as client:
collection_info = client.describe_collection(
self.upload_config.collection_name,
)
# Dynamic fields disabled - filter out fields not in schema
collection_info = client.describe_collection(self.upload_config.collection_name)
schema_fields = {
field["name"]
for field in collection_info.get("fields", [])
if not field.get("auto_id", False)
}
# Remove metadata fields that are not part of the base schema
filtered_data = []
for item in data:
filtered_item = {key: value for key, value in item.items() if key in schema_fields}
filtered_data.append(filtered_item)
return filtered_data

@requires_dependencies(["pymilvus"], extras="milvus")
def insert_results(self, data: list[dict]):
def _insert_results(self, client: "MilvusClient", data: list[dict]) -> None:
"""Insert data using provided client."""
from pymilvus import MilvusException

logger.info(
f"uploading {len(data)} entries to {self.connection_config.db_name} "
f"db in collection {self.upload_config.collection_name}"
)

prepared_data = self._prepare_data_for_insert(data=data)
prepared_data = self._prepare_data_for_insert(client, data)

try:
res = client.insert(
collection_name=self.upload_config.collection_name, data=prepared_data
)
except MilvusException as milvus_exception:
raise WriteError(
f"failed to upload records to Milvus: {str(milvus_exception.message)}"
) from milvus_exception
if "err_count" in res and isinstance(res["err_count"], int) and res["err_count"] > 0:
err_count = res["err_count"]
raise WriteError(f"failed to upload {err_count} docs")

@requires_dependencies(["pymilvus"], extras="milvus")
def insert_results(self, data: list[dict]):
"""Insert data into Milvus collection."""
with self.get_client() as client:
try:
res = client.insert(
collection_name=self.upload_config.collection_name, data=prepared_data
)
except MilvusException as milvus_exception:
raise WriteError(
f"failed to upload records to Milvus: {str(milvus_exception.message)}"
) from milvus_exception
if "err_count" in res and isinstance(res["err_count"], int) and res["err_count"] > 0:
err_count = res["err_count"]
raise WriteError(f"failed to upload {err_count} docs")
self._insert_results(client, data)

def run_data(self, data: list[dict], file_data: FileData, **kwargs: Any) -> None:
self.delete_by_record_id(file_data=file_data)
self.insert_results(data=data)
# Use single client for entire operation to avoid gRPC channel issues
with self.get_client() as client:
self._delete_by_record_id(client, file_data)
self._insert_results(client, data)


milvus_destination_entry = DestinationRegistryEntry(
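
The net effect of the `run_data` change is that deletion and insertion now share one client, so both gRPC calls travel over the same channel instead of one that may already have been torn down. A condensed sketch of that flow, with the URI, collection, field names, and payload all assumed purely for illustration:

```python
from pymilvus import MilvusClient

# All names below (URI, collection, record_id field, payload) are assumptions
# used only to illustrate the delete-then-insert flow on a single client.
client = MilvusClient(uri="http://localhost:19530")
try:
    client.delete(
        collection_name="elements",
        filter='record_id == "example-file-id"',
    )
    client.insert(
        collection_name="elements",
        data=[{"record_id": "example-file-id", "text": "hello", "embeddings": [0.1, 0.2, 0.3]}],
    )
finally:
    client.close()
```
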