[Serve] Add experimental pipeline docs #20292

Merged
merged 14 commits into from
Nov 17, 2021
1 change: 1 addition & 0 deletions doc/source/index.rst
@@ -266,6 +266,7 @@ Papers
serve/http-servehandle.rst
serve/deployment.rst
serve/ml-models.rst
serve/pipeline.rst
serve/performance.rst
serve/architecture.rst
serve/tutorials/index.rst
8 changes: 6 additions & 2 deletions doc/source/serve/index.rst
@@ -23,6 +23,9 @@ Ray Serve is an easy-to-use scalable model serving library built on Ray. Ray Se
:ref:`Tensorflow, and Keras <serve-tensorflow-tutorial>`, to :ref:`Scikit-Learn <serve-sklearn-tutorial>` models, to arbitrary Python business logic.
- **Python-first**: Configure your model serving declaratively in pure Python, without needing YAML or JSON configs.

Ray Serve enables :ref:`seamless multi-model inference pipelines (also known as model composition) <serve-pipeline-api>`. You can
write your entire inference pipeline in code and integrate business logic with ML.

Since Ray Serve is built on Ray, it allows you to easily scale to many machines, both in your datacenter and in the cloud.

Ray Serve can be used in two primary ways to deploy your models at scale:
@@ -31,12 +34,13 @@ Ray Serve can be used in two primary ways to deploy your models at scale:

2. Alternatively, call them from :ref:`within your existing Python web server <serve-web-server-integration-tutorial>` using the Python-native :ref:`servehandle-api`.


.. note::
Serve recently added an experimental first-class API for model composition (pipelines).
Please take a look at the :ref:`Pipeline API <serve-pipeline-api>` and try it out!

.. tip::
Chat with Ray Serve users and developers on our `forum <https://discuss.ray.io/>`_!


Ray Serve Quickstart
====================

4 changes: 4 additions & 0 deletions doc/source/serve/ml-models.rst
@@ -49,6 +49,10 @@ dive.
Model Composition
=================

.. note::
Serve recently added an experimental first-class API for model composition (pipelines).
Please take a look at the :ref:`Pipeline API <serve-pipeline-api>` and try it out!

Ray Serve supports composing individually scalable models into a single model
out of the box. For instance, you can combine multiple models to perform
stacking or ensembles.
7 changes: 7 additions & 0 deletions doc/source/serve/package-ref.rst
@@ -27,3 +27,10 @@ ServeHandle API
Batching Requests
-----------------
.. autofunction:: ray.serve.batch(max_batch_size=10, batch_wait_timeout_s=0.0)
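
For reference, a minimal usage sketch (the ``BatchedModel`` class and its ``handle_batch`` method below are illustrative names, not part of the API): the decorated method receives a list of inputs accumulated from concurrent calls, while each caller passes a single item.

.. code-block:: python

    from ray import serve


    @serve.deployment
    class BatchedModel:
        @serve.batch(max_batch_size=10, batch_wait_timeout_s=0.1)
        async def handle_batch(self, inputs):
            # `inputs` is the list of arguments accumulated from up to
            # `max_batch_size` concurrent callers.
            return [i * 2 for i in inputs]

        async def __call__(self, request):
            # Each caller passes a single item and receives its own result.
            return await self.handle_batch(await request.json())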

Serve Pipeline API
------------------

.. autoclass:: ray.serve.pipeline.PipelineStep

.. autoclass:: ray.serve.pipeline.ExecutionMode
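
These classes are typically not constructed directly; you interact with them through ``pipeline.step``. A minimal sketch, mirroring the snippet used in the :ref:`Pipeline API <serve-pipeline-api>` guide:

.. code-block:: python

    from ray.serve import pipeline


    # `execution_mode` and `num_replicas` configure how the step is executed
    # ("actors" and "tasks" are shown in the pipeline docs; local execution
    # is the default).
    @pipeline.step(execution_mode="actors", num_replicas=2)
    def echo(inp):
        return inp


    my_pipeline = echo(pipeline.INPUT).deploy()
    assert my_pipeline.call(42) == 42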
162 changes: 162 additions & 0 deletions doc/source/serve/pipeline.rst
@@ -0,0 +1,162 @@
.. _serve-pipeline-api:

Pipeline API (Experimental)
===========================

This section should help you:

- understand the experimental pipeline API.
- build on top of the API to construct your multi-model inference pipelines.


.. note::
    This API is experimental and subject to change.
    We are actively looking for feedback via the Ray `Forum`_ or `GitHub Issues`_.

Serve Pipeline is a new experimental package purpose-built to help you develop
and deploy multi-model inference pipelines, also known as model composition.

Model composition is common in real-world ML applications. In many cases, you need to:

- Split CPU-bound preprocessing and GPU-bound model inference so each phase can be scaled separately.
- Chain multiple models together for a single task.
- Combine the outputs from multiple models to create an ensemble result.
- Dynamically select models based on attributes of the input data.

Serve Pipeline has the following features:

- It has a Python-based, declarative API for constructing a pipeline DAG.
- It gives you visibility into the whole pipeline without losing the flexibility
  of expressing an arbitrary graph in code.
- You can develop and test the pipeline locally with local execution mode.
- Each model in the DAG can be scaled to many replicas across the Ray cluster.
  You can fine-tune the resource usage to achieve maximum throughput and utilization.

Compared to ServeHandle, Serve Pipeline is more explicit about the dependencies
of each model in the pipeline and lets you deploy the entire DAG at once.

Compared to KServe (formerly KFServing), Serve Pipeline lets you write the pipeline
as code and express arbitrary control flow in Python.

Compared to building your own orchestration microservices, Serve Pipeline helps
you become productive in building scalable pipelines in hours.


Basic API
---------

Serve Pipeline is a standalone package that can be used without Ray Serve.
However, the expected usage is to use it inside your Serve deployment.

You can import it as:

.. literalinclude:: ../../../python/ray/serve/examples/doc/snippet_pipeline.py
:language: python
:start-after: __import_start__
:end-before: __import_end__

You can decorate any function or class using ``pipeline.step``. You can then
combine these steps into a pipeline by calling the decorated steps. In
the example below, we have a single step that takes the special node ``pipeline.INPUT``,
which is a placeholder for the arguments that will be passed into the pipeline.

Once you have defined the pipeline by combining one or more steps, you can call ``.deploy()`` to instantiate it.
Once you have instantiated the pipeline, you can call ``.call(inp)`` to invoke it synchronously.

.. literalinclude:: ../../../python/ray/serve/examples/doc/snippet_pipeline.py
:language: python
:start-after: __simple_pipeline_start__
:end-before: __simple_pipeline_end__

The input to a pipeline node can be the ``pipeline.INPUT`` special node or
one or more other pipeline nodes. Here is an example of a simple chaining pipeline.

.. literalinclude:: ../../../python/ray/serve/examples/doc/snippet_pipeline.py
:language: python
:start-after: __simple_chain_start__
:end-before: __simple_chain_end__

For classes, you need to instantiate them with init args first, then pass in their
upstream nodes. This allows you to have the same code definition but pass different
arguments, like URIs for model weights (you can see an example of this in the
:ref:`ensemble example <serve-pipeline-ensemble-api>` section).

.. literalinclude:: ../../../python/ray/serve/examples/doc/snippet_pipeline.py
:language: python
:start-after: __class_node_start__
:end-before: __class_node_end__

The decorator also takes two arguments to configure where the node will be executed.

.. autofunction:: ray.serve.pipeline.step

Here is an example pipeline that uses actors instead of local execution mode. Local
execution mode is the default: it runs the nodes directly within the calling process
instead of distributing them, which makes it useful for local testing and development.

.. literalinclude:: ../../../python/ray/serve/examples/doc/snippet_pipeline.py
:language: python
:start-after: __pipeline_configuration_start__
:end-before: __pipeline_configuration_end__
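
As noted above, the expected usage is to run a pipeline inside your Serve deployment. How you wire the two together is up to you; one possible sketch (the ``PipelineWrapper`` class and the request parsing below are assumptions for illustration, not an API shown in this guide):

.. code-block:: python

    from ray import serve
    from ray.serve import pipeline


    @pipeline.step
    def echo(inp):
        return inp


    @serve.deployment
    class PipelineWrapper:
        def __init__(self):
            # Build and instantiate the pipeline once per deployment replica.
            self._pipeline = echo(pipeline.INPUT).deploy()

        async def __call__(self, request):
            # Forward the HTTP payload into the pipeline.
            return self._pipeline.call(await request.json())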


Chaining Example
----------------

In this section, we show how to implement a two-stage pipeline that's common
for computer vision tasks. For workloads like image classification, the preprocessing
steps are CPU-bound and hard to parallelize. The actual inference steps can run
on GPU and be batched (batching helps improve throughput without sacrificing latency;
you can learn more in our :ref:`batching tutorial <serve-batch-tutorial>`).
The two phases are often split into separate stages and scaled separately to increase throughput.


.. literalinclude:: ../../../python/ray/serve/examples/doc/snippet_pipeline.py
:language: python
:start-after: __preprocessing_pipeline_example_start__
:end-before: __preprocessing_pipeline_example_end__


.. _serve-pipeline-ensemble-api:


Ensemble Example
----------------

We will now expand on the previous example to construct an ensemble pipeline. In
the previous example, our pipeline looked like: preprocess -> resnet18. What if we
want to aggregate the output from many different models? You can build this scatter-gather
pattern easily with Serve Pipeline. The code snippet below shows how to construct a pipeline
that looks like: preprocess -> [resnet18, resnet34] -> combine_output.


.. literalinclude:: ../../../python/ray/serve/examples/doc/snippet_pipeline.py
:language: python
:start-after: __ensemble_pipeline_example_start__
:end-before: __ensemble_pipeline_example_end__

More Use Case Examples
----------------------

There are even more use cases for Serve Pipeline.

.. note::
Please feel free to suggest more use cases and contribute your examples by
    sending `GitHub Pull Requests`_!

Combining business logic + ML models
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Building on the previous ensemble example, you can put arbitrary business logic
in your pipeline steps.

.. literalinclude:: ../../../python/ray/serve/examples/doc/snippet_pipeline.py
:language: python
:start-after: __biz_logic_start__
:end-before: __biz_logic_end__



.. _`Forum`: https://discuss.ray.io/
.. _`GitHub Issues`: https://github.com/ray-project/ray/issues
.. _`GitHub Pull Requests`: https://github.com/ray-project/ray/pulls
169 changes: 169 additions & 0 deletions python/ray/serve/examples/doc/snippet_pipeline.py
@@ -0,0 +1,169 @@
# flake8: noqa

# __import_start__
from ray.serve import pipeline
# __import_end__

import ray
ray.init(num_cpus=16)


# __simple_pipeline_start__
@pipeline.step
def echo(inp):
return inp


my_node = echo(pipeline.INPUT)
my_pipeline = my_node.deploy()
assert my_pipeline.call(42) == 42
# __simple_pipeline_end__

del my_pipeline


# __simple_chain_start__
@pipeline.step
def add_one(inp):
return inp + 1


@pipeline.step
def double(inp):
    return 2 * inp


my_node = double(add_one(pipeline.INPUT))
my_pipeline = my_node.deploy()
assert my_pipeline.call(1) == 4

# __simple_chain_end__

del my_pipeline


# __class_node_start__
@pipeline.step
class Adder:
def __init__(self, value):
self.value = value

def __call__(self, inp):
return self.value + inp


my_pipeline = Adder(2)(pipeline.INPUT).deploy()
assert my_pipeline.call(2) == 4
# __class_node_end__

del my_pipeline


# __pipeline_configuration_start__
@pipeline.step(execution_mode="actors", num_replicas=2)
def echo(inp):
return inp


my_pipeline = echo(pipeline.INPUT).deploy()
assert my_pipeline.call(42) == 42
# __pipeline_configuration_end__

del my_pipeline


# __preprocessing_pipeline_example_start__
@pipeline.step(execution_mode="tasks")
def preprocess(img_bytes):
from torchvision import transforms
import PIL.Image
import io

preprocessor = transforms.Compose([
transforms.Resize(224),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Lambda(lambda t: t[:3, ...]), # remove alpha channel
transforms.Normalize(
mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
return preprocessor(PIL.Image.open(io.BytesIO(img_bytes))).unsqueeze(0)


@pipeline.step(execution_mode="actors", num_replicas=2)
class ClassificationModel:
def __init__(self, model_name):
import torchvision.models.resnet
self.model = getattr(torchvision.models.resnet,
model_name)(pretrained=True)

def __call__(self, inp_tensor):
import torch
with torch.no_grad():
output = self.model(inp_tensor).squeeze()
sorted_value, sorted_idx = output.sort()
return {
"top_5_categories": sorted_idx.numpy().tolist()[-5:],
"top_5_scores": sorted_value.numpy().tolist()[-5:]
}


import PIL.Image
import io
import numpy as np

# Generate dummy input
_buffer = io.BytesIO()
PIL.Image.fromarray(
np.zeros((720, 720, 3), int), mode="RGB").save(_buffer, "png")
dummy_png_bytes = _buffer.getvalue()

sequential_pipeline = (ClassificationModel("resnet18")(preprocess(
pipeline.INPUT)).deploy())
result = sequential_pipeline.call(dummy_png_bytes)
assert result["top_5_categories"] == [898, 412, 600, 731, 463]
# __preprocessing_pipeline_example_end__

# __cleanup_example_start__
del sequential_pipeline
# __cleanup_example_end__


# __ensemble_pipeline_example_start__
@pipeline.step(execution_mode="tasks")
def combine_output(*classifier_outputs):
    # Here we will just concatenate the results from multiple models.
# You can easily extend this to other ensemble techniques like voting
# or weighted average.
return sum([out["top_5_categories"] for out in classifier_outputs], [])


preprocess_node = preprocess(pipeline.INPUT)
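# Fan the shared `preprocess_node` out to two models (scatter), then gather
# their outputs in the `combine_output` step.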
model_nodes = [
ClassificationModel(model)(preprocess_node)
for model in ["resnet18", "resnet34"]
]
ensemble_pipeline = combine_output(*model_nodes).deploy()
result = ensemble_pipeline.call(dummy_png_bytes)
assert result == [898, 412, 600, 731, 463, 899, 618, 733, 463, 600]

# __ensemble_pipeline_example_end__

del ensemble_pipeline


# __biz_logic_start__
@pipeline.step(execution_mode="tasks")
def dynamic_weighting_combine(*classifier_outputs):
    # Pseudo-code: an example of bringing custom business logic and arbitrary
    # Python code into a pipeline step. You can issue database queries, log
    # metrics, and run complex computations. Note that `my_db`, `average`,
    # `my_logger`, and `my_response_model` are illustrative placeholders and
    # are not defined in this snippet.
my_weights = my_db.get("dynamic_weights")
weighted_output = average(classifier_outputs, my_weights)
my_logger.log(weighted_output)
my_api_response = my_response_model.reshape(
[out.astype("int") for out in weighted_output])
return my_api_response


# __biz_logic_end__
2 changes: 1 addition & 1 deletion python/ray/serve/pipeline/__init__.py
@@ -1,3 +1,3 @@
from ray.serve.pipeline.common import ExecutionMode # noqa:F401
from ray.serve.pipeline.node import INPUT # noqa:F401
from ray.serve.pipeline.step import step # noqa:F401
from ray.serve.pipeline.step import step, PipelineStep # noqa:F401