From d036bb239919113ebe722efb3ac329cc06407883 Mon Sep 17 00:00:00 2001
From: Qing Lan
Date: Tue, 19 Sep 2023 10:34:23 -0700
Subject: [PATCH] [Handler] formalize all engines with same settings (#1077)

---
 engines/python/setup/djl_python/deepspeed.py | 124 ++++++++++--------
 .../python/setup/djl_python/huggingface.py   |   4 +-
 2 files changed, 73 insertions(+), 55 deletions(-)

diff --git a/engines/python/setup/djl_python/deepspeed.py b/engines/python/setup/djl_python/deepspeed.py
index e657d38aa..b34a04932 100644
--- a/engines/python/setup/djl_python/deepspeed.py
+++ b/engines/python/setup/djl_python/deepspeed.py
@@ -21,6 +21,8 @@
                           AutoModelForTokenClassification, pipeline,
                           Conversation, SquadExample)
 import deepspeed
+
+from djl_python.encode_decode import decode, encode
 from djl_python.inputs import Input
 from djl_python.outputs import Output
 from djl_python.streaming_utils import StreamingUtils
@@ -313,52 +315,64 @@ def format_input_for_task(self, input_values):
             batch_inputs += [current_input]
         return batch_inputs
 
-    def inference(self, inputs: Input):
+    def parse_input(self, inputs):
         input_data = []
         input_size = []
-        model_kwargs = {}
+        parameters = []
+        errors = {}
         batch = inputs.get_batches()
-        # TODO: deal with batch specific issues
-        content_type = batch[0].get_property("Content-Type")
-        if content_type is not None and content_type.startswith(
-                "application/json"):
-            first = True
-            for item in batch:
-                json_input = item.get_as_json()
-                if isinstance(json_input, dict):
-                    input_size.append(len(json_input.get("inputs")))
-                    input_data.extend(
-                        self.format_input_for_task(json_input.pop("inputs")))
-                    if first:
-                        model_kwargs = json_input.pop("parameters", {})
-                        first = False
-                    else:
-                        if model_kwargs != json_input.pop("parameters", {}):
-                            return Output().error(
-                                "In order to enable dynamic batching, all input batches must have the same parameters"
-                            )
+        first = True
+        for i, item in enumerate(batch):
+            try:
+                content_type = item.get_property("Content-Type")
+                input_map = decode(item, content_type)
+                _inputs = input_map.pop("inputs", input_map)
+                if first:
+                    parameters.append(input_map.pop("parameters", {}))
+                    first = False
                 else:
-                    input_size.append(len(json_input))
-                    input_data.extend(json_input)
-        else:
-            for item in batch:
-                input_size.append(1)
-                input_data.extend(item.get_as_string())
+                    param = input_map.pop("parameters", {})
+                    if parameters[0] != param:
+                        logging.warning(
+                            f"expected param: {parameters}, actual: {param}")
+                        raise ValueError(
+                            "In order to enable dynamic batching, all input batches must have the same parameters"
+                        )
+                if isinstance(_inputs, list):
+                    input_data.extend(_inputs)
+                    input_size.append(len(_inputs))
+                else:
+                    input_data.append(_inputs)
+                    input_size.append(1)
+            except Exception as e:  # pylint: disable=broad-except
+                logging.exception(f"Parse input failed: {i}")
+                errors[i] = str(e)
+
+        return input_data, input_size, parameters, errors, batch
+
+    def inference(self, inputs: Input):
+
+        input_data, input_size, parameters, errors, batch = self.parse_input(
+            inputs)
+        parameters = parameters[0]
 
         outputs = Output()
         if self.enable_streaming:
+            if len(batch) > 1:
+                raise NotImplementedError(
+                    "Dynamic batch not supported for generic streaming")
             outputs.add_property("content-type", "application/jsonlines")
             if self.enable_streaming == "huggingface":
                 outputs.add_stream_content(
                     StreamingUtils.use_hf_default_streamer(
                         self.model, self.tokenizer, input_data, self.device,
-                        **model_kwargs))
+                        **parameters))
             else:
                 stream_generator = StreamingUtils.get_stream_generator(
"DeepSpeed") outputs.add_stream_content( stream_generator(self.model, self.tokenizer, input_data, - self.device, **model_kwargs)) + self.device, **parameters)) return outputs if self.task == "text-generation": tokenized_inputs = self.tokenizer(input_data, @@ -369,33 +383,35 @@ def inference(self, inputs: Input): output_tokens = self.model.generate( input_ids=tokenized_inputs.input_ids, attention_mask=tokenized_inputs.attention_mask, - **model_kwargs) - generated_text = self.tokenizer.batch_decode( - output_tokens, skip_special_tokens=True) - outputs.add_property("content-type", "application/json") - offset = 0 - for i in range(inputs.get_batch_size()): - result = [{ - "generated_text": s - } for s in generated_text[offset:offset + input_size[i]]] - outputs.add(result, key=inputs.get_content().key_at(i)) - offset += input_size[i] - return outputs + **parameters) + prediction = self.tokenizer.batch_decode(output_tokens, + skip_special_tokens=True) + else: + prediction = self.pipeline(input_data, **parameters) - result = self.pipeline(input_data, **model_kwargs) offset = 0 - for i in range(inputs.get_batch_size()): - res = result[offset:offset + input_size[i]] - if self.task == "conversational": - res = [{ - "generated_text": s.generated_responses[-1], - "conversation": { - "past_user_inputs": s.past_user_inputs, - "generated_responses": s.generated_responses, - }, - } for s in res] - outputs.add(res, key=inputs.get_content().key_at(i)) - offset += input_size[i] + for i, item in enumerate(batch): + content_type = item.get_property("Content-Type") + accept = item.get_property("Accept") + if not accept: + content_type = content_type if content_type else "application/json" + accept = content_type if content_type.startswith( + "tensor/") else "application/json" + elif "*/*" in accept: + accept = "application/json" + + err = errors.get(i) + if err: + encode(outputs, + err, + accept, + key=inputs.get_content().key_at(i)) + else: + encode(outputs, + prediction[offset:offset + input_size[i]], + accept, + key=inputs.get_content().key_at(i)) + offset += input_size[i] outputs.add_property("content-type", "application/json") diff --git a/engines/python/setup/djl_python/huggingface.py b/engines/python/setup/djl_python/huggingface.py index 97cf51b88..491108da4 100644 --- a/engines/python/setup/djl_python/huggingface.py +++ b/engines/python/setup/djl_python/huggingface.py @@ -281,7 +281,9 @@ def inference(self, inputs): outputs.add_property("content-type", content_type) return outputs elif self.enable_streaming: - # TODO support dynamic batch + if batch > 1: + raise NotImplementedError( + "Dynamic batch not supported for generic streaming") outputs.add_property("content-type", "application/jsonlines") if self.enable_streaming == "huggingface": outputs.add_stream_content(