Commit bb72562

Copybara import of the project:
--
2504858 by Bryan Smith <bryanesmith@gmail.com>:

add bq_dataset_id parameter to batch_serve_to_df, with unit test

--
d6e20f6 by Bryan Smith <bryanesmith@gmail.com>:

applied blacken formatting

--
7020556 by Bryan Smith <bryanesmith@gmail.com>:

doc strings for helper methods

--
a8f052e by Bryan Smith <bryanesmith@gmail.com>:

remove the timestamp column fix

COPYBARA_INTEGRATE_REVIEW=#1623 from bryanesmith:feature_bq_dataset_id 4f4762d
PiperOrigin-RevId: 485654375
bryanesmith authored and copybara-github committed Nov 2, 2022
1 parent 7a4bfbe commit bb72562
Showing 2 changed files with 192 additions and 23 deletions.
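
For orientation before the diff: a minimal sketch of how a caller might use the new parameter, assuming a pre-created staging dataset. All project, featurestore, and dataset names here are hypothetical, not from this commit.

# Hypothetical usage sketch for the new bq_dataset_id parameter.
import pandas as pd
from google.cloud import aiplatform, bigquery

aiplatform.init(project="my-project", location="us-central1")

# The staging dataset must already exist in the featurestore's location,
# and the caller needs bigquery.tables.create permission on it.
bq_client = bigquery.Client(project="my-project")
staging_dataset = bigquery.Dataset("my-project.my_staging_dataset")
staging_dataset.location = "us-central1"
bq_client.create_dataset(staging_dataset, exists_ok=True)

read_instances_df = pd.DataFrame(
    data=[["alice", "movie_01"], ["bob", "movie_02"]],
    columns=["users", "movies"],
)

my_featurestore = aiplatform.Featurestore(featurestore_name="my_featurestore")
df = my_featurestore.batch_serve_to_df(
    serving_feature_ids={
        "users": ["age", "gender"],
        "movies": ["genre", "average_rating"],
    },
    read_instances_df=read_instances_df,
    bq_dataset_id="my-project.my_staging_dataset",  # staged tables land here
)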
112 changes: 89 additions & 23 deletions google/cloud/aiplatform/featurestore/featurestore.py
@@ -1092,6 +1092,7 @@ def batch_serve_to_df(
         feature_destination_fields: Optional[Dict[str, str]] = None,
         request_metadata: Optional[Sequence[Tuple[str, str]]] = (),
         serve_request_timeout: Optional[float] = None,
+        bq_dataset_id: Optional[str] = None,
     ) -> "pd.DataFrame":  # noqa: F821 - skip check for undefined name 'pd'
         """Batch serves feature values to pandas DataFrame
@@ -1176,6 +1177,11 @@ def batch_serve_to_df(
             serve_request_timeout (float):
                 Optional. The timeout for the serve request in seconds.
+            bq_dataset_id (str):
+                Optional. The full dataset ID for the BigQuery dataset to use
+                for temporarily staging data. If specified, caller must have
+                `bigquery.tables.create` permissions for Dataset.
+
         Returns:
             pd.DataFrame: The pandas DataFrame containing feature values from batch serving.
@@ -1210,34 +1216,43 @@ def batch_serve_to_df(
 
         self.wait()
         featurestore_name_components = self._parse_resource_name(self.resource_name)
-        featurestore_id = featurestore_name_components["featurestore"]
-
-        temp_bq_dataset_name = f"temp_{featurestore_id}_{uuid.uuid4()}".replace(
-            "-", "_"
-        )
-
-        project_id = resource_manager_utils.get_project_id(
-            project_number=featurestore_name_components["project"],
-            credentials=self.credentials,
-        )
-        temp_bq_dataset_id = f"{project_id}.{temp_bq_dataset_name}"[:1024]
-        temp_bq_dataset = bigquery.Dataset(dataset_ref=temp_bq_dataset_id)
-        temp_bq_dataset.location = self.location
-        temp_bq_dataset = bigquery_client.create_dataset(temp_bq_dataset)
+        # if user didn't specify BigQuery dataset, create an ephemeral one
+        if bq_dataset_id is None:
+            temp_bq_full_dataset_id = self._get_ephemeral_bq_full_dataset_id(
+                featurestore_name_components["featurestore"],
+                featurestore_name_components["project"],
+            )
+            temp_bq_dataset = self._create_ephemeral_bq_dataset(
+                bigquery_client, temp_bq_full_dataset_id
+            )
+            temp_bq_batch_serve_table_name = "batch_serve"
+            temp_bq_read_instances_table_name = "read_instances"
+
+        # if user specified BigQuery dataset, create ephemeral tables
+        else:
+            temp_bq_full_dataset_id = bq_dataset_id
+            temp_bq_dataset = bigquery.Dataset(dataset_ref=temp_bq_full_dataset_id)
+            temp_bq_batch_serve_table_name = f"tmp_batch_serve_{uuid.uuid4()}".replace(
+                "-", "_"
+            )
+            temp_bq_read_instances_table_name = (
+                f"tmp_read_instances_{uuid.uuid4()}".replace("-", "_")
+            )
 
-        temp_bq_batch_serve_table_name = "batch_serve"
-        temp_bq_read_instances_table_name = "read_instances"
         temp_bq_batch_serve_table_id = (
-            f"{temp_bq_dataset_id}.{temp_bq_batch_serve_table_name}"
+            f"{temp_bq_full_dataset_id}.{temp_bq_batch_serve_table_name}"
         )
 
         temp_bq_read_instances_table_id = (
-            f"{temp_bq_dataset_id}.{temp_bq_read_instances_table_name}"
+            f"{temp_bq_full_dataset_id}.{temp_bq_read_instances_table_name}"
         )
 
         try:
 
             job = bigquery_client.load_table_from_dataframe(
-                dataframe=read_instances_df, destination=temp_bq_read_instances_table_id
+                dataframe=read_instances_df,
+                destination=temp_bq_read_instances_table_id,
             )
             job.result()
 
@@ -1259,7 +1274,7 @@ def batch_serve_to_df(
                 read_session=bigquery_storage.types.ReadSession(
                     table="projects/{project}/datasets/{dataset}/tables/{table}".format(
                         project=self.project,
-                        dataset=temp_bq_dataset_name,
+                        dataset=temp_bq_dataset.dataset_id,
                         table=temp_bq_batch_serve_table_name,
                     ),
                     data_format=bigquery_storage.types.DataFormat.ARROW,
@@ -1273,9 +1288,60 @@ def batch_serve_to_df(
                 frames.append(message.to_dataframe())
 
         finally:
-            bigquery_client.delete_dataset(
-                dataset=temp_bq_dataset.dataset_id,
-                delete_contents=True,
-            )
+            # clean up: if user didn't specify dataset, delete ephemeral dataset
+            if bq_dataset_id is None:
+                bigquery_client.delete_dataset(
+                    dataset=temp_bq_dataset.dataset_id,
+                    delete_contents=True,
+                )
+
+            # clean up: if user specified BigQuery dataset, delete ephemeral tables
+            else:
+                bigquery_client.delete_table(temp_bq_batch_serve_table_id)
+                bigquery_client.delete_table(temp_bq_read_instances_table_id)
 
         return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame(frames)
 
+    def _get_ephemeral_bq_full_dataset_id(
+        self, featurestore_id: str, project_number: str
+    ) -> str:
+        """Helper method to generate an ID for an ephemeral BigQuery dataset
+        used to temporarily stage data.
+
+        Args:
+            featurestore_id (str):
+                Required. The ID of this featurestore.
+            project_number (str):
+                Required. The project to retrieve the featurestore from.
+
+        Returns:
+            str - full BigQuery dataset ID
+        """
+        temp_bq_dataset_name = f"temp_{featurestore_id}_{uuid.uuid4()}".replace(
+            "-", "_"
+        )
+
+        project_id = resource_manager_utils.get_project_id(
+            project_number=project_number,
+            credentials=self.credentials,
+        )
+
+        return f"{project_id}.{temp_bq_dataset_name}"[:1024]
+
+    def _create_ephemeral_bq_dataset(
+        self, bigquery_client: bigquery.Client, dataset_id: str
+    ) -> "bigquery.Dataset":
+        """Helper method to create an ephemeral BigQuery dataset used to
+        temporarily stage data.
+
+        Args:
+            bigquery_client (bigquery.Client):
+                Required. BigQuery client used to create the dataset.
+            dataset_id (str):
+                Required. Identifier for the BigQuery dataset.
+
+        Returns:
+            bigquery.Dataset - new BigQuery dataset used to temporarily stage data
+        """
+        temp_bq_dataset = bigquery.Dataset(dataset_ref=dataset_id)
+        temp_bq_dataset.location = self.location
+
+        return bigquery_client.create_dataset(temp_bq_dataset)
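
For reference, a quick sketch of the staging names the two code paths above generate. The project and featurestore IDs are illustrative, and only the standard library is needed:

import uuid

project_id, featurestore_id = "my-project", "my_featurestore"  # hypothetical

# Default path: an ephemeral dataset with fixed table names, deleted afterwards.
ephemeral_dataset_name = f"temp_{featurestore_id}_{uuid.uuid4()}".replace("-", "_")
print(f"{project_id}.{ephemeral_dataset_name}"[:1024])
# e.g. my-project.temp_my_featurestore_9f1c..., containing the tables
# "batch_serve" and "read_instances"

# User-specified path: the caller's existing dataset with ephemeral table
# names; only the two temporary tables are deleted afterwards.
print(f"tmp_batch_serve_{uuid.uuid4()}".replace("-", "_"))
print(f"tmp_read_instances_{uuid.uuid4()}".replace("-", "_"))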
103 changes: 103 additions & 0 deletions tests/unit/aiplatform/test_featurestores.py
@@ -418,6 +418,12 @@ def bq_delete_dataset_mock(bq_client_mock):
         yield bq_delete_dataset_mock
 
 
+@pytest.fixture
+def bq_delete_table_mock(bq_client_mock):
+    with patch.object(bq_client_mock, "delete_table") as bq_delete_table_mock:
+        yield bq_delete_table_mock
+
+
 @pytest.fixture
 def bqs_client_mock():
     mock = MagicMock(bigquery_storage.BigQueryReadClient)
@@ -1701,6 +1707,103 @@ def test_batch_serve_to_df(self, batch_read_feature_values_mock):
             timeout=None,
         )
 
+    @pytest.mark.skipif(
+        _USE_BQ_STORAGE is False, reason="batch_serve_to_df requires bigquery_storage"
+    )
+    @pytest.mark.usefixtures(
+        "get_featurestore_mock",
+        "bq_init_client_mock",
+        "bq_init_dataset_mock",
+        "bq_create_dataset_mock",
+        "bq_load_table_from_dataframe_mock",
+        "bq_delete_dataset_mock",
+        "bq_delete_table_mock",
+        "bqs_init_client_mock",
+        "bqs_create_read_session",
+        "get_project_mock",
+    )
+    @patch("uuid.uuid4", uuid_mock)
+    def test_batch_serve_to_df_user_specified_bq_dataset(
+        self,
+        batch_read_feature_values_mock,
+        bq_create_dataset_mock,
+        bq_delete_dataset_mock,
+        bq_delete_table_mock,
+    ):
+
+        aiplatform.init(project=_TEST_PROJECT_DIFF)
+
+        my_featurestore = aiplatform.Featurestore(
+            featurestore_name=_TEST_FEATURESTORE_NAME
+        )
+
+        read_instances_df = pd.DataFrame()
+
+        expected_temp_bq_dataset_name = "my_dataset_name"
+        expected_temp_bq_dataset_id = (
+            f"{_TEST_PROJECT}.{expected_temp_bq_dataset_name}"[:1024]
+        )
+        expected_temp_bq_batch_serve_table_name = (
+            f"tmp_batch_serve_{uuid.uuid4()}".replace("-", "_")
+        )
+        expected_temp_bq_batch_serve_table_id = (
+            f"{expected_temp_bq_dataset_id}.{expected_temp_bq_batch_serve_table_name}"
+        )
+        expected_temp_bq_read_instances_table_name = (
+            f"tmp_read_instances_{uuid.uuid4()}".replace("-", "_")
+        )
+        expected_temp_bq_read_instances_table_id = f"{expected_temp_bq_dataset_id}.{expected_temp_bq_read_instances_table_name}"
+
+        expected_entity_type_specs = [
+            _get_entity_type_spec_proto_with_feature_ids(
+                entity_type_id="my_entity_type_id_1",
+                feature_ids=["my_feature_id_1_1", "my_feature_id_1_2"],
+            ),
+            _get_entity_type_spec_proto_with_feature_ids(
+                entity_type_id="my_entity_type_id_2",
+                feature_ids=["my_feature_id_2_1", "my_feature_id_2_2"],
+            ),
+        ]
+
+        expected_batch_read_feature_values_request = (
+            gca_featurestore_service.BatchReadFeatureValuesRequest(
+                featurestore=my_featurestore.resource_name,
+                destination=gca_featurestore_service.FeatureValueDestination(
+                    bigquery_destination=gca_io.BigQueryDestination(
+                        output_uri=f"bq://{expected_temp_bq_batch_serve_table_id}"
+                    ),
+                ),
+                entity_type_specs=expected_entity_type_specs,
+                bigquery_read_instances=gca_io.BigQuerySource(
+                    input_uri=f"bq://{expected_temp_bq_read_instances_table_id}"
+                ),
+            )
+        )
+
+        my_featurestore.batch_serve_to_df(
+            serving_feature_ids=_TEST_SERVING_FEATURE_IDS,
+            read_instances_df=read_instances_df,
+            serve_request_timeout=None,
+            bq_dataset_id=expected_temp_bq_dataset_id,
+        )
+
+        batch_read_feature_values_mock.assert_called_once_with(
+            request=expected_batch_read_feature_values_request,
+            metadata=_TEST_REQUEST_METADATA,
+            timeout=None,
+        )
+
+        bq_delete_table_mock.assert_has_calls(
+            calls=[
+                mock.call(expected_temp_bq_batch_serve_table_id),
+                mock.call(expected_temp_bq_read_instances_table_id),
+            ],
+            any_order=True,
+        )
+
+        bq_create_dataset_mock.assert_not_called()
+        bq_delete_dataset_mock.assert_not_called()
 
 
 class TestEntityType:
     def setup_method(self):
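
A side note on the test above: it patches uuid.uuid4 with a deterministic mock so that the expected table names it computes match the names the library generates internally. A minimal, self-contained sketch of that trick, assuming nothing from the test module (fixed_uuid_mock here is hypothetical; the real fixture is the test file's uuid_mock):

import uuid
from unittest import mock

# Deterministic stand-in for uuid.uuid4, analogous to the test's uuid_mock.
fixed_uuid_mock = mock.Mock(return_value=uuid.UUID(int=1))

@mock.patch("uuid.uuid4", fixed_uuid_mock)
def show_deterministic_table_name():
    # Any code calling uuid.uuid4() inside this scope sees the fixed value,
    # so generated names like tmp_batch_serve_<uuid> become reproducible.
    return f"tmp_batch_serve_{uuid.uuid4()}".replace("-", "_")

print(show_deterministic_table_name())
# tmp_batch_serve_00000000_0000_0000_0000_000000000001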
