Merge branch 'fea-sherlock' of github.com:nv-morpheus/Morpheus into david-fea-sherlock-llm-engine-stage
dagardner-nv committed Oct 27, 2023
2 parents 706209a + 9d26514 commit 30a26e4
Showing 22 changed files with 636 additions and 35 deletions.
2 changes: 1 addition & 1 deletion ci/scripts/github/common.sh
@@ -16,7 +16,7 @@

function print_env_vars() {
rapids-logger "Environ:"
-env | grep -v -E "AWS_ACCESS_KEY_ID|AWS_SECRET_ACCESS_KEY|GH_TOKEN" | sort
+env | grep -v -E "AWS_ACCESS_KEY_ID|AWS_SECRET_ACCESS_KEY|GH_TOKEN|NGC_API_KEY" | sort
}

rapids-logger "Env Setup"
1 change: 1 addition & 0 deletions docker/conda/environments/cuda11.8_dev.yml
@@ -34,6 +34,7 @@ dependencies:
- configargparse=1.5
- cuda-compiler=11.8
- cuda-nvml-dev=11.8
- cuda-python>=11.8,<11.8.3 # workaround for https://github.com/nv-morpheus/Morpheus/issues/1317
- cuda-toolkit=11.8
- cudf=23.06
- cupy>=12.0.0
2 changes: 1 addition & 1 deletion examples/llm/cli.py
@@ -53,7 +53,7 @@ def cli(ctx: click.Context, log_level: int, use_cpp: bool):

morpheus_logger = logging.getLogger("morpheus")

-logger = logging.getLogger(__name__)
+logger = logging.getLogger('.'.join(__name__.split('.')[:-1]))

# Set the parent logger for all of the llm examples to use morpheus so we can take advantage of configure_logging
logger.parent = morpheus_logger
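The logger change above swaps the per-module logger for the logger of the module's parent package, so every example module under that package shares one logger that is re-parented to `morpheus`. A minimal sketch of what the new expression evaluates to, using a hypothetical module name (the real value of `__name__` depends on how `cli.py` is imported):

```python
import logging

name = "examples.llm.cli"                      # hypothetical stand-in for __name__
parent_name = '.'.join(name.split('.')[:-1])   # -> "examples.llm"

morpheus_logger = logging.getLogger("morpheus")
logger = logging.getLogger(parent_name)        # one logger shared by all modules in the package
logger.parent = morpheus_logger                # records now propagate through the "morpheus" logger

print(parent_name)  # examples.llm
```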
8 changes: 5 additions & 3 deletions examples/llm/completion/pipeline.py
@@ -32,6 +32,7 @@
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from morpheus.utils.concat_df import concat_dataframes

logger = logging.getLogger(__name__)

@@ -52,15 +53,14 @@ def _build_engine():

engine.add_node("completion", inputs=["/prompts"], node=LLMGenerateNode(llm_client=llm_clinet))

-engine.add_task_handler(inputs=["/extracter"], handler=SimpleTaskHandler())
+engine.add_task_handler(inputs=["/completion"], handler=SimpleTaskHandler())

return engine


def pipeline(num_threads, pipeline_batch_size, model_max_batch_size, repeat_count: int):

config = Config()
config.mode = PipelineModes.OTHER

# Below properties are specified by the command line
config.num_threads = num_threads
@@ -107,6 +107,8 @@ def pipeline(num_threads, pipeline_batch_size, model_max_batch_size, repeat_count: int):

pipe.run()

logger.info("Pipeline complete. Received %s responses", len(sink.get_messages()))
messages = sink.get_messages()
responses = concat_dataframes(messages)
logger.info("Pipeline complete. Received %s responses\n%s", len(messages), responses['response'])

return start_time
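The reworked logging at the end of the pipeline collects the sink's messages and concatenates the DataFrames they carry before reporting. This is not the Morpheus API, but a plain-pandas sketch of the same idea, with made-up column names, for readers unfamiliar with `concat_dataframes`:

```python
import pandas as pd

# Each sink message carries a batch of rows; concatenate them into one frame.
batches = [
    pd.DataFrame({"prompt": ["p1", "p2"], "response": ["r1", "r2"]}),
    pd.DataFrame({"prompt": ["p3"], "response": ["r3"]}),
]

responses = pd.concat(batches, ignore_index=True)
print("Received %s message batches" % len(batches))
print(responses["response"])
```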
5 changes: 1 addition & 4 deletions examples/llm/vdb_upload/README.md
@@ -125,10 +125,7 @@ Before running the pipeline, we need to ensure that the following services are running:

- From the Morpheus repo root directory, run the following to launch Triton and load the `all-MiniLM-L6-v2` model:
```bash
-docker run --rm -ti --gpus=all -p8000:8000 -p8001:8001 -p8002:8002
-    -v $PWD/models:/models nvcr.io/nvidia/tritonserver:23.06-py3 tritonserver
-    --model-repository=/models/triton-model-repo --exit-on-error=false --model-control-mode=explicit
-    --load-model all-MiniLM-L6-v2
+docker run --rm -ti --gpus=all -p8000:8000 -p8001:8001 -p8002:8002 -v $PWD/models:/models nvcr.io/nvidia/tritonserver:23.06-py3 tritonserver --model-repository=/models/triton-model-repo --exit-on-error=false --model-control-mode=explicit --load-model all-MiniLM-L6-v2
```

This will launch Triton and only load the `all-MiniLM-L6-v2` model. Once Triton has loaded the model, the following
25 changes: 23 additions & 2 deletions morpheus/_lib/llm/__init__.pyi
@@ -101,8 +101,29 @@ class LLMContext():
pass
class LLMNodeBase():
def __init__(self) -> None: ...
-def execute(self, context: LLMContext) -> typing.Awaitable[LLMContext]: ...
-def get_input_names(self) -> typing.List[str]: ...
def execute(self, context: LLMContext) -> typing.Awaitable[LLMContext]:
"""
Execute the current node with the given `context` instance.
All inputs for the given node should be fetched from the context, typically by calling either
`context.get_inputs` to fetch all inputs as a `dict`, or `context.get_input` to fetch a specific input.
Similarly the output of the node is written to the context using `context.set_output`.
Parameters
----------
context : `morpheus._lib.llm.LLMContext`
Context instance to use for the execution
"""
def get_input_names(self) -> typing.List[str]:
"""
Get the input names for the node.
Returns
-------
list[str]
The input names for the node
"""
pass
class LLMEngineStage(mrc.core.segment.SegmentObject):
def __init__(self, builder: mrc.core.segment.Builder, name: str, engine: LLMEngine) -> None: ...
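The expanded stubs above document the two methods every node must provide. As a rough sketch of a custom node written against that interface — the class name, input name, and the exact `set_output` signature are illustrative assumptions, not taken from this diff:

```python
import typing

from morpheus._lib.llm import LLMContext
from morpheus._lib.llm import LLMNodeBase


class UppercaseNode(LLMNodeBase):
    """Illustrative node that upper-cases the prompts handed to it."""

    def get_input_names(self) -> typing.List[str]:
        # The engine maps upstream outputs onto these names.
        return ["prompts"]

    async def execute(self, context: LLMContext) -> LLMContext:
        prompts = context.get_input("prompts")             # fetch one named input
        context.set_output([p.upper() for p in prompts])   # write this node's output
        return context
```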
29 changes: 27 additions & 2 deletions morpheus/_lib/llm/module.cpp
@@ -212,8 +212,33 @@ PYBIND11_MODULE(llm, _module)

py::class_<LLMNodeBase, PyLLMNodeBase<>, std::shared_ptr<LLMNodeBase>>(_module, "LLMNodeBase")
.def(py::init_alias<>())
.def("get_input_names", &LLMNodeBase::get_input_names)
.def("execute", &LLMNodeBase::execute, py::arg("context"));
.def("get_input_names",
&LLMNodeBase::get_input_names,
R"pbdoc(
Get the input names for the node.
Returns
-------
list[str]
The input names for the node
)pbdoc")
.def("execute",
&LLMNodeBase::execute,
py::arg("context"),
R"pbdoc(
Execute the current node with the given `context` instance.
All inputs for the given node should be fetched from the context, typically by calling either
`context.get_inputs` to fetch all inputs as a `dict`, or `context.get_input` to fetch a specific input.
Similarly the output of the node is written to the context using `context.set_output`.
Parameters
----------
context : `morpheus._lib.llm.LLMContext`
Context instance to use for the execution
)pbdoc");

py::class_<LLMNodeRunner, std::shared_ptr<LLMNodeRunner>>(_module, "LLMNodeRunner")
.def_property_readonly("inputs", &LLMNodeRunner::inputs)
9 changes: 9 additions & 0 deletions morpheus/llm/nodes/llm_generate_node.py
@@ -23,6 +23,15 @@


class LLMGenerateNode(LLMNodeBase):
"""
Generates responses from an LLM using the provided `llm_client` instance based on prompts provided as input from
upstream nodes.
Parameters
----------
llm_client : LLMClient
The client instance to use to generate responses.
"""

def __init__(self, llm_client: LLMClient) -> None:
super().__init__()
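For context, the completion example wires this node into an engine and feeds it a client obtained from an `LLMService`. A hedged sketch of that wiring; the model name is illustrative and the `morpheus.llm.LLMEngine` import path is an assumption, while the node name and `"/prompts"` input are taken from the example diff above:

```python
from morpheus.llm import LLMEngine
from morpheus.llm.nodes.llm_generate_node import LLMGenerateNode
from morpheus.llm.services.llm_service import LLMService


def build_completion_engine(llm_service: LLMService) -> LLMEngine:
    # Any concrete LLMService implementation can supply the client.
    llm_client = llm_service.get_client(model_name="my-model")  # model name is illustrative

    engine = LLMEngine()
    # Node name and input path match the completion example shown earlier.
    engine.add_node("completion", inputs=["/prompts"], node=LLMGenerateNode(llm_client=llm_client))
    return engine
```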
53 changes: 52 additions & 1 deletion morpheus/llm/services/llm_service.py
@@ -13,33 +13,84 @@
# limitations under the License.

import logging
import typing
from abc import ABC
from abc import abstractmethod

logger = logging.getLogger(__name__)


class LLMClient(ABC):
"""
Abstract interface for clients which are able to interact with LLM models. Concrete implementations of this class
will have an associated implementation of `LLMService` which is able to construct instances of this class.
"""

@abstractmethod
def generate(self, prompt: str) -> str:
"""
Issue a request to generate a response based on a given prompt.
Parameters
----------
prompt : str
The prompt to generate a response for.
"""
pass

@abstractmethod
async def generate_async(self, prompt: str) -> str:
"""
Issue an asynchronous request to generate a response based on a given prompt.
Parameters
----------
prompt : str
The prompt to generate a response for.
"""
pass

@abstractmethod
def generate_batch(self, prompts: list[str]) -> list[str]:
"""
Issue a request to generate a list of responses based on a list of prompts.
Parameters
----------
prompts : list[str]
The prompts to generate responses for.
"""
pass

@abstractmethod
async def generate_batch_async(self, prompts: list[str]) -> list[str]:
"""
Issue an asynchronous request to generate a list of responses based on a list of prompts.
Parameters
----------
prompts : list[str]
The prompts to generate responses for.
"""
pass


class LLMService(ABC):
"""
Abstract interface for services which are able to construct clients for interacting with LLM models.
"""

@abstractmethod
-def get_client(self, model_name: str, **model_kwargs) -> LLMClient:
+def get_client(self, model_name: str, **model_kwargs: dict[str, typing.Any]) -> LLMClient:
"""
Returns a client for interacting with a specific model.
Parameters
----------
model_name : str
The name of the model to create a client for.
model_kwargs : dict[str, typing.Any]
Additional keyword arguments to pass to the model.
"""
pass
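Because both classes above are abstract, using them requires a concrete service/client pair. A minimal sketch of such a pair under illustrative names ("Echo*", not part of Morpheus), implementing every abstract method with a trivial echo so the shape of the interface is visible:

```python
import typing

from morpheus.llm.services.llm_service import LLMClient
from morpheus.llm.services.llm_service import LLMService


class EchoClient(LLMClient):
    """Trivial client: each 'response' is just the prompt echoed back."""

    def __init__(self, model_name: str, **model_kwargs: typing.Any) -> None:
        self._model_name = model_name
        self._model_kwargs = model_kwargs

    def generate(self, prompt: str) -> str:
        return f"[{self._model_name}] {prompt}"

    async def generate_async(self, prompt: str) -> str:
        return self.generate(prompt)

    def generate_batch(self, prompts: list[str]) -> list[str]:
        return [self.generate(p) for p in prompts]

    async def generate_batch_async(self, prompts: list[str]) -> list[str]:
        return self.generate_batch(prompts)


class EchoService(LLMService):

    def get_client(self, model_name: str, **model_kwargs: typing.Any) -> LLMClient:
        return EchoClient(model_name, **model_kwargs)


client = EchoService().get_client(model_name="demo")
print(client.generate("hello"))  # -> "[demo] hello"
```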