Skip to content

Commit

Permalink
feat: support creating optimized online store with private service co…
Browse files Browse the repository at this point in the history
…nnect

PiperOrigin-RevId: 662289750
  • Loading branch information
vertex-sdk-bot authored and copybara-github committed Aug 13, 2024
1 parent c68d559 commit 659ba3f
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 42 deletions.
7 changes: 6 additions & 1 deletion tests/unit/vertexai/feature_store_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,12 @@
_TEST_PSC_OPTIMIZED_FOS = types.feature_online_store_v1.FeatureOnlineStore(
name=_TEST_PSC_OPTIMIZED_FOS_PATH,
optimized=types.feature_online_store_v1.FeatureOnlineStore.Optimized(),
dedicated_serving_endpoint=types.feature_online_store_v1.FeatureOnlineStore.DedicatedServingEndpoint(),
dedicated_serving_endpoint=types.feature_online_store_v1.FeatureOnlineStore.DedicatedServingEndpoint(
private_service_connect_config=types.service_networking_v1.PrivateServiceConnectConfig(
enable_private_service_connect=True,
project_allowlist=_TEST_PSC_PROJECT_ALLOWLIST,
),
),
labels=_TEST_PSC_OPTIMIZED_FOS_LABELS,
)

Expand Down
157 changes: 121 additions & 36 deletions tests/unit/vertexai/test_feature_online_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,61 +15,61 @@
# limitations under the License.
#

from unittest.mock import call
import re
from typing import Dict
from unittest import mock
from unittest.mock import call
from unittest.mock import patch
from typing import Dict

from google.api_core import operation as ga_operation
from google.cloud import aiplatform
from google.cloud.aiplatform import base
from google.cloud.aiplatform.compat import types
from vertexai.resources.preview import (
FeatureOnlineStore,
FeatureOnlineStoreType,
FeatureViewBigQuerySource,
IndexConfig,
DistanceMeasureType,
TreeAhConfig,
)
from vertexai.resources.preview.feature_store import (
feature_online_store,
)
from google.cloud.aiplatform.compat.services import (
feature_online_store_admin_service_client,
)
import pytest

from test_feature_view import fv_eq
from feature_store_constants import (
_TEST_PROJECT,
_TEST_LOCATION,
_TEST_PARENT,
_TEST_BIGTABLE_FOS1_ID,
_TEST_BIGTABLE_FOS1_PATH,
_TEST_BIGTABLE_FOS1_LABELS,
_TEST_BIGTABLE_FOS1_PATH,
_TEST_BIGTABLE_FOS2_ID,
_TEST_BIGTABLE_FOS2_PATH,
_TEST_BIGTABLE_FOS2_LABELS,
_TEST_BIGTABLE_FOS2_PATH,
_TEST_BIGTABLE_FOS3_ID,
_TEST_BIGTABLE_FOS3_PATH,
_TEST_BIGTABLE_FOS3_LABELS,
_TEST_BIGTABLE_FOS3_PATH,
_TEST_ESF_OPTIMIZED_FOS_ID,
_TEST_ESF_OPTIMIZED_FOS_PATH,
_TEST_ESF_OPTIMIZED_FOS_LABELS,
_TEST_PSC_OPTIMIZED_FOS_ID,
_TEST_PSC_OPTIMIZED_FOS_LABELS,
_TEST_PSC_PROJECT_ALLOWLIST,
_TEST_ESF_OPTIMIZED_FOS_PATH,
_TEST_FOS_LIST,
_TEST_FV1_ID,
_TEST_FV1_PATH,
_TEST_FV1_LABELS,
_TEST_FV1_BQ_URI,
_TEST_FV1_ENTITY_ID_COLUMNS,
_TEST_FV1_ID,
_TEST_FV1_LABELS,
_TEST_FV1_PATH,
_TEST_LOCATION,
_TEST_OPTIMIZED_EMBEDDING_FV_ID,
_TEST_OPTIMIZED_EMBEDDING_FV_PATH,
_TEST_PARENT,
_TEST_PROJECT,
_TEST_PSC_OPTIMIZED_FOS_ID,
_TEST_PSC_OPTIMIZED_FOS_LABELS,
_TEST_PSC_OPTIMIZED_FOS_PATH,
_TEST_PSC_PROJECT_ALLOWLIST,
)
from test_feature_view import fv_eq
from vertexai.resources.preview import (
DistanceMeasureType,
FeatureOnlineStore,
FeatureOnlineStoreType,
FeatureViewBigQuerySource,
IndexConfig,
TreeAhConfig,
)
from vertexai.resources.preview.feature_store import (
feature_online_store,
)
import pytest


@pytest.fixture
Expand Down Expand Up @@ -277,24 +277,109 @@ def test_create_esf_optimized_store(
)


@pytest.mark.parametrize("create_request_timeout", [None, 1.0])
def test_create_psc_optimized_store(
create_request_timeout,
):
def test_create_psc_optimized_store_no_project_allowlist_raises_error():
aiplatform.init(project=_TEST_PROJECT, location=_TEST_LOCATION)

with pytest.raises(
ValueError,
match=re.escape("private_service_connect is not supported"),
match=re.escape(
"`project_allowlist` cannot be empty when `enable_private_service_connect` is"
" set to true."
),
):
FeatureOnlineStore.create_optimized_store(
_TEST_PSC_OPTIMIZED_FOS_ID,
labels=_TEST_PSC_OPTIMIZED_FOS_LABELS,
create_request_timeout=create_request_timeout,
enable_private_service_connect=True,
project_allowlist=_TEST_PSC_PROJECT_ALLOWLIST,
)


def test_create_psc_optimized_store_empty_project_allowlist_raises_error():
aiplatform.init(project=_TEST_PROJECT, location=_TEST_LOCATION)

with pytest.raises(
ValueError,
match=re.escape(
"`project_allowlist` cannot be empty when `enable_private_service_connect` is"
" set to true."
),
):
FeatureOnlineStore.create_optimized_store(
_TEST_PSC_OPTIMIZED_FOS_ID,
enable_private_service_connect=True,
project_allowlist=[],
)


@pytest.mark.parametrize("create_request_timeout", [None, 1.0])
@pytest.mark.parametrize("sync", [True, False])
def test_create_psc_optimized_store(
create_psc_optimized_fos_mock,
get_psc_optimized_fos_mock,
fos_logger_mock,
create_request_timeout,
sync,
):
aiplatform.init(project=_TEST_PROJECT, location=_TEST_LOCATION)
fos = FeatureOnlineStore.create_optimized_store(
_TEST_PSC_OPTIMIZED_FOS_ID,
labels=_TEST_PSC_OPTIMIZED_FOS_LABELS,
create_request_timeout=create_request_timeout,
enable_private_service_connect=True,
project_allowlist=_TEST_PSC_PROJECT_ALLOWLIST,
)

if not sync:
fos.wait()

expected_feature_online_store = types.feature_online_store_v1.FeatureOnlineStore(
optimized=types.feature_online_store_v1.FeatureOnlineStore.Optimized(),
dedicated_serving_endpoint=types.feature_online_store_v1.FeatureOnlineStore.DedicatedServingEndpoint(
private_service_connect_config=types.service_networking_v1.PrivateServiceConnectConfig(
enable_private_service_connect=True,
project_allowlist=_TEST_PSC_PROJECT_ALLOWLIST,
)
),
labels=_TEST_PSC_OPTIMIZED_FOS_LABELS,
)
create_psc_optimized_fos_mock.assert_called_once_with(
parent=_TEST_PARENT,
feature_online_store=expected_feature_online_store,
feature_online_store_id=_TEST_PSC_OPTIMIZED_FOS_ID,
metadata=(),
timeout=create_request_timeout,
)

fos_logger_mock.assert_has_calls(
[
call("Creating FeatureOnlineStore"),
call(
"Create FeatureOnlineStore backing LRO:"
f" {create_psc_optimized_fos_mock.return_value.operation.name}"
),
call(
"FeatureOnlineStore created. Resource name:"
" projects/test-project/locations/us-central1/featureOnlineStores/my_psc_optimized_fos"
),
call("To use this FeatureOnlineStore in another session:"),
call(
"feature_online_store ="
" aiplatform.FeatureOnlineStore('projects/test-project/locations/us-central1/featureOnlineStores/my_psc_optimized_fos')"
),
]
)

fos_eq(
fos,
name=_TEST_PSC_OPTIMIZED_FOS_ID,
resource_name=_TEST_PSC_OPTIMIZED_FOS_PATH,
project=_TEST_PROJECT,
location=_TEST_LOCATION,
labels=_TEST_PSC_OPTIMIZED_FOS_LABELS,
type=FeatureOnlineStoreType.OPTIMIZED,
)


def test_list(list_fos_mock):
aiplatform.init(project=_TEST_PROJECT, location=_TEST_LOCATION)

Expand Down
33 changes: 28 additions & 5 deletions vertexai/resources/preview/feature_store/feature_online_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
)
from google.cloud.aiplatform.compat.types import (
feature_online_store as gca_feature_online_store,
service_networking as gca_service_networking,
feature_view as gca_feature_view,
)
from vertexai.resources.preview.feature_store.feature_view import (
Expand Down Expand Up @@ -245,18 +246,30 @@ def create_optimized_store(
Example Usage:
```
# Create optimized store with public endpoint.
my_fos = vertexai.preview.FeatureOnlineStore.create_optimized_store('my_fos')
```
```
# Create optimized online store with private service connect.
my_fos = vertexai.preview.FeatureOnlineStore.create_optimized_store(
'my_fos',
enable_private_service_connect=True,
project_allowlist=['my-project'],
)
```
Args:
name: The name of the feature online store.
enable_private_service_connect (bool):
enable_private_service_connect:
Optional. If true, expose the optimized online store
via private service connect. Otherwise the optimized online
store will be accessible through public endpoint
project_allowlist (MutableSequence[str]):
store will be accessible through public endpoint.
project_allowlist:
A list of Projects from which the forwarding
rule will target the service attachment. Only needed when
enable_private_service_connect is set to true.
`enable_private_service_connect` is set to true.
labels:
The labels with user-defined metadata to organize your feature
online store. Label keys and values can be no longer than 64
Expand Down Expand Up @@ -290,7 +303,17 @@ def create_optimized_store(
FeatureOnlineStore - the FeatureOnlineStore resource object.
"""
if enable_private_service_connect:
raise ValueError("private_service_connect is not supported")
if not project_allowlist:
raise ValueError(
"`project_allowlist` cannot be empty when `enable_private_service_connect` is set to true."
)

dedicated_serving_endpoint = gca_feature_online_store.FeatureOnlineStore.DedicatedServingEndpoint(
private_service_connect_config=gca_service_networking.PrivateServiceConnectConfig(
enable_private_service_connect=True,
project_allowlist=project_allowlist,
),
)
else:
dedicated_serving_endpoint = (
gca_feature_online_store.FeatureOnlineStore.DedicatedServingEndpoint()
Expand Down

0 comments on commit 659ba3f

Please sign in to comment.