
Commit

[python] JSON format for rolling batch (deepjavalibrary#899)
* [python] JSON format for rolling batch

* Review change

* Update integration test

* Review change

* set content-type for rolling batch

---------

Co-authored-by: Frank Liu <frankfliu2000@gmail.com>
2 people authored and KexinFeng committed Aug 16, 2023
1 parent 128697c commit f741bdd
Showing 4 changed files with 52 additions and 55 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/rolling_batch_integration.yml
@@ -28,7 +28,7 @@ jobs:
gpu_instance_id: ${{ steps.create_gpu.outputs.action_g5_instance_id }}


- single-gpu-test:
+ scheduler-single-gpu-test:
runs-on: [ self-hosted, g5 ]
timeout-minutes: 60
needs: create-runners
@@ -98,7 +98,7 @@ jobs:
name: rb-single-gpu-logs
path: tests/integration/logs/

- multi-gpu-test:
+ scheduler-multi-gpu-test:
runs-on: [ self-hosted, g5 ]
timeout-minutes: 60
needs: create-runners
@@ -243,7 +243,7 @@ jobs:
stop-runners:
if: always()
runs-on: [ self-hosted, scheduler ]
- needs: [ create-runners, single-gpu-test, multi-gpu-test, lmi-dist-test ]
+ needs: [ create-runners, scheduler-single-gpu-test, scheduler-multi-gpu-test, lmi-dist-test ]
steps:
- name: Stop all instances
run: |
12 changes: 7 additions & 5 deletions engines/python/setup/djl_python/huggingface.py
@@ -173,6 +173,8 @@ def initialize(self, properties: dict):
self.device = int(os.getenv("LOCAL_RANK", 0))
_rolling_batch_cls = get_rolling_batch_class_from_str(
self.rolling_batch_type, is_mpi, self.model_config)

+ # TODO: Allow user to set output formatter
self.rolling_batch = _rolling_batch_cls(model_id_or_path,
self.device, properties,
**kwargs)
@@ -236,11 +238,11 @@ def inference(self, inputs):
if self.rolling_batch_type:
result = self.rolling_batch.inference(input_data, parameters)
for i in range(inputs.get_batch_size()):
- res = result[i]
- encode(outputs,
-        res,
-        accept,
-        key=inputs.get_content().key_at(i))
+ outputs.add(result[i], key="data", batch_index=i)
+
+ content_type = self.rolling_batch.get_content_type()
+ if content_type:
+     outputs.add_property("content-type", content_type)

return outputs
elif self.enable_streaming:
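With this handler change, each batch element is written under the "data" key with its batch index, and the content type reported by the rolling batch is attached to the response. For reference, a minimal illustration of the payload shape produced by the default formatter added in rolling_batch.py below; the prompt and token values here are made up:

```python
import json

# Illustration only: the default formatter emits one newline-terminated JSON
# record per generated token, so the text a client receives for one request
# is a jsonlines stream like this.
tokens = [" Deep", " learning", " is"]
payload = "".join(json.dumps({"outputs": [t]}) + "\n" for t in tokens)
print(payload)
# {"outputs": [" Deep"]}
# {"outputs": [" learning"]}
# {"outputs": [" is"]}
```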
32 changes: 30 additions & 2 deletions engines/python/setup/djl_python/rolling_batch/rolling_batch.py
@@ -10,10 +10,22 @@
# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS"
# BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express or implied. See the License for
# the specific language governing permissions and limitations under the License.
+ import json
import logging
from abc import ABC, abstractmethod


+ def _default_output_formatter(token_texts: list):
+     """
+     Default output formatter
+     :return: formatted output
+     """
+     token_texts = {"outputs": token_texts}
+     json_encoded_str = json.dumps(token_texts) + "\n"
+     return json_encoded_str


class Request(object):
"""
This class represents each request that comes to the handler.
@@ -24,16 +36,23 @@ class Request(object):
"""

- def __init__(self, id: int, input_text: str, parameters: dict):
+ def __init__(self,
+              id: int,
+              input_text: str,
+              parameters: dict,
+              output_formatter=_default_output_formatter):
"""
Initialize a request
:param id: request id
:param input_text: request's input text
:param parameters: list of parameters
+ :param output_formatter: output formatter
"""
self.id = id
self.input_text = input_text
self.parameters = parameters
+ self.output_formatter = output_formatter
self.next_token = None
self.last_token = False

@@ -44,7 +63,10 @@ def set_next_token(self, next_token: str, last_token: bool = False):
:param next_token: next token to be set.
:param last_token: whether this token is the last of the sequence.
"""
- self.next_token = next_token
+ if self.output_formatter is None:
+     self.next_token = next_token
+ else:  # output only supports size one now
+     self.next_token = self.output_formatter([next_token])
self.last_token = last_token

def get_next_token(self) -> str:
@@ -63,6 +85,12 @@ def is_last_token(self) -> bool:
"""
return self.last_token

+ def get_content_type(self):
+     # TODO: find a way to return content-type for custom output formatter
+     if self.output_formatter == _default_output_formatter:
+         return "application/jsonlines"
+     return None


def stop_on_any_exception(func):

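Taken together, the new Request plumbing behaves as in the minimal sketch below (it assumes the djl_python package from this repository is importable; the prompt and token strings are placeholders):

```python
from djl_python.rolling_batch.rolling_batch import Request

# Default formatter: every token is wrapped as a jsonlines record.
req = Request(0, "Deep learning is", {"max_new_tokens": 64})
req.set_next_token(" a")
print(repr(req.get_next_token()))   # '{"outputs": [" a"]}\n'
print(req.get_content_type())       # application/jsonlines

# Passing output_formatter=None keeps the raw token text instead.
raw = Request(1, "Deep learning is", {"max_new_tokens": 64}, output_formatter=None)
raw.set_next_token(" a", last_token=True)
print(repr(raw.get_next_token()))   # ' a'
print(raw.get_content_type())       # None
```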
57 changes: 12 additions & 45 deletions tests/integration/llm/client.py
@@ -297,27 +297,32 @@ def get_model_name():
"gpt-neox-20b": {
"max_memory_per_gpu": [15.0],
"batch_size": [1],
"seq_length": [64, 128, 256]
"seq_length": [64, 128, 256],
"stream_output": True
},
"falcon-7b": {
"max_memory_per_gpu": [20.0],
"batch_size": [1],
"seq_length": [64, 128, 256]
"seq_length": [64, 128, 256],
"stream_output": True
},
"open-llama-7b": {
"max_memory_per_gpu": [8.0],
"batch_size": [1],
"seq_length": [64, 128, 256]
"seq_length": [64, 128, 256],
"stream_output": True
},
"flan-t5-xxl": {
"max_memory_per_gpu": [12.0],
"batch_size": [1],
"seq_length": [64, 128, 256]
"seq_length": [64, 128, 256],
"stream_output": True
},
"gpt2": {
"max_memory_per_gpu": [8.0],
"max_memory_per_gpu": [5.0],
"batch_size": [1],
"seq_length": [64, 128, 256]
"seq_length": [64, 128, 256],
"stream_output": True
}
}

@@ -703,44 +708,6 @@ def test_transformers_neuronx_handler(model, model_spec):
assert len(result) == batch_size


- def test_lmi_dist_handler(model, model_spec):
- if model not in model_spec:
- raise ValueError(
- f"{args.model} is not one of the supporting models {list(model_spec.keys())}"
- )
- spec = model_spec[args.model]
- if "worker" in spec:
- check_worker_number(spec["worker"])
- for i, batch_size in enumerate(spec["batch_size"]):
- for seq_length in spec["seq_length"]:
- if "t5" in model:
- req = {"inputs": t5_batch_generation(batch_size)}
- else:
- req = {"inputs": batch_generation(batch_size)}
- params = {"max_new_tokens": seq_length}
- req["parameters"] = params
- logging.info(f"req {req}")
- res = send_json(req)
- if spec.get("stream_output", False):
- logging.info(f"res: {res.content}")
- result = res.content.decode().split("\n")[:-1]
- assert len(
- result
- ) <= seq_length, "generated more tokens than max_new_tokens"
- result_0 = json.loads(result[0])['outputs']
- assert len(
- result_0
- ) == batch_size, "batch size number of tokens are not generated"
- else:
- res = res.text
- logging.info(f"res {res}")
- assert res is not None
- memory_usage = get_gpu_memory()
- logging.info(memory_usage)
- for memory in memory_usage:
- assert float(memory) / 1024.0 < spec["max_memory_per_gpu"][i]


if __name__ == "__main__":
if args.handler == "deepspeed_raw":
test_ds_raw_model(args.model, ds_raw_model_spec)
@@ -765,7 +732,7 @@ def test_lmi_dist_handler(model, model_spec):
test_transformers_neuronx_raw(args.model,
transformers_neuronx_raw_model_spec)
elif args.handler == "lmi_dist":
- test_lmi_dist_handler(args.model, lmi_dist_model_spec)
+ test_handler(args.model, lmi_dist_model_spec)
elif args.handler == "performance":
test_performance()
else:
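On the client side, the lmi_dist-specific test is dropped in favor of the shared test_handler path, and the stream_output flag in the model specs signals that responses arrive as jsonlines. A rough sketch of how a client might consume the new format (the endpoint URL and the use of requests are placeholders, not part of this change):

```python
import json

import requests

res = requests.post("http://127.0.0.1:8080/invocations",
                     json={
                         "inputs": "Deep learning is",
                         "parameters": {"max_new_tokens": 64}
                     })
# With the default formatter the response should report
# content-type: application/jsonlines.
print(res.headers.get("content-type"))

# The body ends with a newline, so drop the trailing empty element after splitting.
lines = res.content.decode().split("\n")[:-1]
tokens = [json.loads(line)["outputs"] for line in lines]
print(tokens)
```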
