Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
9525d5f
exmaples+website+sdks/python: update docs and exmaples for milvus tra…
mohamedawnallah Jun 27, 2025
f746a78
Merge remote-tracking branch 'upstream/master' into updateDocsAndAddE…
mohamedawnallah Jun 30, 2025
2e1ae71
Merge remote-tracking branch 'upstream/master' into updateDocsAndAddE…
mohamedawnallah Aug 16, 2025
d482464
examples: update jupyter notebook example
mohamedawnallah Aug 16, 2025
104868e
CHANGES.md: add release note
mohamedawnallah Aug 16, 2025
9951901
sdks/python: update import err exception
mohamedawnallah Aug 16, 2025
62c6a5e
sdks/python: experiment with setting milvus as extra dependency this way
mohamedawnallah Aug 16, 2025
508a8ad
sdks/python: revert pytest marker to use test containers
mohamedawnallah Aug 16, 2025
bc79236
.github: trigger postcommit python
mohamedawnallah Aug 16, 2025
ab170a3
Merge remote-tracking branch 'upstream/master' into updateDocsAndAddE…
mohamedawnallah Sep 16, 2025
435d9e8
sdks/python: undo `require_docker_in_docker` pytest marker
mohamedawnallah Sep 16, 2025
084687b
sdks/python: fix formatting issues
mohamedawnallah Sep 16, 2025
924282a
python: mark `test_enrichment_with_milvus` with require_docker_in_docker
mohamedawnallah Sep 16, 2025
c687fcf
sdks/python: test milvus example
mohamedawnallah Sep 16, 2025
733c233
sdks/python: update jupyter notebook example
mohamedawnallah Sep 16, 2025
2c78633
CHANGES.md: update release notes
mohamedawnallah Sep 16, 2025
e889882
sdks/python: fix linting issues
mohamedawnallah Sep 16, 2025
116a95c
sdks/python: properly skip milvus test on any container startup failures
mohamedawnallah Sep 24, 2025
3a3b03a
sdks/python: properly skip sql tests on any container startup failure
mohamedawnallah Sep 24, 2025
8bab624
sdks/python: fix linting issues
mohamedawnallah Sep 24, 2025
f8af037
examples: address comments on milvus jupyter notebook
mohamedawnallah Oct 18, 2025
c764314
Merge remote-tracking branch 'upstream/master' into updateDocsAndAddE…
mohamedawnallah Oct 18, 2025
b7c064f
ml/rag: enforce running etcd in milvus itests in standalone mode
mohamedawnallah Oct 19, 2025
aa50907
examples: update jupyter notebook mainly to pin milvus db version
mohamedawnallah Oct 19, 2025
fa80a1f
website: remove `Related transforms` section
mohamedawnallah Oct 19, 2025
e30ab53
sdks/python: pin milvus db version in py examples
mohamedawnallah Oct 19, 2025
a0299e9
sdks/python: skip validation if there's no enrichment data
mohamedawnallah Oct 20, 2025
d96a884
sdks/python: pin milvus db version `v2.5.10`
mohamedawnallah Oct 20, 2025
98c5ef9
milvus: add descriptive comments about updating db version in tests
mohamedawnallah Oct 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@
## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Python examples added for Milvus search enrichment handler on [Beam Website](https://beam.apache.org/documentation/transforms/python/elementwise/enrichment-milvus/)
including a Jupyter notebook example (Python) ([#36176](https://github.com/apache/beam/issues/36176)).

## Breaking Changes

Expand Down
2,657 changes: 2,657 additions & 0 deletions examples/notebooks/beam-ml/milvus_enrichment_transform.ipynb

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -156,16 +156,15 @@ def enrichment_with_google_cloudsql_pg():
where_clause_template=where_clause_template,
where_clause_fields=where_clause_fields)

cloudsql_handler = CloudSQLEnrichmentHandler(
handler = CloudSQLEnrichmentHandler(
connection_config=connection_config,
table_id=table_id,
query_config=query_config)
with beam.Pipeline() as p:
_ = (
p
| "Create" >> beam.Create(data)
|
"Enrich W/ Google CloudSQL PostgreSQL" >> Enrichment(cloudsql_handler)
| "Enrich W/ Google CloudSQL PostgreSQL" >> Enrichment(handler)
| "Print" >> beam.Map(print))
# [END enrichment_with_google_cloudsql_pg]

Expand Down Expand Up @@ -327,3 +326,75 @@ def enrichment_with_external_sqlserver():
| "Enrich W/ Unmanaged SQL Server" >> Enrichment(cloudsql_handler)
| "Print" >> beam.Map(print))
# [END enrichment_with_external_sqlserver]


def enrichment_with_milvus():
  """Runs the Milvus vector-search enrichment example pipeline.

  Connection settings and the collection name are read from the
  ``MILVUS_VECTOR_DB_*`` environment variables. A single query chunk is
  enriched through ``MilvusSearchEnrichmentHandler`` and the result is printed.
  The code between the START/END markers is the snippet body.
  """
  # [START enrichment_with_milvus]
  import os
  import apache_beam as beam
  from apache_beam.ml.rag.types import Content
  from apache_beam.ml.rag.types import Chunk
  from apache_beam.ml.rag.types import Embedding
  from apache_beam.transforms.enrichment import Enrichment
  from apache_beam.ml.rag.enrichment.milvus_search import (
      MilvusSearchEnrichmentHandler,
      MilvusConnectionParameters,
      MilvusSearchParameters,
      MilvusCollectionLoadParameters,
      VectorSearchParameters,
      VectorSearchMetrics)

  uri = os.environ.get("MILVUS_VECTOR_DB_URI")
  user = os.environ.get("MILVUS_VECTOR_DB_USER")
  password = os.environ.get("MILVUS_VECTOR_DB_PASSWORD")
  db_id = os.environ.get("MILVUS_VECTOR_DB_ID")
  token = os.environ.get("MILVUS_VECTOR_DB_TOKEN")
  collection_name = os.environ.get("MILVUS_VECTOR_DB_COLLECTION_NAME")

  data = [
      Chunk(
          id="query1",
          embedding=Embedding(dense_embedding=[0.1, 0.2, 0.3]),
          content=Content())
  ]

  # Keyword arguments make it explicit which value feeds which connection
  # field, independent of the parameter order of the constructor.
  connection_parameters = MilvusConnectionParameters(
      uri=uri, user=user, password=password, db_id=db_id, token=token)

  # The first condition (language == "en") excludes documents in other
  # languages. Initially, this gives us two documents. After applying the second
  # condition (cost < 50), only the first document returns in search results.
  filter_expr = 'metadata["language"] == "en" AND cost < 50'

  search_params = {"metric_type": VectorSearchMetrics.COSINE.value, "nprobe": 1}

  vector_search_params = VectorSearchParameters(
      anns_field="dense_embedding_cosine",
      limit=3,
      filter=filter_expr,
      search_params=search_params)

  search_parameters = MilvusSearchParameters(
      collection_name=collection_name,
      search_strategy=vector_search_params,
      output_fields=["id", "content", "domain", "cost", "metadata"],
      round_decimal=2)

  # The collection load parameters are optional. They provide fine-grained
  # control over how collections are loaded into memory. For simple use cases or
  # when getting started, this parameter can be omitted to use default loading
  # behavior. Consider using it in resource-constrained environments to optimize
  # memory usage and query performance.
  collection_load_parameters = MilvusCollectionLoadParameters()

  milvus_search_handler = MilvusSearchEnrichmentHandler(
      connection_parameters=connection_parameters,
      search_parameters=search_parameters,
      collection_load_parameters=collection_load_parameters)
  with beam.Pipeline() as p:
    _ = (
        p
        | "Create" >> beam.Create(data)
        | "Enrich W/ Milvus" >> Enrichment(milvus_search_handler)
        | "Print" >> beam.Map(print))
  # [END enrichment_with_milvus]
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@
enrichment_with_google_cloudsql_pg,
enrichment_with_external_pg,
enrichment_with_external_mysql,
enrichment_with_external_sqlserver)
enrichment_with_external_sqlserver,
enrichment_with_milvus)
from apache_beam.transforms.enrichment_handlers.cloudsql import (
DatabaseTypeAdapter)
from apache_beam.transforms.enrichment_handlers.cloudsql_it_test import (
Expand All @@ -51,9 +52,21 @@
ConnectionConfig,
CloudSQLConnectionConfig,
ExternalSQLDBConnectionConfig)
from apache_beam.ml.rag.enrichment.milvus_search import (
MilvusConnectionParameters)
from apache_beam.ml.rag.enrichment.milvus_search_it_test import (
MilvusEnrichmentTestHelper,
MilvusDBContainerInfo,
parse_chunk_strings,
assert_chunks_equivalent)
from apache_beam.io.requestresponse import RequestResponseIO
except ImportError as e:
raise unittest.SkipTest(f'RequestResponseIO dependencies not installed: {e}')
raise unittest.SkipTest(f'Examples dependencies are not installed: {str(e)}')


class TestContainerStartupError(Exception):
  """Raised when any test container fails to start."""
  # The class name begins with "Test", which matches pytest's default class
  # collection pattern. Opt out explicitly so pytest does not try to collect
  # this exception type as a test class.
  __test__ = False


def validate_enrichment_with_bigtable():
Expand Down Expand Up @@ -119,6 +132,13 @@ def validate_enrichment_with_external_sqlserver():
return expected


def validate_enrichment_with_milvus():
  """Returns the expected printed output lines of the Milvus example.

  The literal below is delimited by START/END marker lines; only the lines
  between the two markers are returned.
  """
  snippet_output = '''[START enrichment_with_milvus]
Chunk(content=Content(text=None), id='query1', index=0, metadata={'enrichment_data': defaultdict(<class 'list'>, {'id': [1], 'distance': [1.0], 'fields': [{'content': 'This is a test document', 'cost': 49, 'domain': 'medical', 'id': 1, 'metadata': {'language': 'en'}}]})}, embedding=Embedding(dense_embedding=[0.1, 0.2, 0.3], sparse_embedding=None))
[END enrichment_with_milvus]'''
  # Discard the leading START marker and the trailing END marker.
  _start_marker, *body_lines, _end_marker = snippet_output.splitlines()
  return body_lines


@mock.patch('sys.stdout', new_callable=StringIO)
@pytest.mark.uses_testcontainer
class EnrichmentTest(unittest.TestCase):
Expand Down Expand Up @@ -148,48 +168,69 @@ def test_enrichment_with_vertex_ai_legacy(self, mock_stdout):
os.environ.get('ALLOYDB_PASSWORD'),
"ALLOYDB_PASSWORD environment var is not provided")
def test_enrichment_with_google_cloudsql_pg(self, mock_stdout):
db_adapter = DatabaseTypeAdapter.POSTGRESQL
with EnrichmentTestHelpers.sql_test_context(True, db_adapter):
try:
try:
db_adapter = DatabaseTypeAdapter.POSTGRESQL
with EnrichmentTestHelpers.sql_test_context(True, db_adapter):
enrichment_with_google_cloudsql_pg()
output = mock_stdout.getvalue().splitlines()
expected = validate_enrichment_with_google_cloudsql_pg()
self.assertEqual(output, expected)
except Exception as e:
self.fail(f"Test failed with unexpected error: {e}")
except Exception as e:
self.fail(f"Test failed with unexpected error: {e}")

def test_enrichment_with_external_pg(self, mock_stdout):
db_adapter = DatabaseTypeAdapter.POSTGRESQL
with EnrichmentTestHelpers.sql_test_context(False, db_adapter):
try:
try:
db_adapter = DatabaseTypeAdapter.POSTGRESQL
with EnrichmentTestHelpers.sql_test_context(False, db_adapter):
enrichment_with_external_pg()
output = mock_stdout.getvalue().splitlines()
expected = validate_enrichment_with_external_pg()
self.assertEqual(output, expected)
except Exception as e:
self.fail(f"Test failed with unexpected error: {e}")
except TestContainerStartupError as e:
raise unittest.SkipTest(str(e))
except Exception as e:
self.fail(f"Test failed with unexpected error: {e}")

def test_enrichment_with_external_mysql(self, mock_stdout):
db_adapter = DatabaseTypeAdapter.MYSQL
with EnrichmentTestHelpers.sql_test_context(False, db_adapter):
try:
try:
db_adapter = DatabaseTypeAdapter.MYSQL
with EnrichmentTestHelpers.sql_test_context(False, db_adapter):
enrichment_with_external_mysql()
output = mock_stdout.getvalue().splitlines()
expected = validate_enrichment_with_external_mysql()
self.assertEqual(output, expected)
except Exception as e:
self.fail(f"Test failed with unexpected error: {e}")
except TestContainerStartupError as e:
raise unittest.SkipTest(str(e))
except Exception as e:
self.fail(f"Test failed with unexpected error: {e}")

def test_enrichment_with_external_sqlserver(self, mock_stdout):
db_adapter = DatabaseTypeAdapter.SQLSERVER
with EnrichmentTestHelpers.sql_test_context(False, db_adapter):
try:
try:
db_adapter = DatabaseTypeAdapter.SQLSERVER
with EnrichmentTestHelpers.sql_test_context(False, db_adapter):
enrichment_with_external_sqlserver()
output = mock_stdout.getvalue().splitlines()
expected = validate_enrichment_with_external_sqlserver()
self.assertEqual(output, expected)
except Exception as e:
self.fail(f"Test failed with unexpected error: {e}")
except TestContainerStartupError as e:
raise unittest.SkipTest(str(e))
except Exception as e:
self.fail(f"Test failed with unexpected error: {e}")

def test_enrichment_with_milvus(self, mock_stdout):
  """Runs the Milvus example and compares its printed chunks to the expected.

  The test is skipped when the test container cannot be started; any other
  exception is reported as a test failure.
  """
  try:
    with EnrichmentTestHelpers.milvus_test_context():
      enrichment_with_milvus()
      printed_lines = mock_stdout.getvalue().splitlines()
      expected_lines = validate_enrichment_with_milvus()
      self.maxDiff = None
      # Both sides are printed Chunk representations; parse them before
      # comparing instead of comparing the raw strings.
      actual_chunks = parse_chunk_strings(printed_lines)
      expected_chunks = parse_chunk_strings(expected_lines)
      assert_chunks_equivalent(actual_chunks, expected_chunks)
  except TestContainerStartupError as startup_error:
    raise unittest.SkipTest(str(startup_error))
  except Exception as unexpected_error:
    self.fail(f"Test failed with unexpected error: {unexpected_error}")


@dataclass
Expand All @@ -201,6 +242,7 @@ class CloudSQLEnrichmentTestDataConstruct:


class EnrichmentTestHelpers:
@staticmethod
@contextmanager
def sql_test_context(is_cloudsql: bool, db_adapter: DatabaseTypeAdapter):
result: Optional[CloudSQLEnrichmentTestDataConstruct] = None
Expand All @@ -212,6 +254,17 @@ def sql_test_context(is_cloudsql: bool, db_adapter: DatabaseTypeAdapter):
if result:
EnrichmentTestHelpers.post_sql_enrichment_test(result)

@staticmethod
@contextmanager
def milvus_test_context():
  """Context manager wrapping a test body with Milvus setup and teardown.

  Setup is delegated to ``pre_milvus_enrichment``; if it raises, the error
  propagates and no teardown is attempted. Otherwise
  ``post_milvus_enrichment`` runs on exit, whether or not the body raised.
  """
  container_info = EnrichmentTestHelpers.pre_milvus_enrichment()
  try:
    yield
  finally:
    if container_info:
      EnrichmentTestHelpers.post_milvus_enrichment(container_info)

@staticmethod
def pre_sql_enrichment_test(
is_cloudsql: bool,
Expand Down Expand Up @@ -259,20 +312,25 @@ def pre_sql_enrichment_test(
password=password,
db_id=db_id)
else:
db = SQLEnrichmentTestHelper.start_sql_db_container(db_adapter)
os.environ['EXTERNAL_SQL_DB_HOST'] = db.host
os.environ['EXTERNAL_SQL_DB_PORT'] = str(db.port)
os.environ['EXTERNAL_SQL_DB_ID'] = db.id
os.environ['EXTERNAL_SQL_DB_USER'] = db.user
os.environ['EXTERNAL_SQL_DB_PASSWORD'] = db.password
os.environ['EXTERNAL_SQL_DB_TABLE_ID'] = table_id
connection_config = ExternalSQLDBConnectionConfig(
db_adapter=db_adapter,
host=db.host,
port=db.port,
user=db.user,
password=db.password,
db_id=db.id)
try:
db = SQLEnrichmentTestHelper.start_sql_db_container(db_adapter)
os.environ['EXTERNAL_SQL_DB_HOST'] = db.host
os.environ['EXTERNAL_SQL_DB_PORT'] = str(db.port)
os.environ['EXTERNAL_SQL_DB_ID'] = db.id
os.environ['EXTERNAL_SQL_DB_USER'] = db.user
os.environ['EXTERNAL_SQL_DB_PASSWORD'] = db.password
os.environ['EXTERNAL_SQL_DB_TABLE_ID'] = table_id
connection_config = ExternalSQLDBConnectionConfig(
db_adapter=db_adapter,
host=db.host,
port=db.port,
user=db.user,
password=db.password,
db_id=db.id)
except Exception as e:
db_name = db_adapter.value.lower()
raise TestContainerStartupError(
f"{db_name} container failed to start: {str(e)}")

conenctor = connection_config.get_connector_handler()
engine = create_engine(
Expand Down Expand Up @@ -311,6 +369,45 @@ def post_sql_enrichment_test(res: CloudSQLEnrichmentTestDataConstruct):
os.environ.pop('GOOGLE_CLOUD_SQL_DB_PASSWORD', None)
os.environ.pop('GOOGLE_CLOUD_SQL_DB_TABLE_ID', None)

@staticmethod
def pre_milvus_enrichment() -> MilvusDBContainerInfo:
  """Starts a Milvus container, seeds it and exports its settings.

  Returns:
    The container info of the started Milvus instance.

  Raises:
    TestContainerStartupError: if the container cannot be started. The
      original error is chained as the cause.
    Exception: any error raised while seeding the database or exporting the
      environment variables; the container is stopped before re-raising.
  """
  try:
    db = MilvusEnrichmentTestHelper.start_db_container()
  except Exception as e:
    raise TestContainerStartupError(
        f"Milvus container failed to start: {str(e)}") from e

  try:
    connection_params = MilvusConnectionParameters(
        uri=db.uri,
        user=db.user,
        password=db.password,
        db_id=db.id,
        token=db.token)

    collection_name = MilvusEnrichmentTestHelper.initialize_db_with_data(
        connection_params)

    # Setup environment variables for db and collection configuration. This
    # will be used downstream by the milvus enrichment handler.
    os.environ['MILVUS_VECTOR_DB_URI'] = db.uri
    os.environ['MILVUS_VECTOR_DB_USER'] = db.user
    os.environ['MILVUS_VECTOR_DB_PASSWORD'] = db.password
    os.environ['MILVUS_VECTOR_DB_ID'] = db.id
    os.environ['MILVUS_VECTOR_DB_TOKEN'] = db.token
    os.environ['MILVUS_VECTOR_DB_COLLECTION_NAME'] = collection_name
  except Exception:
    # The container is already running but the caller never receives `db`
    # when setup fails, so it could not stop it. Tear down here (stops the
    # container and clears any variables already exported) and re-raise.
    EnrichmentTestHelpers.post_milvus_enrichment(db)
    raise

  return db

@staticmethod
def post_milvus_enrichment(db: MilvusDBContainerInfo):
  """Stops the Milvus container and clears the exported settings.

  Args:
    db: container info of the Milvus instance to stop.
  """
  MilvusEnrichmentTestHelper.stop_db_container(db)
  exported_variables = (
      'MILVUS_VECTOR_DB_URI',
      'MILVUS_VECTOR_DB_USER',
      'MILVUS_VECTOR_DB_PASSWORD',
      'MILVUS_VECTOR_DB_ID',
      'MILVUS_VECTOR_DB_TOKEN',
      'MILVUS_VECTOR_DB_COLLECTION_NAME',
  )
  # A default of None keeps the removal safe for variables that were never
  # set.
  for variable_name in exported_variables:
    os.environ.pop(variable_name, None)


if __name__ == '__main__':
unittest.main()
Loading
Loading