[air] add to_air_checkpoint method for inference only workload. (ray-project#25444)

Follow-up on our last discussion about supporting AIR users who adopt it in a piecemeal fashion.
Only done for TensorFlow for now; I want to collect some feedback on API naming, package structure, etc., and then I will add the other frameworks.
xwjiang2010 authored Jun 7, 2022
1 parent 3257994 commit 76b34d4
Showing 19 changed files with 270 additions and 34 deletions.
1 change: 1 addition & 0 deletions doc/source/_toc.yml
@@ -21,6 +21,7 @@ parts:
- file: ray-air/examples/upload_to_wandb
- file: ray-air/examples/serving_guide
- file: ray-air/deployment
- file: ray-air/use-pretrained-model
- file: ray-air/examples/index
sections:
- file: ray-air/examples/torch_image_example
32 changes: 32 additions & 0 deletions doc/source/ray-air/doc_code/use_pretrained_model.py
@@ -0,0 +1,32 @@
# flake8: noqa

# __use_pretrained_model_start__
import ray
import tensorflow as tf
from ray.air.batch_predictor import BatchPredictor
from ray.air.predictors.integrations.tensorflow import (
to_air_checkpoint,
TensorflowPredictor,
)


# to simulate having a pretrained model.
def build_model() -> tf.keras.Model:
model = tf.keras.Sequential(
[
tf.keras.layers.InputLayer(input_shape=(1,)),
tf.keras.layers.Dense(1),
]
)
return model


model = build_model()
checkpoint = to_air_checkpoint(model)
batch_predictor = BatchPredictor.from_checkpoint(
    checkpoint, TensorflowPredictor, model_definition=build_model
)
predict_dataset = ray.data.range(3)
predictions = batch_predictor.predict(predict_dataset)

# __use_pretrained_model_end__
22 changes: 22 additions & 0 deletions doc/source/ray-air/use-pretrained-model.rst
@@ -0,0 +1,22 @@
.. _use-pretrained-model:

Use a pretrained model for batch or online inference
=====================================================

Ray AIR moves end-to-end machine learning workloads seamlessly through the ``Checkpoint`` construct. A ``Checkpoint``
is the output of training and tuning as well as the input to downstream inference tasks.

That said, it is entirely possible, and fully supported, to use Ray AIR in a piecemeal fashion.

If you already have a model trained elsewhere, you can still use Ray AIR for downstream tasks such as batch and
online inference. To do that, convert the pretrained model, together with any preprocessing
steps, into a ``Checkpoint``.

To facilitate this, we provide framework-specific ``to_air_checkpoint`` helper functions.

Examples:

.. literalinclude:: doc_code/use_pretrained_model.py
:language: python
:start-after: __use_pretrained_model_start__
:end-before: __use_pretrained_model_end__
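
Note that the new helpers come in two flavors: the TensorFlow and Torch versions build an in-memory dict checkpoint directly from the model object, while the XGBoost, LightGBM, and sklearn versions take a directory path, serialize the model there, and return a directory-backed checkpoint. As a hedged sketch of the directory-based flow (booster is a hypothetical, already-trained xgboost.Booster and is not part of this change):

    import tempfile

    from ray.air.predictors.integrations.xgboost import XGBoostPredictor, to_air_checkpoint

    # `booster` is assumed to be a pretrained xgboost.Booster created elsewhere.
    with tempfile.TemporaryDirectory() as tmpdir:
        # Serialize the model (and optionally a fitted preprocessor) into the
        # directory and wrap it as an AIR checkpoint.
        checkpoint = to_air_checkpoint(path=tmpdir, booster=booster)
        predictor = XGBoostPredictor.from_checkpoint(checkpoint)
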
6 changes: 3 additions & 3 deletions python/ray/air/_internal/checkpointing.py
@@ -1,6 +1,6 @@
import os
from pathlib import Path
from typing import Optional
from typing import Optional, Union

import ray.cloudpickle as cpickle
from ray.air.preprocessor import Preprocessor
@@ -9,8 +9,8 @@

def save_preprocessor_to_dir(
preprocessor: Preprocessor,
parent_dir: os.PathLike,
) -> os.PathLike:
parent_dir: Union[os.PathLike, str],
) -> None:
"""Save preprocessor to file. Returns path saved to."""
parent_dir = Path(parent_dir)
with open(parent_dir.joinpath(PREPROCESSOR_KEY), "wb") as f:
3 changes: 2 additions & 1 deletion python/ray/air/predictors/integrations/lightgbm/__init__.py
@@ -1,5 +1,6 @@
from ray.air.predictors.integrations.lightgbm.lightgbm_predictor import (
LightGBMPredictor,
)
from ray.air.predictors.integrations.lightgbm.utils import to_air_checkpoint

__all__ = ["LightGBMPredictor"]
__all__ = ["LightGBMPredictor", "to_air_checkpoint"]
36 changes: 36 additions & 0 deletions python/ray/air/predictors/integrations/lightgbm/utils.py
@@ -0,0 +1,36 @@
from typing import Optional

import os
import lightgbm

from ray.air.checkpoint import Checkpoint
from ray.air.constants import MODEL_KEY
from ray.air.preprocessor import Preprocessor
from ray.air._internal.checkpointing import (
save_preprocessor_to_dir,
)


def to_air_checkpoint(
path: str,
booster: lightgbm.Booster,
preprocessor: Optional[Preprocessor] = None,
) -> Checkpoint:
"""Convert a pretrained model to AIR checkpoint for serve or inference.
Args:
path: The directory path where model and preprocessor steps are stored to.
booster: A pretrained lightgbm model.
preprocessor: A fitted preprocessor. The preprocessing logic will
be applied to serve/inference.
Returns:
A Ray Air checkpoint.
"""
booster.save_model(os.path.join(path, MODEL_KEY))

if preprocessor:
save_preprocessor_to_dir(preprocessor, path)

checkpoint = Checkpoint.from_directory(path)

return checkpoint
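
A minimal usage sketch for the LightGBM helper (booster and preprocessor are hypothetical objects trained/fitted elsewhere, not part of this diff):

    import tempfile

    from ray.air.predictors.integrations.lightgbm import LightGBMPredictor, to_air_checkpoint

    # `booster` is a pretrained lightgbm.Booster; `preprocessor` is an optional
    # fitted ray.air.preprocessor.Preprocessor.
    with tempfile.TemporaryDirectory() as tmpdir:
        checkpoint = to_air_checkpoint(tmpdir, booster=booster, preprocessor=preprocessor)
        predictor = LightGBMPredictor.from_checkpoint(checkpoint)
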
3 changes: 2 additions & 1 deletion python/ray/air/predictors/integrations/sklearn/__init__.py
@@ -1,5 +1,6 @@
from ray.air.predictors.integrations.sklearn.sklearn_predictor import (
SklearnPredictor,
)
from ray.air.predictors.integrations.sklearn.utils import to_air_checkpoint

__all__ = ["SklearnPredictor"]
__all__ = ["SklearnPredictor", "to_air_checkpoint"]
38 changes: 38 additions & 0 deletions python/ray/air/predictors/integrations/sklearn/utils.py
@@ -0,0 +1,38 @@
from typing import Optional

import os
from sklearn.base import BaseEstimator

from ray.air.checkpoint import Checkpoint
from ray.air.constants import MODEL_KEY
from ray.air.preprocessor import Preprocessor
from ray.air._internal.checkpointing import (
save_preprocessor_to_dir,
)
import ray.cloudpickle as cpickle


def to_air_checkpoint(
path: str,
estimator: BaseEstimator,
preprocessor: Optional[Preprocessor] = None,
) -> Checkpoint:
"""Convert a pretrained model to AIR checkpoint for serve or inference.
Args:
path: The directory path where model and preprocessor steps are stored to.
estimator: A pretrained model.
preprocessor: A fitted preprocessor. The preprocessing logic will
be applied to serve/inference.
Returns:
A Ray Air checkpoint.
"""
with open(os.path.join(path, MODEL_KEY), "wb") as f:
cpickle.dump(estimator, f)

if preprocessor:
save_preprocessor_to_dir(preprocessor, path)

checkpoint = Checkpoint.from_directory(path)

return checkpoint
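
Similarly, a sketch of the sklearn helper combined with BatchPredictor, mirroring the new test added below (estimator is a hypothetical, already-fitted sklearn estimator):

    import tempfile

    import pandas as pd

    import ray
    from ray.air.batch_predictor import BatchPredictor
    from ray.air.predictors.integrations.sklearn import SklearnPredictor, to_air_checkpoint

    # `estimator` is assumed to be a fitted sklearn estimator, e.g. a regressor
    # trained on two feature columns "A" and "B".
    with tempfile.TemporaryDirectory() as tmpdir:
        checkpoint = to_air_checkpoint(path=tmpdir, estimator=estimator)
        batch_predictor = BatchPredictor.from_checkpoint(checkpoint, SklearnPredictor)
        test_dataset = ray.data.from_pandas(pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"]))
        predictions = batch_predictor.predict(test_dataset)
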
3 changes: 2 additions & 1 deletion python/ray/air/predictors/integrations/tensorflow/__init__.py
@@ -1,5 +1,6 @@
from ray.air.predictors.integrations.tensorflow.tensorflow_predictor import (
TensorflowPredictor,
)
from ray.air.predictors.integrations.tensorflow.utils import to_air_checkpoint

__all__ = ["TensorflowPredictor"]
__all__ = ["TensorflowPredictor", "to_air_checkpoint"]
25 changes: 25 additions & 0 deletions python/ray/air/predictors/integrations/tensorflow/utils.py
@@ -0,0 +1,25 @@
from typing import Optional

from tensorflow import keras

from ray.air.checkpoint import Checkpoint
from ray.air.constants import MODEL_KEY, PREPROCESSOR_KEY
from ray.air.preprocessor import Preprocessor


def to_air_checkpoint(
model: keras.Model, preprocessor: Optional[Preprocessor] = None
) -> Checkpoint:
"""Convert a pretrained model to AIR checkpoint for serve or inference.
Args:
model: A pretrained model.
preprocessor: A fitted preprocessor. The preprocessing logic will
be applied to serve/inference.
Returns:
A Ray Air checkpoint.
"""
checkpoint = Checkpoint.from_dict(
{PREPROCESSOR_KEY: preprocessor, MODEL_KEY: model.get_weights()}
)
return checkpoint
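
Unlike the directory-based helpers, this variant stores only model.get_weights() (plus the preprocessor) in an in-memory dict checkpoint, so the model architecture still has to be supplied at inference time. A hedged sketch (build_model is a hypothetical function returning the same keras.Model architecture, as in the doc example above):

    from ray.air.predictors.integrations.tensorflow import TensorflowPredictor, to_air_checkpoint

    # `build_model` is assumed to return the same architecture the weights came from.
    model = build_model()
    checkpoint = to_air_checkpoint(model)
    predictor = TensorflowPredictor.from_checkpoint(checkpoint, model_definition=build_model)
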
3 changes: 2 additions & 1 deletion python/ray/air/predictors/integrations/torch/__init__.py
@@ -1,3 +1,4 @@
from ray.air.predictors.integrations.torch.torch_predictor import TorchPredictor
from ray.air.predictors.integrations.torch.utils import to_air_checkpoint

__all__ = ["TorchPredictor"]
__all__ = ["TorchPredictor", "to_air_checkpoint"]
25 changes: 25 additions & 0 deletions python/ray/air/predictors/integrations/torch/utils.py
@@ -0,0 +1,25 @@
from typing import Optional

import torch

from ray.air.checkpoint import Checkpoint
from ray.air.constants import MODEL_KEY, PREPROCESSOR_KEY
from ray.air.preprocessor import Preprocessor


def to_air_checkpoint(
model: torch.nn.Module, preprocessor: Optional[Preprocessor] = None
) -> Checkpoint:
"""Convert a pretrained model to AIR checkpoint for serve or inference.
Args:
model: A pretrained model.
preprocessor: A fitted preprocessor. The preprocessing logic will
be applied to serve/inference.
Returns:
A Ray Air checkpoint.
"""
checkpoint = Checkpoint.from_dict(
{PREPROCESSOR_KEY: preprocessor, MODEL_KEY: model}
)
return checkpoint
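
Here the entire torch.nn.Module is stored in the dict checkpoint, so TorchPredictor can be restored without a separate model definition. A hedged sketch (model and preprocessor are hypothetical objects created elsewhere):

    from ray.air.predictors.integrations.torch import TorchPredictor, to_air_checkpoint

    # `model` is a pretrained torch.nn.Module; `preprocessor` is an optional fitted preprocessor.
    checkpoint = to_air_checkpoint(model, preprocessor=preprocessor)
    predictor = TorchPredictor.from_checkpoint(checkpoint)
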
3 changes: 2 additions & 1 deletion python/ray/air/predictors/integrations/xgboost/__init__.py
@@ -1,3 +1,4 @@
from ray.air.predictors.integrations.xgboost.xgboost_predictor import XGBoostPredictor
from ray.air.predictors.integrations.xgboost.utils import to_air_checkpoint

__all__ = ["XGBoostPredictor"]
__all__ = ["XGBoostPredictor", "to_air_checkpoint"]
36 changes: 36 additions & 0 deletions python/ray/air/predictors/integrations/xgboost/utils.py
@@ -0,0 +1,36 @@
from typing import Optional

import os
import xgboost

from ray.air.checkpoint import Checkpoint
from ray.air.constants import MODEL_KEY
from ray.air.preprocessor import Preprocessor
from ray.air._internal.checkpointing import (
save_preprocessor_to_dir,
)


def to_air_checkpoint(
path: str,
booster: xgboost.Booster,
preprocessor: Optional[Preprocessor] = None,
) -> Checkpoint:
"""Convert a pretrained model to AIR checkpoint for serve or inference.
Args:
path: The directory path where model and preprocessor steps are stored to.
booster: A pretrained xgboost model.
preprocessor: A fitted preprocessor. The preprocessing logic will
be applied to serve/inference.
Returns:
A Ray Air checkpoint.
"""
booster.save_model(os.path.join(path, MODEL_KEY))

if preprocessor:
save_preprocessor_to_dir(preprocessor, path)

checkpoint = Checkpoint.from_directory(path)

return checkpoint
16 changes: 6 additions & 10 deletions python/ray/air/tests/test_lightgbm_predictor.py
@@ -1,7 +1,10 @@
import os
import pytest

from ray.air.predictors.integrations.lightgbm import LightGBMPredictor
from ray.air.predictors.integrations.lightgbm import (
LightGBMPredictor,
to_air_checkpoint,
)
from ray.air.preprocessor import Preprocessor
from ray.air.checkpoint import Checkpoint
from ray.air.constants import MODEL_KEY
@@ -88,16 +91,9 @@ def test_predict_feature_columns_pandas():
assert hasattr(predictor.preprocessor, "_batch_transformed")


def test_predict_no_preprocessor():
def test_predict_no_preprocessor_no_training():
with tempfile.TemporaryDirectory() as tmpdir:
# This somewhat convoluted procedure is the same as in the
# Trainers. The reason for saving model to disk instead
# of directly to the dict as bytes is due to all callbacks
# following save to disk logic. GBDT models are small
# enough that IO should not be an issue.
model.save_model(os.path.join(tmpdir, MODEL_KEY))

checkpoint = Checkpoint.from_directory(tmpdir)
checkpoint = to_air_checkpoint(tmpdir, booster=model)
predictor = LightGBMPredictor.from_checkpoint(checkpoint)

data_batch = np.array([[1, 2], [3, 4], [5, 6]])
13 changes: 12 additions & 1 deletion python/ray/air/tests/test_sklearn_predictor.py
@@ -8,7 +8,7 @@

import ray
import ray.cloudpickle as cpickle
from ray.air.predictors.integrations.sklearn import SklearnPredictor
from ray.air.predictors.integrations.sklearn import SklearnPredictor, to_air_checkpoint
from ray.air.preprocessor import Preprocessor
from ray.air.checkpoint import Checkpoint
from ray.air.constants import MODEL_KEY
@@ -139,6 +139,17 @@ def test_batch_prediction_with_set_cpus(ray_start_4_cpus):
)


def test_sklearn_predictor_no_training():
with tempfile.TemporaryDirectory() as tmpdir:
checkpoint = to_air_checkpoint(path=tmpdir, estimator=model)
batch_predictor = BatchPredictor.from_checkpoint(checkpoint, SklearnPredictor)
test_dataset = ray.data.from_pandas(
pd.DataFrame(dummy_data, columns=["A", "B"])
)
predictions = batch_predictor.predict(test_dataset)
assert len(predictions.to_pandas()) == 3


if __name__ == "__main__":
import sys

20 changes: 18 additions & 2 deletions python/ray/air/tests/test_tensorflow_predictor.py
@@ -1,7 +1,12 @@
from ray.air.predictors.integrations.tensorflow import TensorflowPredictor
from ray.air.preprocessor import Preprocessor
import ray
from ray.air.batch_predictor import BatchPredictor
from ray.air.checkpoint import Checkpoint
from ray.air.constants import PREPROCESSOR_KEY, MODEL_KEY
from ray.air.predictors.integrations.tensorflow import (
TensorflowPredictor,
to_air_checkpoint,
)
from ray.air.preprocessor import Preprocessor

import numpy as np
import pandas as pd
@@ -93,6 +98,17 @@ def test_predict_dataframe_with_feature_columns():
assert predictions.to_numpy().flatten().tolist() == [1, 3]


def test_tensorflow_predictor_no_training():
model = build_model()
checkpoint = to_air_checkpoint(model)
batch_predictor = BatchPredictor.from_checkpoint(
checkpoint, TensorflowPredictor, model_definition=build_model
)
predict_dataset = ray.data.range(3)
predictions = batch_predictor.predict(predict_dataset)
assert predictions.count() == 3


if __name__ == "__main__":
import pytest
import sys
6 changes: 3 additions & 3 deletions python/ray/air/tests/test_torch_predictor.py
@@ -4,7 +4,7 @@
import pandas as pd
import torch

from ray.air.predictors.integrations.torch import TorchPredictor
from ray.air.predictors.integrations.torch import TorchPredictor, to_air_checkpoint
from ray.air.preprocessor import Preprocessor
from ray.air.checkpoint import Checkpoint
from ray.air.constants import PREPROCESSOR_KEY, MODEL_KEY
@@ -112,8 +112,8 @@ def test_predict_dataframe_with_feature_columns():
assert predictions.to_numpy().flatten().tolist() == [0.0, 0.0, 0.0]


def test_predict_array_from_checkpoint(model):
checkpoint = Checkpoint.from_dict({MODEL_KEY: model})
def test_predict_array_no_training(model):
checkpoint = to_air_checkpoint(model)
predictor = TorchPredictor.from_checkpoint(checkpoint)

data_batch = np.array([[1], [2], [3]])