Skip to content

Commit

Permalink
Revert to working tritonserver call in notebook using testbooks (NVIDIA-Merlin#619)
Browse files Browse the repository at this point in the history

* revert to now-working tritonserver call in notebook using testbooks

* fix convert to triton inputs call to use workflow input schema instead of just column names
  • Loading branch information
jperez999 authored Sep 14, 2022
1 parent 6250172 commit 9013783
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@
"source": [
"\n",
"# create inputs and outputs\n",
"inputs = convert_df_to_triton_input(workflow.input_schema.column_names, batch.fillna(0), grpcclient.InferInput)\n",
"inputs = convert_df_to_triton_input(workflow.input_schema, batch.fillna(0), grpcclient.InferInput)\n",
"output_cols = ensemble.graph.output_schema.column_names\n",
"outputs = [\n",
" grpcclient.InferRequestedOutput(col)\n",
Expand Down Expand Up @@ -372,7 +372,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.10"
"version": "3.9.7"
},
"vscode": {
"interpreter": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,12 @@
from testbook import testbook

from tests.conftest import REPO_ROOT
from merlin.core.dispatch import get_lib
from merlin.systems.triton.utils import run_ensemble_on_tritonserver

import pytest

pytest.importorskip("tensorflow")
pytest.importorskip("feast")
pytest.importorskip("faiss")
from merlin.models.loader.tf_utils import configure_tensorflow

# flake8: noqa


Expand Down Expand Up @@ -82,20 +78,26 @@ def test_func():
top_k = tb2.ref("top_k")
outputs = tb2.ref("outputs")
assert outputs[0] == "ordered_ids"

df_lib = get_lib()

# read in data for request
batch = df_lib.read_parquet(
os.path.join("/tmp/data/processed/retrieval/", "train", "part_0.parquet"),
num_rows=1,
columns=["user_id"],
)
configure_tensorflow()

response = run_ensemble_on_tritonserver(
"/tmp/examples/poc_ensemble/", outputs, batch, "ensemble_model"
tb2.inject(
"""
import shutil
from merlin.core.dispatch import get_lib
from merlin.models.loader.tf_utils import configure_tensorflow
configure_tensorflow()
df_lib = get_lib()
batch = df_lib.read_parquet(
os.path.join("/tmp/data/processed/retrieval/", "train", "part_0.parquet"),
num_rows=1,
columns=["user_id"],
)
from merlin.systems.triton.utils import run_ensemble_on_tritonserver
response = run_ensemble_on_tritonserver(
"/tmp/examples/poc_ensemble", ensemble.graph.input_schema, batch, outputs, "ensemble_model"
)
response = [x.tolist()[0] for x in response["ordered_ids"]]
shutil.rmtree("/tmp/examples/", ignore_errors=True)
"""
)
response = response["ordered_ids"]

tb2.execute_cell(NUM_OF_CELLS - 2)
response = tb2.ref("response")
assert len(response) == top_k
39 changes: 21 additions & 18 deletions tests/unit/examples/test_building_deploying_multi_stage_RecSys.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@
from testbook import testbook

from tests.conftest import REPO_ROOT
from merlin.core.dispatch import get_lib
from merlin.systems.triton.utils import run_ensemble_on_tritonserver

import pytest

pytest.importorskip("tensorflow")
pytest.importorskip("feast")
pytest.importorskip("faiss")
from merlin.models.loader.tf_utils import configure_tensorflow

# flake8: noqa

Expand Down Expand Up @@ -63,20 +60,26 @@ def test_func():
top_k = tb2.ref("top_k")
outputs = tb2.ref("outputs")
assert outputs[0] == "ordered_ids"

df_lib = get_lib()

# read in data for request
batch = df_lib.read_parquet(
os.path.join("/tmp/data/processed/retrieval/", "train", "part_0.parquet"),
num_rows=1,
columns=["user_id"],
)
configure_tensorflow()

response = run_ensemble_on_tritonserver(
"/tmp/examples/poc_ensemble/", outputs, batch, "ensemble_model"
tb2.inject(
"""
import shutil
from merlin.core.dispatch import get_lib
from merlin.models.loader.tf_utils import configure_tensorflow
configure_tensorflow()
df_lib = get_lib()
batch = df_lib.read_parquet(
os.path.join("/tmp/data/processed/retrieval/", "train", "part_0.parquet"),
num_rows=1,
columns=["user_id"],
)
from merlin.systems.triton.utils import run_ensemble_on_tritonserver
response = run_ensemble_on_tritonserver(
"/tmp/examples/poc_ensemble", ensemble.graph.input_schema, batch, outputs, "ensemble_model"
)
response = [x.tolist()[0] for x in response["ordered_ids"]]
shutil.rmtree("/tmp/examples/", ignore_errors=True)
"""
)
response = response["ordered_ids"]

tb2.execute_cell(NUM_OF_CELLS - 2)
response = tb2.ref("response")
assert len(response) == top_k
40 changes: 16 additions & 24 deletions tests/unit/examples/test_scaling_criteo_merlin_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,9 @@
import pytest
from testbook import testbook
from tests.conftest import REPO_ROOT
from merlin.core.dispatch import get_lib

pytest.importorskip("tensorflow")

from merlin.models.loader.tf_utils import configure_tensorflow # noqa: E402
from merlin.systems.triton.utils import run_ensemble_on_tritonserver # noqa: E402


def test_func():
with testbook(
Expand Down Expand Up @@ -84,24 +80,20 @@ def test_func():
)
NUM_OF_CELLS = len(tb3.cells)
tb3.execute_cell(list(range(0, NUM_OF_CELLS - 5)))
input_cols = tb3.ref("input_cols")
outputs = tb3.ref("output_cols")
# read in data for request
df_lib = get_lib()
in_dtypes = {}
for col in input_cols:
if col.startswith("C"):
in_dtypes[col] = "int64"
if col.startswith("I"):
in_dtypes[col] = "float64"
batch = df_lib.read_parquet(
os.path.join("/tmp/output/criteo/", "valid", "part_0.parquet"),
num_rows=3,
columns=input_cols,
)
batch = batch.astype(in_dtypes)
configure_tensorflow()
response = run_ensemble_on_tritonserver(
"/tmp/output/criteo/ensemble/", outputs, batch, "ensemble_model"
tb3.inject(
"""
import shutil
from merlin.systems.triton.utils import run_ensemble_on_tritonserver
outputs = ensemble.graph.output_schema.column_names
response = run_ensemble_on_tritonserver(
"/tmp/output/criteo/ensemble/",workflow.input_schema, batch.fillna(0),
outputs, "ensemble_model"
)
response = [x.tolist()[0] for x in response["label/binary_classification_task"]]
shutil.rmtree("/tmp/input/criteo", ignore_errors=True)
shutil.rmtree("/tmp/output/criteo", ignore_errors=True)
"""
)
assert len(response["label/binary_classification_task"]) == 3
tb3.execute_cell(NUM_OF_CELLS - 4)
response = tb3.ref("response")
assert len(response) == 3

0 comments on commit 9013783

Please sign in to comment.