
[Train][release_test] Update release tests for xgboost and lightgbm trainer to V2 (ray-project#49223)

This PR upgrades the release tests for `XGBoostTrainer` and
`LightGBMTrainer` from the V1 API to the V2 API. Both trainers are now
subclasses of `DataParallelTrainer`, similar to `TorchTrainer`.
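
For reference, the V2 trainers drive training through a user-defined
`train_loop_per_worker` function rather than framework-specific constructor
arguments. Below is a minimal sketch of the pattern these tests now exercise;
the toy dataset, worker count, and boosting parameters are illustrative
placeholders, not part of this PR:

```python
import xgboost as xgb

import ray
from ray.train import ScalingConfig
from ray.train.xgboost import RayTrainReportCallback
from ray.train.xgboost.v2 import XGBoostTrainer


def train_loop_per_worker(config):
    # Each worker materializes its shard of the "train" dataset.
    train_df = ray.train.get_dataset_shard("train").materialize().to_pandas()
    dtrain = xgb.DMatrix(train_df.drop("labels", axis=1), label=train_df["labels"])
    # The report callback forwards metrics and checkpoints to Ray Train.
    xgb.train(
        config["params"],
        dtrain=dtrain,
        num_boost_round=10,
        callbacks=[RayTrainReportCallback()],
    )


# Illustrative single-feature dataset with a binary label column.
ds = ray.data.from_items([{"x": float(i), "labels": i % 2} for i in range(64)])

trainer = XGBoostTrainer(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config={"params": {"objective": "binary:logistic"}},
    scaling_config=ScalingConfig(num_workers=2),
    datasets={"train": ds},
)
result = trainer.fit()
```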


Signed-off-by: Hongpeng Guo <hpguo@anyscale.com>
hongpeng-guo authored Dec 17, 2024
1 parent fe75957 commit f1b27b5
Showing 1 changed file with 72 additions and 14 deletions.
@@ -6,11 +6,14 @@
 from typing import Dict
 
 import xgboost as xgb
+import lightgbm as lgb
 
 import ray
 from ray import data
-from ray.train.lightgbm import LightGBMTrainer
-from ray.train.xgboost import XGBoostTrainer
+from ray.train.lightgbm.v2 import LightGBMTrainer
+from ray.train.xgboost.v2 import XGBoostTrainer
+from ray.train.xgboost import RayTrainReportCallback as XGBoostReportCallback
+from ray.train.lightgbm import RayTrainReportCallback as LightGBMReportCallback
 from ray.train import RunConfig, ScalingConfig
 
 _TRAINING_TIME_THRESHOLD = 600
@@ -39,8 +42,8 @@
 
 
 class BasePredictor:
-    def __init__(self, trainer_cls, result: ray.train.Result):
-        self.model = trainer_cls.get_model(result.checkpoint)
+    def __init__(self, report_callback_cls, result: ray.train.Result):
+        self.model = report_callback_cls.get_model(result.checkpoint)
 
     def __call__(self, data):
         raise NotImplementedError
@@ -57,21 +60,74 @@ def __call__(self, data: pd.DataFrame) -> Dict[str, np.ndarray]:
         return {"predictions": self.model.predict(data)}
 
 
+def xgboost_train_loop_function(config: Dict):
+    # 1. Get the dataset shard for the worker and convert to a `xgboost.DMatrix`
+    train_ds_iter = ray.train.get_dataset_shard("train")
+    train_df = train_ds_iter.materialize().to_pandas()
+
+    label_column, params = config["label_column"], config["params"]
+    train_X, train_y = train_df.drop(label_column, axis=1), train_df[label_column]
+
+    dtrain = xgb.DMatrix(train_X, label=train_y)
+
+    # 2. Do distributed data-parallel training.
+    # Ray Train sets up the necessary coordinator processes and
+    # environment variables for your workers to communicate with each other.
+    report_callback = config["report_callback_cls"]
+    xgb.train(
+        params,
+        dtrain=dtrain,
+        num_boost_round=10,
+        callbacks=[report_callback()],
+    )
+
+
+def lightgbm_train_loop_function(config: Dict):
+    # 1. Get the dataset shard for the worker and convert to a DataFrame
+    train_ds_iter = ray.train.get_dataset_shard("train")
+    train_df = train_ds_iter.materialize().to_pandas()
+
+    label_column, params = config["label_column"], config["params"]
+    train_X, train_y = train_df.drop(label_column, axis=1), train_df[label_column]
+    train_set = lgb.Dataset(train_X, label=train_y)
+
+    # 2. Do distributed data-parallel training.
+    # Ray Train sets up the necessary coordinator processes and
+    # environment variables for your workers to communicate with each other.
+    report_callback = config["report_callback_cls"]
+    lgb.train(
+        params,
+        train_set=train_set,
+        num_boost_round=10,
+        callbacks=[report_callback()],
+    )
+
+
 _FRAMEWORK_PARAMS = {
     "xgboost": {
         "trainer_cls": XGBoostTrainer,
         "predictor_cls": XGBoostPredictor,
-        "params": {
-            "objective": "binary:logistic",
-            "eval_metric": ["logloss", "error"],
+        "train_loop_function": xgboost_train_loop_function,
+        "train_loop_config": {
+            "params": {
+                "objective": "binary:logistic",
+                "eval_metric": ["logloss", "error"],
+            },
+            "label_column": "labels",
+            "report_callback_cls": XGBoostReportCallback,
         },
     },
     "lightgbm": {
         "trainer_cls": LightGBMTrainer,
         "predictor_cls": LightGBMPredictor,
-        "params": {
-            "objective": "binary",
-            "metric": ["binary_logloss", "binary_error"],
+        "train_loop_function": lightgbm_train_loop_function,
+        "train_loop_config": {
+            "params": {
+                "objective": "binary",
+                "metric": ["binary_logloss", "binary_error"],
+            },
+            "label_column": "labels",
+            "report_callback_cls": LightGBMReportCallback,
        },
     },
 }
@@ -84,15 +140,15 @@ def train(
     framework_params = _FRAMEWORK_PARAMS[framework]
 
     trainer_cls = framework_params["trainer_cls"]
+    framework_train_loop_fn = framework_params["train_loop_function"]
 
     trainer = trainer_cls(
-        params=framework_params["params"],
+        train_loop_per_worker=framework_train_loop_fn,
+        train_loop_config=framework_params["train_loop_config"],
         scaling_config=ScalingConfig(
             num_workers=num_workers,
             resources_per_worker={"CPU": cpus_per_worker},
             trainer_resources={"CPU": 0},
         ),
-        label_column="labels",
         datasets={"train": ds},
         run_config=RunConfig(
             storage_path="/mnt/cluster_storage", name=f"{framework}_benchmark"
@@ -117,7 +173,9 @@ def predict(framework: str, result: ray.train.Result, data_path: str):
         batch_size=8192,
         concurrency=concurrency,
         fn_constructor_kwargs={
-            "trainer_cls": framework_params["trainer_cls"],
+            "report_callback_cls": framework_params["train_loop_config"][
+                "report_callback_cls"
+            ],
             "result": result,
         },
         batch_format="pandas",
