[air/data] Concatenator preprocessor (ray-project#26526)
richardliaw authored Jul 14, 2022
1 parent a322ac4 commit a0ce3c1
Showing 6 changed files with 181 additions and 58 deletions.
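In short, this commit adds a new `Concatenator` preprocessor to `ray.data.preprocessors` and migrates the PyTorch and TensorFlow tabular starter examples from a hand-rolled `BatchMapper` function to it. A minimal usage sketch of the new preprocessor (the toy dataset below is hypothetical, not part of the commit):

import numpy as np
import ray
from ray.data.preprocessors import Concatenator

# Hypothetical toy dataset standing in for the starter examples' train_dataset.
ds = ray.data.from_items([{"x1": 1.0, "x2": 2.0, "target": 0.0}])

# Pack every column except "target" into a single float32 tensor column
# named "concat_out" (the Concatenator's default output column name).
prep = Concatenator(exclude=["target"], dtype=np.float32)
transformed = prep.transform(ds)
print(transformed.take(1))  # each row now has "concat_out" and "target"

This replaces roughly twenty lines of per-example concatenation code, as the two starter-file diffs below show.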
41 changes: 12 additions & 29 deletions doc/source/ray-air/examples/pytorch_tabular_starter.py
@@ -33,27 +33,13 @@
 import numpy as np
 import pandas as pd
 
-from ray.data.preprocessors import BatchMapper, Chain
-
-# Get the training data schema
-schema_order = [k for k in train_dataset.schema().names if k != "target"]
-
-
-def concat_for_tensor(dataframe):
-    # Concatenate the dataframe into a single tensor.
-    from ray.data.extensions import TensorArray
-
-    result = {}
-    input_data = dataframe[schema_order].to_numpy(dtype=np.float32)
-    result["input"] = TensorArray(input_data)
-    if "target" in dataframe:
-        target_data = dataframe["target"].to_numpy(dtype=np.float32)
-        result["target"] = TensorArray(target_data)
-    return pd.DataFrame(result)
-
+from ray.data.preprocessors import Concatenator, Chain
 
 # Chain the preprocessors together.
-preprocessor = Chain(preprocessor, BatchMapper(concat_for_tensor))
+preprocessor = Chain(
+    preprocessor,
+    Concatenator(exclude=["target"], dtype=np.float32),
+)
 # __air_pytorch_preprocess_end__
 
@@ -92,9 +78,12 @@ def to_tensor_iterator(dataset, batch_size):
         data_iterator = dataset.iter_batches(
             batch_format="numpy", batch_size=batch_size
         )
-
         for d in data_iterator:
-            yield torch.Tensor(d["input"]).float(), torch.Tensor(d["target"]).float()
+            # "concat_out" is the output column of the Concatenator.
+            yield (
+                torch.Tensor(d["concat_out"]).float(),
+                torch.Tensor(d["target"]).float(),
+            )
 
     # Create model.
     model = create_model(num_features)
@@ -114,24 +103,18 @@ def to_tensor_iterator(dataset, batch_size):
         session.report({"loss": loss}, checkpoint=to_air_checkpoint(model))
 
 
-num_features = len(schema_order)
+num_features = len(train_dataset.schema().names) - 1
 
 trainer = TorchTrainer(
     train_loop_per_worker=train_loop_per_worker,
     train_loop_config={
-        # Training batch size
         "batch_size": 128,
-        # Number of epochs to train each task for.
         "num_epochs": 20,
-        # Number of columns of datset
         "num_features": num_features,
-        # Optimizer args.
         "lr": 0.001,
     },
     scaling_config={
-        # Number of workers to use for data parallelism.
-        "num_workers": 3,
-        # Whether to use GPU acceleration.
+        "num_workers": 3,  # Number of data parallel training workers.
         "use_gpu": False,
         # trainer_resources=0 so that the example works on Colab.
         "trainer_resources": {"CPU": 0},
37 changes: 9 additions & 28 deletions doc/source/ray-air/examples/tf_tabular_starter.py
@@ -33,27 +33,13 @@
 import numpy as np
 import pandas as pd
 
-from ray.data.preprocessors import BatchMapper, Chain
-
-# Get the training data schema
-schema_order = [k for k in train_dataset.schema().names if k != "target"]
-
-
-def concat_for_tensor(dataframe):
-    # Concatenate the dataframe into a single tensor.
-    from ray.data.extensions import TensorArray
-
-    result = {}
-    input_data = dataframe[schema_order].to_numpy(dtype=np.float32)
-    result["input"] = TensorArray(input_data)
-    if "target" in dataframe:
-        target_data = dataframe["target"].to_numpy(dtype=np.float32)
-        result["target"] = TensorArray(target_data)
-    return pd.DataFrame(result)
-
+from ray.data.preprocessors import Concatenator, Chain
 
 # Chain the preprocessors together.
-preprocessor = Chain(preprocessor, BatchMapper(concat_for_tensor))
+preprocessor = Chain(
+    preprocessor,
+    Concatenator(exclude=["target"], dtype=np.float32),
+)
 # __air_tf_preprocess_end__

@@ -91,7 +77,8 @@ def to_tensor_iterator():
         )
         for d in data_iterator:
             yield (
-                tf.convert_to_tensor(d["input"], dtype=tf.float32),
+                # "concat_out" is the output column of the Concatenator.
+                tf.convert_to_tensor(d["concat_out"], dtype=tf.float32),
                 tf.convert_to_tensor(d["target"], dtype=tf.float32),
             )
 
@@ -140,24 +127,18 @@ def train_loop_per_worker(config):
     return results
 
 
-num_features = len(schema_order)
+num_features = len(train_dataset.schema().names) - 1
 
 trainer = TensorflowTrainer(
     train_loop_per_worker=train_loop_per_worker,
     train_loop_config={
-        # Training batch size
         "batch_size": 128,
-        # Number of epochs to train each task for.
         "num_epochs": 50,
-        # Number of columns of datset
         "num_features": num_features,
-        # Optimizer args.
         "lr": 0.0001,
     },
     scaling_config={
-        # Number of workers to use for data parallelism.
-        "num_workers": 2,
-        # Whether to use GPU acceleration.
+        "num_workers": 2,  # Number of data parallel training workers
         "use_gpu": False,
         # trainer_resources=0 so that the example works on Colab.
         "trainer_resources": {"CPU": 0},
2 changes: 1 addition & 1 deletion doc/source/ray-air/preprocessors.rst
@@ -135,6 +135,7 @@ Ray AIR provides a handful of ``Preprocessor``\s that you can use out of the box
 .. tabbed:: Tabular
 
     #. :class:`Categorizer <ray.data.preprocessors.Categorizer>`
+    #. :class:`Concatenator <ray.data.preprocessors.Concatenator>`
    #. :class:`FeatureHasher <ray.data.preprocessors.FeatureHasher>`
    #. :class:`LabelEncoder <ray.data.preprocessors.LabelEncoder>`
    #. :class:`MaxAbsScaler <ray.data.preprocessors.MaxAbsScaler>`
@@ -146,7 +147,6 @@ Ray AIR provides a handful of ``Preprocessor``\s that you can use out of the box
    #. :class:`RobustScaler <ray.data.preprocessors.RobustScaler>`
    #. :class:`SimpleImputer <ray.data.preprocessors.SimpleImputer>`
    #. :class:`StandardScaler <ray.data.preprocessors.StandardScaler>`
-    #. :class:`SimpleImputer <ray.data.preprocessors.SimpleImputer>`
 
 .. tabbed:: Text
 
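The tabular preprocessors listed above compose via `Chain`, which is how the starter-example diffs earlier in this commit wire `Concatenator` in after other preprocessing. A hedged sketch of that pattern (column names and data are hypothetical):

import numpy as np
import ray
from ray.data.preprocessors import Chain, Concatenator, StandardScaler

# Hypothetical tabular dataset with two feature columns and a label.
ds = ray.data.from_items(
    [{"a": float(i), "b": float(2 * i), "target": i % 2} for i in range(8)]
)

# Scale the feature columns, then concatenate them into one tensor column,
# leaving "target" untouched.
prep = Chain(
    StandardScaler(columns=["a", "b"]),
    Concatenator(exclude=["target"], dtype=np.float32),
)
transformed = prep.fit_transform(ds)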
2 changes: 2 additions & 0 deletions python/ray/data/preprocessors/__init__.py
@@ -17,6 +17,7 @@
     MaxAbsScaler,
     RobustScaler,
 )
+from ray.data.preprocessors.concatenator import Concatenator
 from ray.data.preprocessors.tokenizer import Tokenizer
 from ray.data.preprocessors.transformer import PowerTransformer
 from ray.data.preprocessors.vectorizer import CountVectorizer, HashingVectorizer
@@ -40,5 +41,6 @@
"RobustScaler",
"SimpleImputer",
"StandardScaler",
"Concatenator",
"Tokenizer",
]
102 changes: 102 additions & 0 deletions python/ray/data/preprocessors/concatenator.py
@@ -0,0 +1,102 @@
+from typing import List, Optional
+
+import numpy as np
+import pandas as pd
+
+from ray.data.extensions import TensorArray
+from ray.data.preprocessor import Preprocessor
+
+
+class Concatenator(Preprocessor):
+    """Creates a tensor column via concatenation.
+
+    A tensor column is a column consisting of ndarrays as elements.
+    The tensor column will be generated from the provided list of columns
+    and will take on the provided "output" label. Columns that are included
+    in the concatenation will be dropped, while columns that are not
+    included in the concatenation will be preserved.
+
+    Example:
+        >>> import ray
+        >>> import pandas as pd
+        >>> from ray.data.preprocessors import Concatenator
+        >>> df = pd.DataFrame({"a": [1, 2, 3, 4], "b": [1, 2, 3, 4]})
+        >>> ds = ray.data.from_pandas(df)  # doctest: +SKIP
+        >>> prep = Concatenator(output_column_name="c")  # doctest: +SKIP
+        >>> new_ds = prep.transform(ds)  # doctest: +SKIP
+        >>> assert set(new_ds.take(1)[0]) == {"c"}  # doctest: +SKIP
+
+    Args:
+        output_column_name: The name of the concatenated tensor column.
+            Defaults to "concat_out".
+        include: A list of column names to be included for
+            concatenation. If None, then all columns will be included.
+            Included columns will be dropped after concatenation.
+        exclude: List of column names to be excluded
+            from concatenation. Exclude takes precedence over include.
+        dtype: Optional. The dtype to convert the output column array to.
+        raise_if_missing: Optional. If True, an error will be raised if any
+            of the columns in 'include' or 'exclude' are
+            not present in the dataset schema.
+
+    Raises:
+        ValueError: if `raise_if_missing=True` and any column name in
+            `include` or `exclude` does not exist in the dataset columns.
+    """
+
+    _is_fittable = False
+
+    def __init__(
+        self,
+        output_column_name: str = "concat_out",
+        include: Optional[List[str]] = None,
+        exclude: Optional[List[str]] = None,
+        dtype: Optional[np.dtype] = None,
+        raise_if_missing: bool = False,
+    ):
+        self.output_column_name = output_column_name
+        self.included_columns = include
+        self.excluded_columns = exclude or []
+        self.dtype = dtype
+        self.raise_if_missing = raise_if_missing
+
+    def _validate(self, df: pd.DataFrame):
+        total_columns = set(df)
+        if self.excluded_columns and self.raise_if_missing:
+            missing_columns = set(self.excluded_columns) - total_columns.intersection(
+                set(self.excluded_columns)
+            )
+            if missing_columns:
+                raise ValueError(
+                    f"Missing columns specified in 'exclude': {missing_columns}"
+                )
+        if self.included_columns and self.raise_if_missing:
+            missing_columns = set(self.included_columns) - total_columns.intersection(
+                set(self.included_columns)
+            )
+            if missing_columns:
+                raise ValueError(
+                    f"Missing columns specified in 'include': {missing_columns}"
+                )
+
+    def _transform_pandas(self, df: pd.DataFrame):
+        self._validate(df)
+
+        included_columns = set(df)
+        if self.included_columns:  # Restrict to the user-provided subset.
+            included_columns = set(self.included_columns)
+
+        columns_to_concat = list(included_columns - set(self.excluded_columns))
+        concatenated = df[columns_to_concat].to_numpy(dtype=self.dtype)
+        df = df.drop(columns=columns_to_concat)
+        df[self.output_column_name] = TensorArray(concatenated)
+        return df
+
+    def __repr__(self):
+        return (
+            f"Concatenator(output_column_name={self.output_column_name}, "
+            f"include={self.included_columns}, "
+            f"exclude={self.excluded_columns}, "
+            f"dtype={self.dtype})"
+        )
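Note how `_transform_pandas` above resolves overlap: the concatenated set is `include` (defaulting to all columns) minus `exclude`, so `exclude` takes precedence and all other columns pass through unchanged. A small sketch of that behavior (toy data; assumes a Ray build that contains this commit):

import pandas as pd
import ray
from ray.data.preprocessors import Concatenator

df = pd.DataFrame({"a": [1, 2], "b": [3, 4], "c": [5, 6]})
ds = ray.data.from_pandas(df)

# include=["a", "b"] minus exclude=["b"] leaves only "a" to concatenate;
# "b" and "c" pass through next to the "concat_out" tensor column.
prep = Concatenator(include=["a", "b"], exclude=["b"])
print(prep.transform(ds).take(1))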
55 changes: 55 additions & 0 deletions python/ray/data/tests/test_preprocessors.py
@@ -28,6 +28,7 @@
 from ray.data.preprocessors.hasher import FeatureHasher
 from ray.data.preprocessors.normalizer import Normalizer
 from ray.data.preprocessors.scaler import MaxAbsScaler, RobustScaler
+from ray.data.preprocessors.concatenator import Concatenator
 from ray.data.preprocessors.tokenizer import Tokenizer
 from ray.data.preprocessors.transformer import PowerTransformer
 from ray.data.preprocessors.utils import simple_hash, simple_split_tokenizer
@@ -1244,6 +1245,60 @@ def test_power_transformer():
     assert out_df.equals(expected_df)
 
 
+def test_concatenator():
+    """Tests basic Concatenator functionality."""
+    df = pd.DataFrame(
+        {
+            "a": [1, 2, 3, 4],
+            "b": [1, 2, 3, 4],
+        }
+    )
+    ds = ray.data.from_pandas(df)
+    prep = Concatenator(output_column_name="c")
+    new_ds = prep.transform(ds)
+    for i, row in enumerate(new_ds.take()):
+        assert np.array_equal(row["c"].to_numpy(), np.array([i + 1, i + 1]))
+
+    # Test repr
+    assert "c" in prep.__repr__()
+    assert "include" in prep.__repr__()
+    assert "exclude" in prep.__repr__()
+
+    df = pd.DataFrame({"a": [1, 2, 3, 4]})
+    ds = ray.data.from_pandas(df)
+    prep = Concatenator(output_column_name="c", exclude=["b"], raise_if_missing=True)
+
+    with pytest.raises(ValueError, match="'b'"):
+        prep.transform(ds)
+
+    # Test exclude working
+    df = pd.DataFrame({"a": [1, 2, 3, 4], "b": [2, 3, 4, 5], "c": [3, 4, 5, 6]})
+    ds = ray.data.from_pandas(df)
+    prep = Concatenator(exclude=["b"])
+    new_ds = prep.transform(ds)
+    for i, row in enumerate(new_ds.take()):
+        assert set(row) == {"concat_out", "b"}
+
+    # Test include working
+    prep = Concatenator(include=["a", "b"])
+    new_ds = prep.transform(ds)
+    for i, row in enumerate(new_ds.take()):
+        assert set(row) == {"concat_out", "c"}
+
+    # Test exclude overrides include
+    prep = Concatenator(include=["a", "b"], exclude=["b"])
+    new_ds = prep.transform(ds)
+    for i, row in enumerate(new_ds.take()):
+        assert set(row) == {"concat_out", "b", "c"}
+
+    # Check it works with string types
+    df = pd.DataFrame({"a": ["string", "string2", "string3"]})
+    ds = ray.data.from_pandas(df)
+    prep = Concatenator(output_column_name="huh")
+    new_ds = prep.transform(ds)
+    assert "huh" in set(new_ds.schema().names)
+
+
 def test_tokenizer():
     """Tests basic Tokenizer functionality."""
 
