[Serve] Add experimental pipeline docs #20292
Merged
Commits (14)
9f29a7a  wip (simon-mo)
7fc4983  [Serve] Add docs for experimental pipeline (simon-mo)
af362fe  ensemble snippet (simon-mo)
fd8ed5f  polish (simon-mo)
f1478d2  Add doc page (simon-mo)
b6d9d0b  Merge branch 'master' of github.com:ray-project/ray into serve/pipeli… (simon-mo)
be5650c  lint (simon-mo)
d041242  Comments (simon-mo)
da4aa19  fix lint (simon-mo)
09ac2ca  lint (simon-mo)
01daeea  lint again (simon-mo)
e683755  lint again (simon-mo)
a37aee4  link (simon-mo)
615572b  fix (simon-mo)
@@ -0,0 +1,162 @@
.. _serve-pipeline-api:

Pipeline API (Experimental)
===========================

This section should help you:

- understand the experimental pipeline API.
- build on top of the API to construct your multi-model inference pipelines.


.. note::
    This API is experimental and subject to change.
    We are actively looking for feedback via the Ray `Forum`_ or `GitHub Issues`_.

Serve Pipeline is a new experimental package purpose-built to help you develop
and deploy multi-model inference pipelines, also known as model composition.

Model composition is common in real-world ML applications. In many cases, you need to:

- Split CPU-bound preprocessing and GPU-bound model inference, so each phase can be scaled separately.
- Chain multiple models together for a single task.
- Combine the output from multiple models to create an ensemble result.
- Dynamically select models based on attributes of the input data (a sketch of
  this pattern appears under More Use Case Examples below).

Serve Pipeline has the following features:

- It has a Python-based, declarative API for constructing a pipeline DAG.
- It gives you visibility into the whole pipeline without losing the flexibility
  of expressing arbitrary graphs in code.
- You can develop and test pipelines locally with local execution mode.
- Each model in the DAG can be scaled to many replicas across the Ray cluster.
  You can fine-tune the resource usage to achieve maximum throughput and utilization.

Compared to ServeHandle, Serve Pipeline is more explicit about the dependencies
of each model in the pipeline and lets you deploy the entire DAG at once.

Compared to KServe (formerly KFServing), Serve Pipeline enables writing pipelines
as code, with arbitrary control flow, in Python.

Compared to building your own orchestration microservices, Serve Pipeline helps
you become productive, building scalable pipelines in hours.


Basic API
---------

Serve Pipeline is a standalone package that can be used without Ray Serve.
However, the expected usage is to use it inside your Serve deployment (a sketch
of this pattern follows the import below).

You can import it as:

.. literalinclude:: ../../../python/ray/serve/examples/doc/snippet_pipeline.py
  :language: python
  :start-after: __import_start__
  :end-before: __import_end__
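As a rough, non-authoritative sketch of that expected usage, you can build the
pipeline once inside a Serve deployment and invoke it per request. The
deployment class, the trivial ``echo`` step, and the request handling below are
illustrative placeholders, not APIs added by this PR:

.. code-block:: python

    from ray import serve
    from ray.serve import pipeline

    @serve.deployment
    class PipelineWrapper:
        def __init__(self):
            # Build and instantiate the pipeline once, at deployment startup.
            echo = pipeline.step(lambda inp: inp)  # stand-in for real steps
            self._pipeline = echo(pipeline.INPUT).deploy()

        async def __call__(self, request):
            # Hypothetical request handling; adapt to your input format.
            return self._pipeline.call(await request.body())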
You can decorate any function or class using ``pipeline.step``. You can then
combine these steps into a pipeline by calling the decorated steps. In
the example below, we have a single step that takes the special node ``pipeline.INPUT``,
which is a placeholder for the arguments that will be passed into the pipeline.

Once you have defined the pipeline by combining one or more steps, you can call ``.deploy()`` to instantiate it.
Once the pipeline is instantiated, you can call ``.call(inp)`` to invoke it synchronously.

.. literalinclude:: ../../../python/ray/serve/examples/doc/snippet_pipeline.py
  :language: python
  :start-after: __simple_pipeline_start__
  :end-before: __simple_pipeline_end__

The input to a pipeline node can be the ``pipeline.INPUT`` special node or
one or more other pipeline nodes. Here is an example of a simple chaining pipeline.

.. literalinclude:: ../../../python/ray/serve/examples/doc/snippet_pipeline.py
  :language: python
  :start-after: __simple_chain_start__
  :end-before: __simple_chain_end__

For classes, you need to instantiate them with init args first, then pass in their
upstream nodes. This allows you to have the same code definition but pass different
arguments, like URIs for model weights (you can see an example of this in the
:ref:`ensemble example <serve-pipeline-ensemble-api>` section).

.. literalinclude:: ../../../python/ray/serve/examples/doc/snippet_pipeline.py
  :language: python
  :start-after: __class_node_start__
  :end-before: __class_node_end__

The decorator also takes two arguments to configure where the node will be executed.

.. autofunction:: ray.serve.pipeline.step
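The exact signature and defaults are rendered by the ``autofunction`` directive
above. As a non-authoritative summary drawn only from the snippets on this
page, the decorator accepts an ``execution_mode`` (the snippets use the default
local mode, ``"tasks"``, and ``"actors"``) and, for actor-based steps, a
``num_replicas``:

.. code-block:: python

    @pipeline.step  # default: local execution mode, runs in-process
    def parse(inp):
        ...

    @pipeline.step(execution_mode="tasks")  # run as stateless Ray tasks
    def tokenize(inp):
        ...

    @pipeline.step(execution_mode="actors", num_replicas=2)  # 2 actor replicas
    def predict(inp):
        ...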
Here is an example pipeline that uses actors instead of local execution mode. The local
execution mode is the default running mode. It runs each node directly within the process
instead of distributing it across the cluster. This mode is useful for local testing and development.

.. literalinclude:: ../../../python/ray/serve/examples/doc/snippet_pipeline.py
  :language: python
  :start-after: __pipeline_configuration_start__
  :end-before: __pipeline_configuration_end__

Chaining Example
----------------

In this section, we show how to implement a two-stage pipeline that's common
in computer vision tasks. For workloads like image classification, the preprocessing
steps are CPU-bound and hard to parallelize. The actual inference steps can run
on GPUs and be batched (batching helps improve throughput without sacrificing latency;
you can learn more in our :ref:`batching tutorial <serve-batch-tutorial>`).
The two phases are therefore often split into separate stages and scaled
separately to increase throughput.

.. literalinclude:: ../../../python/ray/serve/examples/doc/snippet_pipeline.py
  :language: python
  :start-after: __preprocessing_pipeline_example_start__
  :end-before: __preprocessing_pipeline_example_end__

.. _serve-pipeline-ensemble-api:

Ensemble Example
----------------

We will now expand on the previous example to construct an ensemble pipeline. In
the previous example, our pipeline looks like: preprocess -> resnet18. What if we
want to aggregate the output from many different models? You can build this scatter-gather
pattern easily with Pipeline. The code snippet below shows how to construct a pipeline
that looks like: preprocess -> [resnet18, resnet34] -> combine_output.

.. literalinclude:: ../../../python/ray/serve/examples/doc/snippet_pipeline.py
  :language: python
  :start-after: __ensemble_pipeline_example_start__
  :end-before: __ensemble_pipeline_example_end__
More Use Case Examples
----------------------

There are even more use cases for Serve Pipeline.

.. note::
    Please feel free to suggest more use cases and contribute your examples by
    sending `GitHub Pull Requests`_!

Combining business logic + ML models
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Building on the previous ensemble example, you can put arbitrary business logic
in your pipeline step.

.. literalinclude:: ../../../python/ray/serve/examples/doc/snippet_pipeline.py
  :language: python
  :start-after: __biz_logic_start__
  :end-before: __biz_logic_end__
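Dynamically selecting models
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The model-selection pattern listed at the top of this page can also be
expressed inside a step. The sketch below is illustrative only: the step names
and the stand-in "models" are hypothetical, and it assumes only the
``pipeline.step`` behavior shown in the snippets above.

.. code-block:: python

    @pipeline.step(execution_mode="tasks")
    def extract_features(inp):
        # Hypothetical preprocessing; passes the input through unchanged.
        return inp

    @pipeline.step(execution_mode="tasks")
    def route_by_language(inp):
        # Arbitrary Python control flow inside a step: pick a handler based
        # on an attribute of the input. Both "models" here are simple
        # stand-in callables rather than real ML models.
        english_model = lambda text: {"lang": "en", "text": text}
        french_model = lambda text: {"lang": "fr", "text": text}
        if inp.get("lang") == "fr":
            return french_model(inp["text"])
        return english_model(inp["text"])

    selector_pipeline = route_by_language(extract_features(pipeline.INPUT)).deploy()
    selector_pipeline.call({"lang": "fr", "text": "bonjour"})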

.. _`Forum`: https://discuss.ray.io/
.. _`GitHub Issues`: https://github.com/ray-project/ray/issues
.. _`GitHub Pull Requests`: https://github.com/ray-project/ray/pulls
@@ -0,0 +1,169 @@
# flake8: noqa

# __import_start__
from ray.serve import pipeline
# __import_end__

import ray
ray.init(num_cpus=16)


# __simple_pipeline_start__
@pipeline.step
def echo(inp):
    return inp


my_node = echo(pipeline.INPUT)
my_pipeline = my_node.deploy()
assert my_pipeline.call(42) == 42
# __simple_pipeline_end__

del my_pipeline


# __simple_chain_start__
@pipeline.step
def add_one(inp):
    return inp + 1


@pipeline.step
def double(inp):
    return inp * 2


my_node = double(add_one(pipeline.INPUT))
my_pipeline = my_node.deploy()
assert my_pipeline.call(1) == 4

# __simple_chain_end__

del my_pipeline

# __class_node_start__
@pipeline.step
class Adder:
    def __init__(self, value):
        self.value = value

    def __call__(self, inp):
        return self.value + inp


my_pipeline = Adder(2)(pipeline.INPUT).deploy()
assert my_pipeline.call(2) == 4
# __class_node_end__

del my_pipeline


# __pipeline_configuration_start__
@pipeline.step(execution_mode="actors", num_replicas=2)
def echo(inp):
    return inp


my_pipeline = echo(pipeline.INPUT).deploy()
assert my_pipeline.call(42) == 42
# __pipeline_configuration_end__

del my_pipeline

# __preprocessing_pipeline_example_start__
@pipeline.step(execution_mode="tasks")
def preprocess(img_bytes):
    from torchvision import transforms
    import PIL.Image
    import io

    preprocessor = transforms.Compose([
        transforms.Resize(224),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Lambda(lambda t: t[:3, ...]),  # remove alpha channel
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    return preprocessor(PIL.Image.open(io.BytesIO(img_bytes))).unsqueeze(0)


@pipeline.step(execution_mode="actors", num_replicas=2)
class ClassificationModel:
    def __init__(self, model_name):
        import torchvision.models.resnet
        self.model = getattr(torchvision.models.resnet,
                             model_name)(pretrained=True)

    def __call__(self, inp_tensor):
        import torch
        with torch.no_grad():
            output = self.model(inp_tensor).squeeze()
            sorted_value, sorted_idx = output.sort()
        return {
            "top_5_categories": sorted_idx.numpy().tolist()[-5:],
            "top_5_scores": sorted_value.numpy().tolist()[-5:]
        }


import PIL.Image
import io
import numpy as np

# Generate a dummy all-black PNG as input.
_buffer = io.BytesIO()
PIL.Image.fromarray(
    np.zeros((720, 720, 3), int), mode="RGB").save(_buffer, "png")
dummy_png_bytes = _buffer.getvalue()

# Chain the two steps: CPU-bound preprocessing feeds the model replicas.
sequential_pipeline = (ClassificationModel("resnet18")(preprocess(
    pipeline.INPUT)).deploy())
result = sequential_pipeline.call(dummy_png_bytes)
assert result["top_5_categories"] == [898, 412, 600, 731, 463]
# __preprocessing_pipeline_example_end__

# __cleanup_example_start__
del sequential_pipeline
# __cleanup_example_end__

# __ensemble_pipeline_example_start__
@pipeline.step(execution_mode="tasks")
def combine_output(*classifier_outputs):
    # Here we will just concatenate the results from multiple models.
    # You can easily extend this to other ensemble techniques like voting
    # or weighted averaging.
    return sum([out["top_5_categories"] for out in classifier_outputs], [])


# Scatter-gather: both models consume the same preprocessing node's output.
preprocess_node = preprocess(pipeline.INPUT)
model_nodes = [
    ClassificationModel(model)(preprocess_node)
    for model in ["resnet18", "resnet34"]
]
ensemble_pipeline = combine_output(*model_nodes).deploy()
result = ensemble_pipeline.call(dummy_png_bytes)
assert result == [898, 412, 600, 731, 463, 899, 618, 733, 463, 600]

# __ensemble_pipeline_example_end__

del ensemble_pipeline


# __biz_logic_start__
@pipeline.step(execution_mode="tasks")
def dynamic_weighting_combine(*classifier_outputs):
    # Pseudo-code example of bringing in custom business logic and arbitrary
    # Python code: you can issue database queries, log metrics, and run
    # complex computation. The helpers below (my_db, average, my_logger,
    # my_response_model) are placeholders, not real APIs.
    my_weights = my_db.get("dynamic_weights")
    weighted_output = average(classifier_outputs, my_weights)
    my_logger.log(weighted_output)
    my_api_response = my_response_model.reshape(
        [out.astype("int") for out in weighted_output])
    return my_api_response


# __biz_logic_end__
@@ -1,3 +1,3 @@
 from ray.serve.pipeline.common import ExecutionMode # noqa:F401
 from ray.serve.pipeline.node import INPUT # noqa:F401
-from ray.serve.pipeline.step import step # noqa:F401
+from ray.serve.pipeline.step import step, PipelineStep # noqa:F401
Review comment:
can you add a section after this listing more potential use cases?
Also, can we add a simple example of "combining business logic + ML"? This could be just calling out that you can do anything in the "combine output" step