Enable linting and formatting for e2e tests #832

Merged: 5 commits, Jun 28, 2020
Changes from 1 commit
4 changes: 4 additions & 0 deletions Makefile
@@ -70,12 +70,16 @@ test-python:

format-python:
cd ${ROOT_DIR}/sdk/python; isort -rc feast tests
cd ${ROOT_DIR}/tests/e2e; isort -rc .
cd ${ROOT_DIR}/sdk/python; black --target-version py37 feast tests
cd ${ROOT_DIR}/tests/e2e; black --target-version py37 .

lint-python:
cd ${ROOT_DIR}/sdk/python; mypy feast/ tests/
cd ${ROOT_DIR}/sdk/python; flake8 feast/ tests/
cd ${ROOT_DIR}/tests/e2e; flake8 .
cd ${ROOT_DIR}/sdk/python; black --check feast tests
cd ${ROOT_DIR}/tests/e2e; black --check .

# Go SDK

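A minimal usage sketch (not part of the diff itself), assuming isort, black, mypy, and flake8 are installed in the active Python environment:

    # from the repository root
    make format-python   # isort + black over sdk/python (feast, tests) and tests/e2e
    make lint-python     # mypy over sdk/python, plus flake8 and black --check over sdk/python and tests/e2e
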
158 changes: 86 additions & 72 deletions tests/e2e/bq/bq-batch-retrieval.py
@@ -15,19 +15,15 @@
from google.protobuf.duration_pb2 import Duration
from pandavro import to_avro

import tensorflow_data_validation as tfdv
from bq.testutils import assert_stats_equal, clear_unsupported_fields
from feast.client import Client
from feast.core.CoreService_pb2 import ListStoresRequest
from feast.core.IngestionJob_pb2 import IngestionJobStatus
from feast.entity import Entity
from feast.feature import Feature
from feast.feature_set import FeatureSet
from feast.type_map import ValueType
from google.cloud import storage, bigquery
from google.cloud.storage import Blob
from google.protobuf.duration_pb2 import Duration
from pandavro import to_avro
import tensorflow_data_validation as tfdv
from bq.testutils import *

pd.set_option("display.max_columns", None)

@@ -184,7 +180,9 @@ def test_batch_get_batch_features_with_file(client):
client.ingest(file_fs1, features_1_df, timeout=480)

# Rename column (datetime -> event_timestamp)
features_1_df['datetime'] + pd.Timedelta(seconds=1) # adds buffer to avoid rounding errors
features_1_df["datetime"] + pd.Timedelta(
seconds=1
) # adds buffer to avoid rounding errors
features_1_df = features_1_df.rename(columns={"datetime": "event_timestamp"})

to_avro(
@@ -230,7 +228,9 @@ def test_batch_get_batch_features_with_gs_path(client, gcs_path):
client.ingest(gcs_fs1, features_1_df, timeout=360)

# Rename column (datetime -> event_timestamp)
features_1_df['datetime'] + pd.Timedelta(seconds=1) # adds buffer to avoid rounding errors
features_1_df["datetime"] + pd.Timedelta(
seconds=1
) # adds buffer to avoid rounding errors
features_1_df = features_1_df.rename(columns={"datetime": "event_timestamp"})

# Output file to local
@@ -342,17 +342,22 @@ def check():
feature_refs=["feature_value4"],
project=PROJECT_NAME,
)
output = feature_retrieval_job.to_dataframe(timeout_sec=180).sort_values(by=["entity_id"])
output = feature_retrieval_job.to_dataframe(timeout_sec=180).sort_values(
by=["entity_id"]
)
print(output.head(10))

assert np.allclose(
output["additional_float_col"], entity_df["additional_float_col"]
)
assert (
output["additional_string_col"].to_list()
== entity_df["additional_string_col"].to_list()
output["additional_string_col"].to_list()
== entity_df["additional_string_col"].to_list()
)
assert (
output["feature_value4"].to_list()
== features_df["feature_value4"].to_list()
)
assert output["feature_value4"].to_list() == features_df["feature_value4"].to_list()
clean_up_remote_files(feature_retrieval_job.get_avro_files())

wait_for(check, timedelta(minutes=5))
@@ -368,11 +373,11 @@ def test_batch_point_in_time_correctness_join(client):
historical_df = pd.DataFrame(
{
"datetime": [
time_offset - timedelta(seconds=50),
time_offset - timedelta(seconds=30),
time_offset - timedelta(seconds=10),
]
* N_EXAMPLES,
time_offset - timedelta(seconds=50),
time_offset - timedelta(seconds=30),
time_offset - timedelta(seconds=10),
]
* N_EXAMPLES,
"entity_id": [i for i in range(N_EXAMPLES) for _ in range(3)],
"feature_value5": ["WRONG", "WRONG", "CORRECT"] * N_EXAMPLES,
}
@@ -440,10 +445,7 @@ def test_batch_multiple_featureset_joins(client):
def check():
feature_retrieval_job = client.get_batch_features(
entity_rows=entity_df,
feature_refs=[
"feature_value6",
"feature_set_2:other_feature_value7",
],
feature_refs=["feature_value6", "feature_set_2:other_feature_value7"],
project=PROJECT_NAME,
)
output = feature_retrieval_job.to_dataframe(timeout_sec=180)
@@ -453,7 +455,8 @@ def check():
int(i) for i in output["feature_value6"].to_list()
]
assert (
output["other_entity_id"].to_list() == output["feature_set_2__other_feature_value7"].to_list()
output["other_entity_id"].to_list()
== output["feature_set_2__other_feature_value7"].to_list()
)
clean_up_remote_files(feature_retrieval_job.get_avro_files())

@@ -516,15 +519,15 @@ def infra_teardown(pytestconfig, core_url, serving_url):
print("Cleaning up not required")


'''
This suite of tests tests the apply feature set - update feature set - retrieve
event sequence. It ensures that when a feature set is updated, tombstoned features
"""
This suite of tests tests the apply feature set - update feature set - retrieve
event sequence. It ensures that when a feature set is updated, tombstoned features
are no longer retrieved, and added features are null for previously ingested
rows.

It is marked separately because of the length of time required
to perform this test, due to bigquery schema caching for streaming writes.
'''
"""


@pytest.fixture(scope="module")
@@ -546,7 +549,7 @@ def update_featureset_dataframe():
@pytest.mark.fs_update
@pytest.mark.run(order=20)
def test_update_featureset_apply_featureset_and_ingest_first_subset(
client, update_featureset_dataframe
client, update_featureset_dataframe
):
subset_columns = ["datetime", "entity_id", "update_feature1", "update_feature2"]
subset_df = update_featureset_dataframe.iloc[:5][subset_columns]
@@ -563,18 +566,23 @@ def test_update_featureset_apply_featureset_and_ingest_first_subset(
def check():
feature_retrieval_job = client.get_batch_features(
entity_rows=update_featureset_dataframe[["datetime", "entity_id"]].iloc[:5],
feature_refs=[
"update_feature1",
"update_feature2",
],
project=PROJECT_NAME
feature_refs=["update_feature1", "update_feature2"],
project=PROJECT_NAME,
)

output = feature_retrieval_job.to_dataframe(timeout_sec=180).sort_values(by=["entity_id"])
output = feature_retrieval_job.to_dataframe(timeout_sec=180).sort_values(
by=["entity_id"]
)
print(output.head())

assert output["update_feature1"].to_list() == subset_df["update_feature1"].to_list()
assert output["update_feature2"].to_list() == subset_df["update_feature2"].to_list()
assert (
output["update_feature1"].to_list()
== subset_df["update_feature1"].to_list()
)
assert (
output["update_feature2"].to_list()
== subset_df["update_feature2"].to_list()
)

clean_up_remote_files(feature_retrieval_job.get_avro_files())

@pytest.mark.timeout(600)
@pytest.mark.run(order=21)
def test_update_featureset_update_featureset_and_ingest_second_subset(
client, update_featureset_dataframe
client, update_featureset_dataframe
):
subset_columns = [
"datetime",
@@ -621,20 +629,27 @@ def test_update_featureset_update_featureset_and_ingest_second_subset(
def check():
feature_retrieval_job = client.get_batch_features(
entity_rows=update_featureset_dataframe[["datetime", "entity_id"]].iloc[5:],
feature_refs=[
"update_feature1",
"update_feature3",
"update_feature4",
],
feature_refs=["update_feature1", "update_feature3", "update_feature4"],
project=PROJECT_NAME,
)

output = feature_retrieval_job.to_dataframe(timeout_sec=180).sort_values(by=["entity_id"])
output = feature_retrieval_job.to_dataframe(timeout_sec=180).sort_values(
by=["entity_id"]
)
print(output.head())

assert output["update_feature1"].to_list() == subset_df["update_feature1"].to_list()
assert output["update_feature3"].to_list() == subset_df["update_feature3"].to_list()
assert output["update_feature4"].to_list() == subset_df["update_feature4"].to_list()
assert (
output["update_feature1"].to_list()
== subset_df["update_feature1"].to_list()
)
assert (
output["update_feature3"].to_list()
== subset_df["update_feature3"].to_list()
)
assert (
output["update_feature4"].to_list()
== subset_df["update_feature4"].to_list()
)
clean_up_remote_files(feature_retrieval_job.get_avro_files())

wait_for(check, timedelta(minutes=5))
@@ -662,19 +677,17 @@ def test_update_featureset_retrieve_all_fields(client, update_featureset_datafra
def test_update_featureset_retrieve_valid_fields(client, update_featureset_dataframe):
feature_retrieval_job = client.get_batch_features(
entity_rows=update_featureset_dataframe[["datetime", "entity_id"]],
feature_refs=[
"update_feature1",
"update_feature3",
"update_feature4",
],
feature_refs=["update_feature1", "update_feature3", "update_feature4"],
project=PROJECT_NAME,
)
output = feature_retrieval_job.to_dataframe(timeout_sec=180).sort_values(by=["entity_id"])
output = feature_retrieval_job.to_dataframe(timeout_sec=180).sort_values(
by=["entity_id"]
)
clean_up_remote_files(feature_retrieval_job.get_avro_files())
print(output.head(10))
assert (
output["update_feature1"].to_list()
== update_featureset_dataframe["update_feature1"].to_list()
output["update_feature1"].to_list()
== update_featureset_dataframe["update_feature1"].to_list()
)
# we have to convert to float because the column contains np.NaN
assert [math.isnan(i) for i in output["update_feature3"].to_list()[:5]] == [
float(i) for i in update_featureset_dataframe["update_feature3"].to_list()[5:]
]
assert (
output["update_feature4"].to_list()
== [None] * 5 + update_featureset_dataframe["update_feature4"].to_list()[5:]
output["update_feature4"].to_list()
== [None] * 5 + update_featureset_dataframe["update_feature4"].to_list()[5:]
)


@@ -697,49 +710,50 @@ def test_batch_dataset_statistics(client):
fs2 = client.get_feature_set(name="feature_set_2")
id_offset = 20

N_ROWS = 21
n_rows = 21
time_offset = datetime.utcnow().replace(tzinfo=pytz.utc)
features_1_df = pd.DataFrame(
{
"datetime": [time_offset] * N_ROWS,
"entity_id": [id_offset + i for i in range(N_ROWS)],
"feature_value6": ["a" for i in range(N_ROWS)],
"datetime": [time_offset] * n_rows,
"entity_id": [id_offset + i for i in range(n_rows)],
"feature_value6": ["a" for i in range(n_rows)],
}
)
ingestion_id1 = client.ingest(fs1, features_1_df)

features_2_df = pd.DataFrame(
{
"datetime": [time_offset] * N_ROWS,
"other_entity_id": [id_offset + i for i in range(N_ROWS)],
"other_feature_value7": [int(i) % 10 for i in range(0, N_ROWS)],
"datetime": [time_offset] * n_rows,
"other_entity_id": [id_offset + i for i in range(n_rows)],
"other_feature_value7": [int(i) % 10 for i in range(0, n_rows)],
}
)
ingestion_id2 = client.ingest(fs2, features_2_df)

entity_df = pd.DataFrame(
{
"datetime": [time_offset] * N_ROWS,
"entity_id": [id_offset + i for i in range(N_ROWS)],
"other_entity_id": [id_offset + i for i in range(N_ROWS)],
"datetime": [time_offset] * n_rows,
"entity_id": [id_offset + i for i in range(n_rows)],
"other_entity_id": [id_offset + i for i in range(n_rows)],
}
)

time.sleep(15) # wait for rows to get written to bq
while True:
rows_ingested1 = get_rows_ingested(client, fs1, ingestion_id1)
rows_ingested2 = get_rows_ingested(client, fs2, ingestion_id2)
if rows_ingested1 == len(features_1_df) and rows_ingested2 == len(features_2_df):
print(f"Number of rows successfully ingested: {rows_ingested1}, {rows_ingested2}. Continuing.")
if rows_ingested1 == len(features_1_df) and rows_ingested2 == len(
features_2_df
):
print(
f"Number of rows successfully ingested: {rows_ingested1}, {rows_ingested2}. Continuing."
)
break
time.sleep(30)

feature_retrieval_job = client.get_batch_features(
entity_rows=entity_df,
feature_refs=[
"feature_value6",
"feature_set_2:other_feature_value7",
],
feature_refs=["feature_value6", "feature_set_2:other_feature_value7"],
project=PROJECT_NAME,
compute_statistics=True,
)
Expand All @@ -765,7 +779,7 @@ def test_batch_dataset_statistics(client):


def get_rows_ingested(
client: Client, feature_set: FeatureSet, ingestion_id: str
client: Client, feature_set: FeatureSet, ingestion_id: str
) -> int:
response = client._core_service_stub.ListStores(
ListStoresRequest(filter=ListStoresRequest.Filter(name="historical"))