diff --git a/.github/workflows/rolling_batch_integration.yml b/.github/workflows/rolling_batch_integration.yml
index 574544c24..2acc397df 100644
--- a/.github/workflows/rolling_batch_integration.yml
+++ b/.github/workflows/rolling_batch_integration.yml
@@ -28,7 +28,7 @@ jobs:
       gpu_instance_id: ${{ steps.create_gpu.outputs.action_g5_instance_id }}
 
-  single-gpu-test:
+  scheduler-single-gpu-test:
     runs-on: [ self-hosted, g5 ]
     timeout-minutes: 60
     needs: create-runners
@@ -98,7 +98,7 @@ jobs:
           name: rb-single-gpu-logs
           path: tests/integration/logs/
 
-  multi-gpu-test:
+  scheduler-multi-gpu-test:
     runs-on: [ self-hosted, g5 ]
     timeout-minutes: 60
     needs: create-runners
@@ -243,7 +243,7 @@ jobs:
   stop-runners:
     if: always()
     runs-on: [ self-hosted, scheduler ]
-    needs: [ create-runners, single-gpu-test, multi-gpu-test, lmi-dist-test ]
+    needs: [ create-runners, scheduler-single-gpu-test, scheduler-multi-gpu-test, lmi-dist-test ]
     steps:
       - name: Stop all instances
        run: |
diff --git a/engines/python/setup/djl_python/huggingface.py b/engines/python/setup/djl_python/huggingface.py
index 4935cd79c..29f605134 100644
--- a/engines/python/setup/djl_python/huggingface.py
+++ b/engines/python/setup/djl_python/huggingface.py
@@ -173,6 +173,8 @@ def initialize(self, properties: dict):
                 self.device = int(os.getenv("LOCAL_RANK", 0))
             _rolling_batch_cls = get_rolling_batch_class_from_str(
                 self.rolling_batch_type, is_mpi, self.model_config)
+
+            # TODO: Allow user to set output formatter
             self.rolling_batch = _rolling_batch_cls(model_id_or_path,
                                                     self.device, properties,
                                                     **kwargs)
@@ -236,11 +238,11 @@ def inference(self, inputs):
         if self.rolling_batch_type:
             result = self.rolling_batch.inference(input_data, parameters)
             for i in range(inputs.get_batch_size()):
-                res = result[i]
-                encode(outputs,
-                       res,
-                       accept,
-                       key=inputs.get_content().key_at(i))
+                outputs.add(result[i], key="data", batch_index=i)
+
+            content_type = self.rolling_batch.get_content_type()
+            if content_type:
+                outputs.add_property("content-type", content_type)
             return outputs
         elif self.enable_streaming:
diff --git a/engines/python/setup/djl_python/rolling_batch/rolling_batch.py b/engines/python/setup/djl_python/rolling_batch/rolling_batch.py
index 74f22a8aa..8ac013b9d 100644
--- a/engines/python/setup/djl_python/rolling_batch/rolling_batch.py
+++ b/engines/python/setup/djl_python/rolling_batch/rolling_batch.py
@@ -10,10 +10,22 @@
 # or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS"
 # BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express or implied. See the License for
 # the specific language governing permissions and limitations under the License.
+import json
 import logging
 from abc import ABC, abstractmethod
 
 
+def _default_output_formatter(token_texts: list):
+    """
+    Default output formatter
+
+    :return: formatted output
+    """
+    token_texts = {"outputs": token_texts}
+    json_encoded_str = json.dumps(token_texts) + "\n"
+    return json_encoded_str
+
+
 class Request(object):
     """
     This class represents each request that comes to the handler.
@@ -24,16 +36,23 @@ class Request(object):
     """
 
-    def __init__(self, id: int, input_text: str, parameters: dict):
+    def __init__(self,
+                 id: int,
+                 input_text: str,
+                 parameters: dict,
+                 output_formatter=_default_output_formatter):
         """
         Initialize a request
 
         :param id: request id
         :param input_text: request's input text
+        :param parameters: list of parameters
+        :param output_formatter: output formatter
         """
         self.id = id
         self.input_text = input_text
         self.parameters = parameters
+        self.output_formatter = output_formatter
         self.next_token = None
         self.last_token = False
@@ -44,7 +63,10 @@ def set_next_token(self, next_token: str, last_token: bool = False):
         :param next_token: next token to be set.
         :param last_token: whether this token is the last of the sequence.
         """
-        self.next_token = next_token
+        if self.output_formatter is None:
+            self.next_token = next_token
+        else:  # output only supports size one now
+            self.next_token = self.output_formatter([next_token])
         self.last_token = last_token
 
     def get_next_token(self) -> str:
@@ -63,6 +85,12 @@ def is_last_token(self) -> bool:
         """
         return self.last_token
 
+    def get_content_type(self):
+        # TODO: find a way to return content-type for custom output formatter
+        if self.output_formatter == _default_output_formatter:
+            return "application/jsonlines"
+        return None
+
 
 def stop_on_any_exception(func):
diff --git a/tests/integration/llm/client.py b/tests/integration/llm/client.py
index 16ed3022b..89e838257 100644
--- a/tests/integration/llm/client.py
+++ b/tests/integration/llm/client.py
@@ -297,27 +297,32 @@ def get_model_name():
     "gpt-neox-20b": {
         "max_memory_per_gpu": [15.0],
         "batch_size": [1],
-        "seq_length": [64, 128, 256]
+        "seq_length": [64, 128, 256],
+        "stream_output": True
     },
     "falcon-7b": {
         "max_memory_per_gpu": [20.0],
         "batch_size": [1],
-        "seq_length": [64, 128, 256]
+        "seq_length": [64, 128, 256],
+        "stream_output": True
     },
     "open-llama-7b": {
         "max_memory_per_gpu": [8.0],
         "batch_size": [1],
-        "seq_length": [64, 128, 256]
+        "seq_length": [64, 128, 256],
+        "stream_output": True
     },
     "flan-t5-xxl": {
         "max_memory_per_gpu": [12.0],
         "batch_size": [1],
-        "seq_length": [64, 128, 256]
+        "seq_length": [64, 128, 256],
+        "stream_output": True
     },
     "gpt2": {
-        "max_memory_per_gpu": [8.0],
+        "max_memory_per_gpu": [5.0],
         "batch_size": [1],
-        "seq_length": [64, 128, 256]
+        "seq_length": [64, 128, 256],
+        "stream_output": True
     }
 }
@@ -703,44 +708,6 @@ def test_transformers_neuronx_handler(model, model_spec):
             assert len(result) == batch_size
 
 
-def test_lmi_dist_handler(model, model_spec):
-    if model not in model_spec:
-        raise ValueError(
-            f"{args.model} is not one of the supporting models {list(model_spec.keys())}"
-        )
-    spec = model_spec[args.model]
-    if "worker" in spec:
-        check_worker_number(spec["worker"])
-    for i, batch_size in enumerate(spec["batch_size"]):
-        for seq_length in spec["seq_length"]:
-            if "t5" in model:
-                req = {"inputs": t5_batch_generation(batch_size)}
-            else:
-                req = {"inputs": batch_generation(batch_size)}
-            params = {"max_new_tokens": seq_length}
-            req["parameters"] = params
-            logging.info(f"req {req}")
-            res = send_json(req)
-            if spec.get("stream_output", False):
-                logging.info(f"res: {res.content}")
-                result = res.content.decode().split("\n")[:-1]
-                assert len(
-                    result
-                ) <= seq_length, "generated more tokens than max_new_tokens"
-                result_0 = json.loads(result[0])['outputs']
-                assert len(
-                    result_0
-                ) == batch_size, "batch size number of tokens are not generated"
-            else:
-                res = res.text
-                logging.info(f"res {res}")
-                assert res is not None
-            memory_usage = get_gpu_memory()
-            logging.info(memory_usage)
-            for memory in memory_usage:
-                assert float(memory) / 1024.0 < spec["max_memory_per_gpu"][i]
-
-
 if __name__ == "__main__":
     if args.handler == "deepspeed_raw":
         test_ds_raw_model(args.model, ds_raw_model_spec)
@@ -765,7 +732,7 @@ def test_lmi_dist_handler(model, model_spec):
         test_transformers_neuronx_raw(args.model,
                                       transformers_neuronx_raw_model_spec)
     elif args.handler == "lmi_dist":
-        test_lmi_dist_handler(args.model, lmi_dist_model_spec)
+        test_handler(args.model, lmi_dist_model_spec)
     elif args.handler == "performance":
         test_performance()
     else:
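
A rough sketch (not taken from the patch) of how a client can consume the new default jsonlines rolling-batch output: each response line is what _default_output_formatter emits, i.e. {"outputs": [...]}, and the parsing below mirrors the stream_output branch of test_handler in tests/integration/llm/client.py. The endpoint URL and prompt are placeholder values.

# Client-side sketch for the default jsonlines rolling-batch output.
# The URL and prompt are hypothetical; only the line format and the
# "application/jsonlines" content type come from the patch above.
import json

import requests

res = requests.post(
    "http://127.0.0.1:8080/invocations",  # placeholder model endpoint
    json={"inputs": "Deep learning is", "parameters": {"max_new_tokens": 16}})
print(res.headers.get("content-type"))  # expected: application/jsonlines

# Each non-empty line is one generation step: {"outputs": ["<token text>"]}
for line in res.content.decode().split("\n")[:-1]:
    token_texts = json.loads(line)["outputs"]
    print(token_texts)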