From 9013783a8bd92a8dda4c2b95d9303a68bc39de63 Mon Sep 17 00:00:00 2001 From: Julio Perez <37191411+jperez999@users.noreply.github.com> Date: Wed, 14 Sep 2022 16:00:01 -0400 Subject: [PATCH] Revert to working tritonserver call in notebook using testbooks (#619) * revert to now working tritonserver call in notebook using testbooks * fix convert to triton inputs call to use workflow input schema instead of just column names --- ...erence-with-Merlin-Models-TensorFlow.ipynb | 4 +- ...i_building_deploying_multi_stage_RecSys.py | 40 ++++++++++--------- ...t_building_deploying_multi_stage_RecSys.py | 39 +++++++++--------- .../test_scaling_criteo_merlin_models.py | 40 ++++++++----------- 4 files changed, 60 insertions(+), 63 deletions(-) diff --git a/examples/scaling-criteo/04-Triton-Inference-with-Merlin-Models-TensorFlow.ipynb b/examples/scaling-criteo/04-Triton-Inference-with-Merlin-Models-TensorFlow.ipynb index d6bae8c0a..e49c54def 100644 --- a/examples/scaling-criteo/04-Triton-Inference-with-Merlin-Models-TensorFlow.ipynb +++ b/examples/scaling-criteo/04-Triton-Inference-with-Merlin-Models-TensorFlow.ipynb @@ -292,7 +292,7 @@ "source": [ "\n", "# create inputs and outputs\n", - "inputs = convert_df_to_triton_input(workflow.input_schema.column_names, batch.fillna(0), grpcclient.InferInput)\n", + "inputs = convert_df_to_triton_input(workflow.input_schema, batch.fillna(0), grpcclient.InferInput)\n", "output_cols = ensemble.graph.output_schema.column_names\n", "outputs = [\n", " grpcclient.InferRequestedOutput(col)\n", @@ -372,7 +372,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.10" + "version": "3.9.7" }, "vscode": { "interpreter": { diff --git a/tests/integration/examples/test_ci_building_deploying_multi_stage_RecSys.py b/tests/integration/examples/test_ci_building_deploying_multi_stage_RecSys.py index 743b08484..e062cdb83 100644 --- a/tests/integration/examples/test_ci_building_deploying_multi_stage_RecSys.py +++ b/tests/integration/examples/test_ci_building_deploying_multi_stage_RecSys.py @@ -3,16 +3,12 @@ from testbook import testbook from tests.conftest import REPO_ROOT -from merlin.core.dispatch import get_lib -from merlin.systems.triton.utils import run_ensemble_on_tritonserver import pytest pytest.importorskip("tensorflow") pytest.importorskip("feast") pytest.importorskip("faiss") -from merlin.models.loader.tf_utils import configure_tensorflow - # flake8: noqa @@ -82,20 +78,26 @@ def test_func(): top_k = tb2.ref("top_k") outputs = tb2.ref("outputs") assert outputs[0] == "ordered_ids" - - df_lib = get_lib() - - # read in data for request - batch = df_lib.read_parquet( - os.path.join("/tmp/data/processed/retrieval/", "train", "part_0.parquet"), - num_rows=1, - columns=["user_id"], - ) - configure_tensorflow() - - response = run_ensemble_on_tritonserver( - "/tmp/examples/poc_ensemble/", outputs, batch, "ensemble_model" + tb2.inject( + """ + import shutil + from merlin.core.dispatch import get_lib + from merlin.models.loader.tf_utils import configure_tensorflow + configure_tensorflow() + df_lib = get_lib() + batch = df_lib.read_parquet( + os.path.join("/tmp/data/processed/retrieval/", "train", "part_0.parquet"), + num_rows=1, + columns=["user_id"], + ) + from merlin.systems.triton.utils import run_ensemble_on_tritonserver + response = run_ensemble_on_tritonserver( + "/tmp/examples/poc_ensemble", ensemble.graph.input_schema, batch, outputs, "ensemble_model" + ) + response = [x.tolist()[0] for x in response["ordered_ids"]] + shutil.rmtree("/tmp/examples/", ignore_errors=True) + """ ) - response = response["ordered_ids"] - + tb2.execute_cell(NUM_OF_CELLS - 2) + response = tb2.ref("response") assert len(response) == top_k diff --git a/tests/unit/examples/test_building_deploying_multi_stage_RecSys.py b/tests/unit/examples/test_building_deploying_multi_stage_RecSys.py index da9a97647..ca790d3ff 100644 --- a/tests/unit/examples/test_building_deploying_multi_stage_RecSys.py +++ b/tests/unit/examples/test_building_deploying_multi_stage_RecSys.py @@ -3,15 +3,12 @@ from testbook import testbook from tests.conftest import REPO_ROOT -from merlin.core.dispatch import get_lib -from merlin.systems.triton.utils import run_ensemble_on_tritonserver import pytest pytest.importorskip("tensorflow") pytest.importorskip("feast") pytest.importorskip("faiss") -from merlin.models.loader.tf_utils import configure_tensorflow # flake8: noqa @@ -63,20 +60,26 @@ def test_func(): top_k = tb2.ref("top_k") outputs = tb2.ref("outputs") assert outputs[0] == "ordered_ids" - - df_lib = get_lib() - - # read in data for request - batch = df_lib.read_parquet( - os.path.join("/tmp/data/processed/retrieval/", "train", "part_0.parquet"), - num_rows=1, - columns=["user_id"], - ) - configure_tensorflow() - - response = run_ensemble_on_tritonserver( - "/tmp/examples/poc_ensemble/", outputs, batch, "ensemble_model" + tb2.inject( + """ + import shutil + from merlin.core.dispatch import get_lib + from merlin.models.loader.tf_utils import configure_tensorflow + configure_tensorflow() + df_lib = get_lib() + batch = df_lib.read_parquet( + os.path.join("/tmp/data/processed/retrieval/", "train", "part_0.parquet"), + num_rows=1, + columns=["user_id"], + ) + from merlin.systems.triton.utils import run_ensemble_on_tritonserver + response = run_ensemble_on_tritonserver( + "/tmp/examples/poc_ensemble", ensemble.graph.input_schema, batch, outputs, "ensemble_model" + ) + response = [x.tolist()[0] for x in response["ordered_ids"]] + shutil.rmtree("/tmp/examples/", ignore_errors=True) + """ ) - response = response["ordered_ids"] - + tb2.execute_cell(NUM_OF_CELLS - 2) + response = tb2.ref("response") assert len(response) == top_k diff --git a/tests/unit/examples/test_scaling_criteo_merlin_models.py b/tests/unit/examples/test_scaling_criteo_merlin_models.py index 3ab36f0c2..ee29c5148 100644 --- a/tests/unit/examples/test_scaling_criteo_merlin_models.py +++ b/tests/unit/examples/test_scaling_criteo_merlin_models.py @@ -3,13 +3,9 @@ import pytest from testbook import testbook from tests.conftest import REPO_ROOT -from merlin.core.dispatch import get_lib pytest.importorskip("tensorflow") -from merlin.models.loader.tf_utils import configure_tensorflow # noqa: E402 -from merlin.systems.triton.utils import run_ensemble_on_tritonserver # noqa: E402 - def test_func(): with testbook( @@ -84,24 +80,20 @@ def test_func(): ) NUM_OF_CELLS = len(tb3.cells) tb3.execute_cell(list(range(0, NUM_OF_CELLS - 5))) - input_cols = tb3.ref("input_cols") - outputs = tb3.ref("output_cols") - # read in data for request - df_lib = get_lib() - in_dtypes = {} - for col in input_cols: - if col.startswith("C"): - in_dtypes[col] = "int64" - if col.startswith("I"): - in_dtypes[col] = "float64" - batch = df_lib.read_parquet( - os.path.join("/tmp/output/criteo/", "valid", "part_0.parquet"), - num_rows=3, - columns=input_cols, - ) - batch = batch.astype(in_dtypes) - configure_tensorflow() - response = run_ensemble_on_tritonserver( - "/tmp/output/criteo/ensemble/", outputs, batch, "ensemble_model" + tb3.inject( + """ + import shutil + from merlin.systems.triton.utils import run_ensemble_on_tritonserver + outputs = ensemble.graph.output_schema.column_names + response = run_ensemble_on_tritonserver( + "/tmp/output/criteo/ensemble/",workflow.input_schema, batch.fillna(0), + outputs, "ensemble_model" + ) + response = [x.tolist()[0] for x in response["label/binary_classification_task"]] + shutil.rmtree("/tmp/input/criteo", ignore_errors=True) + shutil.rmtree("/tmp/output/criteo", ignore_errors=True) + """ ) - assert len(response["label/binary_classification_task"]) == 3 + tb3.execute_cell(NUM_OF_CELLS - 4) + response = tb3.ref("response") + assert len(response) == 3