Skip to content

Commit

Permalink
added split of batched params (#925)
Browse files Browse the repository at this point in the history
fixes #923
  • Loading branch information
saeid93 authored Jan 26, 2023
1 parent 18a54b8 commit 69833dc
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 3 deletions.
57 changes: 54 additions & 3 deletions mlserver/batching/requests.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from collections import defaultdict, OrderedDict
from typing import Dict, List, Optional, Union
from typing import Dict, List, Optional, Union, Any, DefaultDict

from ..types import (
InferenceRequest,
Expand All @@ -16,6 +16,22 @@ def _get_data(payload: Union[RequestInput, ResponseOutput]):
return getattr(payload.data, "__root__", payload.data)


def _get_parameters(payload: ResponseOutput) -> DefaultDict[Any, Any]:
parameters = defaultdict(list)
if payload.parameters is not None:
payload_parameters = payload.parameters.dict()
for param_name, param_values in payload_parameters.items():
if param_name in ["content_type", "headers"]:
continue
for param_value in param_values:
parameters[param_name].append(param_value)
if "content_type" in payload_parameters.keys():
parameters["content_type"] = payload_parameters["content_type"]
if "headers" in payload_parameters.keys():
parameters["headers"] = payload_parameters["headers"]
return parameters


def _merge_parameters(
all_params: dict,
parametrised_obj: Union[
Expand Down Expand Up @@ -206,6 +222,10 @@ def _split_response_output(
) -> Dict[str, ResponseOutput]:

all_data = self._split_data(response_output)
if response_output.parameters is not None:
all_parameters = self._split_parameters(response_output)
else:
all_parameters = None
response_outputs = {}
for internal_id, data in all_data.items():
shape = Shape(response_output.shape)
Expand All @@ -215,12 +235,14 @@ def _split_response_output(
shape=shape.to_list(),
data=data,
datatype=response_output.datatype,
parameters=response_output.parameters,
parameters=all_parameters
if all_parameters is None
else all_parameters[internal_id],
)

return response_outputs

def _split_data(self, response_output: ResponseOutput) -> Dict[str, ResponseOutput]:
def _split_data(self, response_output: ResponseOutput) -> Dict[str, Any]:
merged_shape = Shape(response_output.shape)
element_size = merged_shape.elem_size
merged_data = _get_data(response_output)
Expand All @@ -234,3 +256,32 @@ def _split_data(self, response_output: ResponseOutput) -> Dict[str, ResponseOutp
all_data[internal_id] = data

return all_data

def _split_parameters(
self, response_output: ResponseOutput
) -> Dict[str, Parameters]:
merged_parameters = _get_parameters(response_output)
idx = 0

all_parameters = {}
# TODO: Don't rely on array to have been flattened
for internal_id, minibatch_size in self._minibatch_sizes.items():
parameter_args = {}
for parameter_name, parameter_values in merged_parameters.items():
if parameter_name in ["content_type", "headers"]:
continue
try:
parameter_value = parameter_values[idx]
if parameter_value != []:
parameter_args[parameter_name] = str(parameter_value)
except IndexError:
pass
if "content_type" in merged_parameters.keys():
parameter_args["content_type"] = merged_parameters["content_type"]
if "headers" in merged_parameters.keys():
parameter_args["headers"] = merged_parameters["headers"]
parameter_obj = Parameters(**parameter_args)
all_parameters[internal_id] = parameter_obj
idx += minibatch_size

return all_parameters
37 changes: 37 additions & 0 deletions tests/batching/test_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,43 @@ def test_merged_request(
@pytest.mark.parametrize(
"minibatch_sizes, response_output, expected",
[
(
{"req-1": 1, "req-2": 1, "req-3": 1},
ResponseOutput(
name="foo",
datatype="INT32",
shape=[3, 3],
data=[1, 2, 3, 4, 5, 6, 7, 8, 9],
parameters=Parameters(
content_type="np",
foo=["foo_1", "foo_2"],
bar=["bar_1", "bar_2", "bar_3"],
),
),
[
ResponseOutput(
name="foo",
datatype="INT32",
shape=[1, 3],
data=[1, 2, 3],
parameters=Parameters(content_type="np", foo="foo_1", bar="bar_1"),
),
ResponseOutput(
name="foo",
datatype="INT32",
shape=[1, 3],
data=[4, 5, 6],
parameters=Parameters(content_type="np", foo="foo_2", bar="bar_2"),
),
ResponseOutput(
name="foo",
datatype="INT32",
shape=[1, 3],
data=[7, 8, 9],
parameters=Parameters(content_type="np", bar="bar_3"),
),
],
),
(
{"req-1": 1, "req-2": 1, "req-3": 1},
ResponseOutput(
Expand Down

0 comments on commit 69833dc

Please sign in to comment.