From 7076fe0483de50af21fe7d7e7da192823f66c3da Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Tue, 19 Apr 2022 15:01:38 -0400 Subject: [PATCH] fix: Fix DynamoDB fetches when there are entities that are not found (#2573) * fix: Fix DynamoDB fetches when there are entities that are not found Signed-off-by: Danny Chiao * remove sort_keys from dynamo since they must be sorted. Add better test for different unknowns Signed-off-by: Danny Chiao --- .../feast/infra/online_stores/dynamodb.py | 28 ++++++------- .../feast/infra/online_stores/online_store.py | 6 +-- .../test_dynamodb_online_store.py | 42 +++++++++++++++++-- 3 files changed, 55 insertions(+), 21 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 01562ad900..406bee525f 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -59,9 +59,6 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel): region: StrictStr """AWS Region Name""" - sort_response: bool = True - """Whether or not to sort BatchGetItem response.""" - table_name_template: StrictStr = "{project}.{table_name}" """DynamoDB table name template""" @@ -204,9 +201,6 @@ def online_read( """ Retrieve feature values from the online DynamoDB store. - Note: This method is currently not optimized to retrieve a lot of data at a time - as it does sequential gets from the DynamoDB table. - Args: config: The RepoConfig for the current FeatureStore. table: Feast FeatureView. @@ -224,7 +218,6 @@ def online_read( result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] entity_ids = [compute_entity_id(entity_key) for entity_key in entity_keys] batch_size = online_config.batch_size - sort_response = online_config.sort_response entity_ids_iter = iter(entity_ids) while True: batch = list(itertools.islice(entity_ids_iter, batch_size)) @@ -243,20 +236,27 @@ def online_read( response = response.get("Responses") table_responses = response.get(table_instance.name) if table_responses: - if sort_response: - table_responses = self._sort_dynamodb_response( - table_responses, entity_ids - ) + table_responses = self._sort_dynamodb_response( + table_responses, entity_ids + ) + entity_idx = 0 for tbl_res in table_responses: + entity_id = tbl_res["entity_id"] + while entity_id != batch[entity_idx]: + result.append((None, None)) + entity_idx += 1 res = {} for feature_name, value_bin in tbl_res["values"].items(): val = ValueProto() val.ParseFromString(value_bin.value) res[feature_name] = val result.append((datetime.fromisoformat(tbl_res["event_ts"]), res)) - else: - batch_size_nones = ((None, None),) * len(batch) - result.extend(batch_size_nones) + entity_idx += 1 + + # Not all entities in a batch may have responses + # Pad with remaining values in batch that were not found + batch_size_nones = ((None, None),) * (len(batch) - len(result)) + result.extend(batch_size_nones) return result def _get_dynamodb_client(self, region: str, endpoint_url: Optional[str] = None): diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index 1f177996de..04c6a065fb 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -76,9 +76,9 @@ def online_read( entity_keys: a list of entity keys that should be read from the FeatureStore. requested_features: (Optional) A subset of the features that should be read from the FeatureStore. Returns: - Data is returned as a list, one item per entity key. Each item in the list is a tuple - of event_ts for the row, and the feature data as a dict from feature names to values. - Values are returned as Value proto message. + Data is returned as a list, one item per entity key in the original order as the entity_keys argument. + Each item in the list is a tuple of event_ts for the row, and the feature data as a dict from feature names + to values. Values are returned as Value proto message. """ ... diff --git a/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py b/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py index 7d6da0dc06..e1be890e57 100644 --- a/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py +++ b/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py @@ -11,6 +11,8 @@ DynamoDBOnlineStoreConfig, DynamoDBTable, ) +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import RepoConfig from tests.utils.online_store_utils import ( _create_n_customer_test_samples, @@ -49,7 +51,6 @@ def test_online_store_config_default(): assert dynamodb_store_config.batch_size == 40 assert dynamodb_store_config.endpoint_url is None assert dynamodb_store_config.region == aws_region - assert dynamodb_store_config.sort_response is True assert dynamodb_store_config.table_name_template == "{project}.{table_name}" @@ -70,20 +71,17 @@ def test_online_store_config_custom_params(): aws_region = "us-west-2" batch_size = 20 endpoint_url = "http://localhost:8000" - sort_response = False table_name_template = "feast_test.dynamodb_table" dynamodb_store_config = DynamoDBOnlineStoreConfig( region=aws_region, batch_size=batch_size, endpoint_url=endpoint_url, - sort_response=sort_response, table_name_template=table_name_template, ) assert dynamodb_store_config.type == "dynamodb" assert dynamodb_store_config.batch_size == batch_size assert dynamodb_store_config.endpoint_url == endpoint_url assert dynamodb_store_config.region == aws_region - assert dynamodb_store_config.sort_response == sort_response assert dynamodb_store_config.table_name_template == table_name_template @@ -175,6 +173,42 @@ def test_online_read(repo_config, n_samples): assert [item[1] for item in returned_items] == list(features) +@mock_dynamodb2 +def test_online_read_unknown_entity(repo_config): + """Test DynamoDBOnlineStore online_read method.""" + n_samples = 2 + _create_test_table(PROJECT, f"{TABLE_NAME}_{n_samples}", REGION) + data = _create_n_customer_test_samples(n=n_samples) + _insert_data_test_table(data, PROJECT, f"{TABLE_NAME}_{n_samples}", REGION) + + entity_keys, features, *rest = zip(*data) + # Append a nonsensical entity to search for + entity_keys = list(entity_keys) + features = list(features) + dynamodb_store = DynamoDBOnlineStore() + + # Have the unknown entity be in the beginning, middle, and end of the list of entities. + for pos in range(len(entity_keys)): + entity_keys_with_unknown = deepcopy(entity_keys) + entity_keys_with_unknown.insert( + pos, + EntityKeyProto( + join_keys=["customer"], entity_values=[ValueProto(string_val="12359")] + ), + ) + features_with_none = deepcopy(features) + features_with_none.insert(pos, None) + returned_items = dynamodb_store.online_read( + config=repo_config, + table=MockFeatureView(name=f"{TABLE_NAME}_{n_samples}"), + entity_keys=entity_keys_with_unknown, + ) + assert len(returned_items) == len(entity_keys_with_unknown) + assert [item[1] for item in returned_items] == list(features_with_none) + # The order should match the original entity key order + assert returned_items[pos] == (None, None) + + @mock_dynamodb2 def test_write_batch_non_duplicates(repo_config): """Test DynamoDBOnline Store deduplicate write batch request items."""