Commit

[Handler] formalize all engines with same settings (#1077)
Qing Lan authored Sep 19, 2023
1 parent eeb3db1 commit d036bb2
Showing 2 changed files with 73 additions and 55 deletions.
124 changes: 70 additions & 54 deletions engines/python/setup/djl_python/deepspeed.py
@@ -21,6 +21,8 @@
                            AutoModelForTokenClassification, pipeline,
                            Conversation, SquadExample)
 import deepspeed
+
+from djl_python.encode_decode import decode, encode
 from djl_python.inputs import Input
 from djl_python.outputs import Output
 from djl_python.streaming_utils import StreamingUtils
@@ -313,52 +315,64 @@ def format_input_for_task(self, input_values):
             batch_inputs += [current_input]
         return batch_inputs
 
-    def inference(self, inputs: Input):
+    def parse_input(self, inputs):
         input_data = []
         input_size = []
-        model_kwargs = {}
+        parameters = []
+        errors = {}
         batch = inputs.get_batches()
-        # TODO: deal with batch specific issues
-        content_type = batch[0].get_property("Content-Type")
-        if content_type is not None and content_type.startswith(
-                "application/json"):
-            first = True
-            for item in batch:
-                json_input = item.get_as_json()
-                if isinstance(json_input, dict):
-                    input_size.append(len(json_input.get("inputs")))
-                    input_data.extend(
-                        self.format_input_for_task(json_input.pop("inputs")))
-                    if first:
-                        model_kwargs = json_input.pop("parameters", {})
-                        first = False
-                    else:
-                        if model_kwargs != json_input.pop("parameters", {}):
-                            return Output().error(
-                                "In order to enable dynamic batching, all input batches must have the same parameters"
-                            )
+        first = True
+        for i, item in enumerate(batch):
+            try:
+                content_type = item.get_property("Content-Type")
+                input_map = decode(item, content_type)
+                _inputs = input_map.pop("inputs", input_map)
+                if first:
+                    parameters.append(input_map.pop("parameters", {}))
+                    first = False
                 else:
-                    input_size.append(len(json_input))
-                    input_data.extend(json_input)
-        else:
-            for item in batch:
-                input_size.append(1)
-                input_data.extend(item.get_as_string())
+                    param = input_map.pop("parameters", {})
+                    if parameters[0] != param:
+                        logging.warning(
+                            f"expected param: {parameters}, actual: {param}")
+                        raise ValueError(
+                            "In order to enable dynamic batching, all input batches must have the same parameters"
+                        )
+                if isinstance(_inputs, list):
+                    input_data.extend(_inputs)
+                    input_size.append(len(_inputs))
+                else:
+                    input_data.append(_inputs)
+                    input_size.append(1)
+            except Exception as e:  # pylint: disable=broad-except
+                logging.exception(f"Parse input failed: {i}")
+                errors[i] = str(e)
+
+        return input_data, input_size, parameters, errors, batch
+
+    def inference(self, inputs: Input):
+
+        input_data, input_size, parameters, errors, batch = self.parse_input(
+            inputs)
+        parameters = parameters[0]
 
         outputs = Output()
         if self.enable_streaming:
+            if batch > 1:
+                raise NotImplementedError(
+                    "Dynamic batch not supported for generic streaming")
            outputs.add_property("content-type", "application/jsonlines")
             if self.enable_streaming == "huggingface":
                 outputs.add_stream_content(
                     StreamingUtils.use_hf_default_streamer(
                         self.model, self.tokenizer, input_data, self.device,
-                        **model_kwargs))
+                        **parameters))
             else:
                 stream_generator = StreamingUtils.get_stream_generator(
                     "DeepSpeed")
                 outputs.add_stream_content(
                     stream_generator(self.model, self.tokenizer, input_data,
-                                     self.device, **model_kwargs))
+                                     self.device, **parameters))
             return outputs
         if self.task == "text-generation":
             tokenized_inputs = self.tokenizer(input_data,
@@ -369,33 +383,35 @@ def inference(self, inputs: Input):
                 output_tokens = self.model.generate(
                     input_ids=tokenized_inputs.input_ids,
                     attention_mask=tokenized_inputs.attention_mask,
-                    **model_kwargs)
-            generated_text = self.tokenizer.batch_decode(
-                output_tokens, skip_special_tokens=True)
-            outputs.add_property("content-type", "application/json")
-            offset = 0
-            for i in range(inputs.get_batch_size()):
-                result = [{
-                    "generated_text": s
-                } for s in generated_text[offset:offset + input_size[i]]]
-                outputs.add(result, key=inputs.get_content().key_at(i))
-                offset += input_size[i]
-            return outputs
+                    **parameters)
+            prediction = self.tokenizer.batch_decode(output_tokens,
+                                                     skip_special_tokens=True)
+        else:
+            prediction = self.pipeline(input_data, **parameters)
 
-        result = self.pipeline(input_data, **model_kwargs)
         offset = 0
-        for i in range(inputs.get_batch_size()):
-            res = result[offset:offset + input_size[i]]
-            if self.task == "conversational":
-                res = [{
-                    "generated_text": s.generated_responses[-1],
-                    "conversation": {
-                        "past_user_inputs": s.past_user_inputs,
-                        "generated_responses": s.generated_responses,
-                    },
-                } for s in res]
-            outputs.add(res, key=inputs.get_content().key_at(i))
-            offset += input_size[i]
+        for i, item in enumerate(batch):
+            content_type = item.get_property("Content-Type")
+            accept = item.get_property("Accept")
+            if not accept:
+                content_type = content_type if content_type else "application/json"
+                accept = content_type if content_type.startswith(
+                    "tensor/") else "application/json"
+            elif "*/*" in accept:
+                accept = "application/json"
+
+            err = errors.get(i)
+            if err:
+                encode(outputs,
+                       err,
+                       accept,
+                       key=inputs.get_content().key_at(i))
+            else:
+                encode(outputs,
+                       prediction[offset:offset + input_size[i]],
+                       accept,
+                       key=inputs.get_content().key_at(i))
+            offset += input_size[i]
+
+        outputs.add_property("content-type", "application/json")

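The batching contract that the new parse_input enforces is easiest to see in isolation. The sketch below is a minimal standalone illustration, not the djl_python module itself: parse_batch and the plain-dict requests are hypothetical stand-ins for Input batches decoded via decode. Every request in a dynamic batch must carry identical parameters, and a request that fails to parse is recorded per index instead of aborting the whole batch.

    import logging

    def parse_batch(requests):
        # Stand-in for the new parse_input: flatten the prompts, remember how
        # many prompts each request contributed, and require that every request
        # in the dynamic batch uses the same generation parameters.
        input_data, input_size, parameters, errors = [], [], [], {}
        first = True
        for i, request in enumerate(requests):
            try:
                _inputs = request.pop("inputs", request)
                if first:
                    parameters.append(request.pop("parameters", {}))
                    first = False
                else:
                    param = request.pop("parameters", {})
                    if parameters[0] != param:
                        logging.warning(
                            f"expected param: {parameters}, actual: {param}")
                        raise ValueError(
                            "In order to enable dynamic batching, all input "
                            "batches must have the same parameters")
                if isinstance(_inputs, list):
                    input_data.extend(_inputs)
                    input_size.append(len(_inputs))
                else:
                    input_data.append(_inputs)
                    input_size.append(1)
            except Exception as e:  # pylint: disable=broad-except
                logging.exception(f"Parse input failed: {i}")
                errors[i] = str(e)
        return input_data, input_size, parameters, errors

    # The second request asks for different parameters, so it becomes a
    # per-request error instead of failing the whole batch.
    requests = [
        {"inputs": "Hello world", "parameters": {"max_new_tokens": 64}},
        {"inputs": ["Hi", "Hey"], "parameters": {"max_new_tokens": 128}},
    ]
    print(parse_batch(requests))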
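Output handling changes in the same way: instead of always writing JSON, each item's Accept header is now resolved and either the prediction or the recorded error is encoded per item. Below is a small sketch of that negotiation rule; resolve_accept is an illustrative helper, not part of djl_python, and simply mirrors the fallback logic added to inference() above.

    def resolve_accept(content_type, accept):
        # With no Accept header, echo the request Content-Type only for tensor
        # payloads and otherwise default to JSON; a wildcard Accept maps to JSON.
        if not accept:
            content_type = content_type if content_type else "application/json"
            return content_type if content_type.startswith(
                "tensor/") else "application/json"
        if "*/*" in accept:
            return "application/json"
        return accept

    assert resolve_accept(None, None) == "application/json"
    assert resolve_accept("tensor/ndlist", None) == "tensor/ndlist"
    assert resolve_accept("application/json", "*/*") == "application/json"
    assert resolve_accept("text/plain", "application/jsonlines") == "application/jsonlines"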
4 changes: 3 additions & 1 deletion engines/python/setup/djl_python/huggingface.py
@@ -281,7 +281,9 @@ def inference(self, inputs):
             outputs.add_property("content-type", content_type)
             return outputs
         elif self.enable_streaming:
-            # TODO support dynamic batch
+            if batch > 1:
+                raise NotImplementedError(
+                    "Dynamic batch not supported for generic streaming")
             outputs.add_property("content-type", "application/jsonlines")
             if self.enable_streaming == "huggingface":
                 outputs.add_stream_content(
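The huggingface.py handler now applies the same guard as deepspeed.py: generic streaming writes a single application/jsonlines stream, so a dynamic batch of more than one request cannot be served that way yet. A one-function sketch of the check; the function name is illustrative, as in both handlers the check sits inline in inference().

    def check_streaming_batch(batch_size: int, enable_streaming) -> None:
        # Streaming output is a single jsonlines stream, so more than one
        # request per dynamic batch is rejected up front.
        if enable_streaming and batch_size > 1:
            raise NotImplementedError(
                "Dynamic batch not supported for generic streaming")

    check_streaming_batch(1, "huggingface")   # fine: a single request streams
    # check_streaming_batch(2, True)          # would raise NotImplementedError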
