fix: make vaex.ml work with Arrow data (vaexio#1167)
maartenbreddels authored Jan 19, 2021
1 parent 817ab89 commit ce1397a
Showing 17 changed files with 208 additions and 152 deletions.
2 changes: 2 additions & 0 deletions packages/vaex-core/vaex/dataframe.py
@@ -396,6 +396,8 @@ def map(thread_index, i1, i2, ar):
             ar = _to_string_sequence(ar)
             if not transient:
                 assert ar is previous_ar.string_sequence
+        else:
+            ar = vaex.array_types.to_numpy(ar)
         if np.ma.isMaskedArray(ar):
             mask = np.ma.getmaskarray(ar)
             sets[thread_index].update(ar, mask)
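The new `else` branch normalizes non-string chunks to numpy before the masked-array handling. A minimal sketch (not part of the commit) of why this is needed, using `pyarrow.Array.to_numpy` as a stand-in for the commit's `vaex.array_types.to_numpy` helper:

```python
# Hedged illustration: a pyarrow array is not a numpy masked array, so without
# the conversion the np.ma mask handling above would silently do nothing.
import numpy as np
import pyarrow as pa

ar = pa.array([1.0, None, 3.0])
print(np.ma.isMaskedArray(ar))             # False: the Arrow null would be ignored
ar_np = ar.to_numpy(zero_copy_only=False)  # numpy array; the null becomes NaN
print(ar_np)                               # [ 1. nan  3.]
```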
6 changes: 3 additions & 3 deletions packages/vaex-ml/vaex/ml/catboost.py
@@ -70,7 +70,7 @@ class CatBoostModel(state.HasState):
         default_value='IntersectingCountersAverage', help="Strategy for summing up models. Only used when training in batches. See the CatBoost documentation for more info.")
 
     def __call__(self, *args):
-        data2d = np.vstack([arg.astype(np.float64) for arg in args]).T.copy()
+        data2d = np.stack([np.asarray(arg, np.float64) for arg in args], axis=1)
         dmatrix = catboost.Pool(data2d, **self.pool_params)
         return self.booster.predict(dmatrix, prediction_type=self.prediction_type)
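This same `np.vstack` → `np.stack`/`np.asarray` rewrite recurs in the lightgbm, sklearn, and xgboost files below. A small sketch of why the new form works for Arrow chunks while the old one did not (assuming only that pyarrow arrays implement the `__array__` protocol):

```python
# np.asarray goes through __array__, which pyarrow implements; .astype is a
# numpy-only method, so the old code raised AttributeError on Arrow chunks.
import numpy as np
import pyarrow as pa

args = [pa.array([1, 2, 3]), np.array([4.0, 5.0, 6.0])]
data2d = np.stack([np.asarray(arg, np.float64) for arg in args], axis=1)
print(data2d.shape)  # (3, 2): rows are samples, columns are features
```

The result matches the old `np.vstack(...).T.copy()` shape, without requiring every chunk to already be a numpy array.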

@@ -106,13 +106,13 @@ def fit(self, df, evals=None, early_stopping_rounds=None, verbose_eval=None, plo
         if evals is not None:
             for i, item in enumerate(evals):
                 data = item[self.features].values
-                target_data = item[self.target].values
+                target_data = item[self.target].to_numpy()
                 evals[i] = catboost.Pool(data=data, label=target_data, **self.pool_params)
 
         # This does the actual training/fitting of the catboost model
         if self.batch_size is None:
             data = df[self.features].values
-            target_data = df[self.target].values
+            target_data = df[self.target].to_numpy()
             dtrain = catboost.Pool(data=data, label=target_data, **self.pool_params)
             model = catboost.train(params=self.params,
                                    dtrain=dtrain,
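Label extraction switches from `.values` to `Expression.to_numpy()`, which materializes the column as a plain numpy array even when it is Arrow-backed. A hedged sketch of the pattern, assuming only the vaex API visible in this diff (`df[features].values`, `Expression.to_numpy()`):

```python
# Minimal sketch: build a catboost Pool the way the updated fit() does.
import numpy as np
import vaex
import catboost

df = vaex.from_arrays(x=np.arange(4.0), y=np.arange(4.0) * 2, label=np.array([0, 1, 0, 1]))
features, target = ['x', 'y'], 'label'
data = df[features].values           # 2d feature array
target_data = df[target].to_numpy()  # labels as a plain numpy array
dtrain = catboost.Pool(data=data, label=target_data)
```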
4 changes: 3 additions & 1 deletion packages/vaex-ml/vaex/ml/cluster.py
@@ -103,6 +103,7 @@ def __call__(self, *blocks):

     def _calculate_distances_squared(self, *blocks):
         N = len(blocks[0]) # they are all the same length
+        blocks = [np.asarray(k) for k in blocks]
         centroids = np.array(self.cluster_centers)
         k = centroids.shape[0]
         dimensions = centroids.shape[1]
@@ -122,7 +123,7 @@ def _calculate_classes(self, *blocks):

     def generate_cluster_centers_random(self, dataframe, rng):
         indices = rng.randint(0, len(dataframe), self.n_clusters)
-        return [[dataframe.evaluate(feature, i1=i, i2=i+1)[0] for feature in self.features] for i in indices]
+        return [[dataframe.evaluate(feature, i1=i, i2=i+1, array_type='python')[0] for feature in self.features] for i in indices]
 
     def transform(self, dataframe):
         '''
@@ -201,6 +202,7 @@ def map(*blocks):  # this will be called with a chunk of the data
             sumpos = np.zeros((runs, clusters, dimensions))
             counts = np.zeros((runs, clusters))
             inertia = np.zeros((runs))
+            blocks = [np.asarray(k) for k in blocks]
             if True:
                 centroid_stats(centroids, counts, sumpos, inertia, *blocks)
             else:
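Both KMeans code paths now cast incoming blocks with `np.asarray` before doing numpy arithmetic on them. A toy sketch (not from the codebase) of the kind of distance computation these blocks feed into:

```python
# Each feature arrives as its own 1d block, possibly Arrow-backed; casting first
# lets the broadcasting arithmetic below work unchanged.
import numpy as np
import pyarrow as pa

blocks = [pa.array([0.0, 1.0, 2.0]), pa.array([0.0, 2.0, 4.0])]  # two features
blocks = [np.asarray(k) for k in blocks]
centroids = np.array([[0.0, 0.0], [2.0, 4.0]])                   # (clusters, dims)
dist_sq = [sum((blocks[d] - c[d]) ** 2 for d in range(len(blocks))) for c in centroids]
print(np.argmin(np.stack(dist_sq, axis=0), axis=0))              # nearest cluster per row
```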
8 changes: 5 additions & 3 deletions packages/vaex-ml/vaex/ml/lightgbm.py
@@ -1,6 +1,8 @@
 import base64
 import tempfile
 
+from vaex.strings import array
+
 import lightgbm
 
 import numpy as np
@@ -66,7 +68,7 @@ class LightGBMModel(state.HasState):
     prediction_name = traitlets.Unicode(default_value='lightgbm_prediction', help='The name of the virtual column housing the predictions.')
 
     def __call__(self, *args):
-        data2d = np.vstack([arg.astype(np.float64) for arg in args]).T.copy()
+        data2d = np.stack([np.asarray(arg, np.float64) for arg in args], axis=1)
         return self.booster.predict(data2d)
 
     def transform(self, df):
@@ -103,10 +105,10 @@ def fit(self, df, valid_sets=None, valid_names=None, early_stopping_rounds=None,
         If *verbose_eval* is True then the evaluation metric on the validation set is printed at each boosting stage.
         """
 
-        dtrain = lightgbm.Dataset(df[self.features].values, df.evaluate(self.target))
+        dtrain = lightgbm.Dataset(df[self.features].values, df[self.target].to_numpy())
         if valid_sets is not None:
             for i, item in enumerate(valid_sets):
-                valid_sets[i] = lightgbm.Dataset(item[self.features].values, item[self.target].values)
+                valid_sets[i] = lightgbm.Dataset(item[self.features].values, item[self.target].to_numpy())
         else:
             valid_sets = ()
 
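As in the catboost change, `fit` now materializes labels with `to_numpy()`. A minimal sketch of the datasets this constructs, using the standard lightgbm API with made-up toy data:

```python
# lightgbm.Dataset accepts plain numpy arrays; the vaex change just guarantees
# the label column arrives as numpy rather than as an Arrow array.
import numpy as np
import lightgbm

X = np.arange(10.0).reshape(5, 2)
y = np.array([0, 1, 0, 1, 0])
dtrain = lightgbm.Dataset(X, label=y)
dvalid = lightgbm.Dataset(X, label=y, reference=dtrain)  # mirrors valid_sets[i]
```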
6 changes: 3 additions & 3 deletions packages/vaex-ml/vaex/ml/sklearn.py
@@ -64,7 +64,7 @@ class Predictor(state.HasState):
         #     raise AttributeError(f'The specified sklearn model does not have a {prediction_type} attribute')
 
     def __call__(self, *args):
-        X = np.vstack([arg.astype(np.float64) for arg in args]).T.copy()
+        X = np.stack([np.asarray(arg, np.float64) for arg in args], axis=1)
         if self.prediction_type == 'predict':
             return self.model.predict(X)
         elif self.prediction_type == 'predict_proba':
@@ -181,7 +181,7 @@ class IncrementalPredictor(state.HasState):
     partial_fit_kwargs = traitlets.Dict(default_value={}, help='A dictionary of key word arguments to be passed on to the `fit_predict` method of the `model`.')
 
     def __call__(self, *args):
-        X = np.vstack([arg.astype(np.float64) for arg in args]).T.copy()
+        X = np.stack([np.asarray(arg, np.float64) for arg in args], axis=1)
     if self.prediction_type == 'predict':
             return self.model.predict(X)
         elif self.prediction_type == 'predict_proba':
@@ -232,7 +232,7 @@ def fit(self, df, progress=None):
         expressions = self.features + [self.target]
 
         for epoch in range(self.num_epochs):
-            for i1, i2, chunks in df.evaluate_iterator(expressions, chunk_size=self.batch_size):
+            for i1, i2, chunks in df.evaluate_iterator(expressions, chunk_size=self.batch_size, array_type='numpy'):
                 progressbar((n_samples * epoch + i1) / (self.num_epochs * n_samples))
                 X = np.array(chunks[:-1]).T  # the most efficient way depends on the algorithm (row of column based access)
                 y = np.array(chunks[-1], copy=False)
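`IncrementalPredictor.fit` now passes `array_type='numpy'` so each chunk arrives as numpy regardless of the column backing. A hedged sketch of that iteration, assuming only the `evaluate_iterator` call visible in this diff:

```python
# Iterate in chunks, as the incremental fit does; each chunks list holds one
# numpy array per expression, features first and the target last.
import numpy as np
import vaex

df = vaex.from_arrays(x=np.arange(10.0), y=np.arange(10.0) * 2)
expressions = ['x', 'y']  # features + [target]
for i1, i2, chunks in df.evaluate_iterator(expressions, chunk_size=4, array_type='numpy'):
    X = np.array(chunks[:-1]).T          # shape (chunk_rows, n_features)
    y = np.array(chunks[-1], copy=False)
    print(i1, i2, X.shape, y.shape)
```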
6 changes: 3 additions & 3 deletions packages/vaex-ml/vaex/ml/xgboost.py
@@ -62,7 +62,7 @@ class XGBoostModel(state.HasState):
     prediction_name = traitlets.Unicode(default_value='xgboost_prediction', help='The name of the virtual column housing the predictions.')
 
     def __call__(self, *args):
-        data2d = np.vstack([arg.astype(np.float64) for arg in args]).T.copy()
+        data2d = np.stack([np.asarray(arg, np.float64) for arg in args], axis=1)
         dmatrix = xgboost.DMatrix(data2d)
         return self.booster.predict(dmatrix)
 
@@ -97,13 +97,13 @@ def fit(self, df, evals=(), early_stopping_rounds=None, evals_result=None, verbo
         '''
 
         data = df[self.features].values
-        target_data = df.evaluate(self.target)
+        target_data = df[self.target].to_numpy()
         dtrain = xgboost.DMatrix(data, target_data)
         if evals is not None:
             evals = [list(elem) for elem in evals]
             for item in evals:
                 data = item[0][self.features].values
-                target_data = item[0].evaluate(self.target)
+                target_data = item[0][self.target].to_numpy()
                 item[0] = xgboost.DMatrix(data, target_data)
         else:
             evals = ()
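The evals handling first copies each `(DataFrame, name)` tuple into a list so the DataFrame can be replaced in place by a `DMatrix`. A toy sketch of the resulting structure fed to `xgboost.train`, using the standard xgboost API with made-up data:

```python
# After the loop above, each evals item is effectively [DMatrix, name],
# which matches what xgboost.train expects for its evals argument.
import numpy as np
import xgboost

X = np.arange(8.0).reshape(4, 2)
y = np.array([0.0, 1.0, 0.0, 1.0])
dtrain = xgboost.DMatrix(X, y)
evals = [(xgboost.DMatrix(X, y), 'validation')]
booster = xgboost.train({'objective': 'reg:squarederror'}, dtrain,
                        num_boost_round=2, evals=evals)
```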
16 changes: 13 additions & 3 deletions tests/common.py
@@ -391,18 +391,18 @@ def array_factory_arrow(request, array_factory_arrow_normal, array_factory_arrow
 array_factory1 = array_factory
 array_factory2 = array_factory
 
-@pytest.fixture(params=['df_factory_numpy', 'df_factory_arrow'])#, 'df_factory_parquet'])
+@pytest.fixture(params=['df_factory_numpy', 'df_factory_arrow'], scope='session')#, 'df_factory_parquet'])
 def df_factory(request, df_factory_numpy, df_factory_arrow):#, df_factory_parquet):
     named = dict(df_factory_numpy=df_factory_numpy, df_factory_arrow=df_factory_arrow)#, df_factory_parquet=df_factory_parquet)
     return named[request.param]
 
 
-@pytest.fixture
+@pytest.fixture(scope='session')
 def df_factory_numpy():
     return vaex.from_arrays
 
 
-@pytest.fixture
+@pytest.fixture(scope='session')
 def df_factory_arrow():
     def create(**arrays):
         def try_convert(ar):
@@ -422,3 +422,13 @@ def create(**arrays):
         df.export(path)
         return vaex.open(path)
     return create
+
+
+@pytest.fixture(scope='session')
+def df_example_original():
+    return vaex.example()
+
+
+@pytest.fixture(scope='session')
+def df_example(df_example_original, df_factory):
+    return df_factory(**df_example_original.to_dict())
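The factory fixtures become session-scoped, and `df_example` rebuilds the example dataset through whichever factory is active, so downstream tests see identical data with either a numpy or an Arrow backing. A sketch of the round-trip, assuming (as its use here implies) that `to_dict` returns a column-name-to-array mapping:

```python
# Rebuild a dataframe through a factory, the way df_example does; from_arrays
# stands in for the parametrized df_factory fixture.
import vaex

df = vaex.example()
df_rebuilt = vaex.from_arrays(**df.to_dict())
assert df_rebuilt.get_column_names() == df.get_column_names()
```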
18 changes: 9 additions & 9 deletions tests/ml/catboost_test.py
@@ -37,8 +37,8 @@
 }
 
 
-def test_catboost():
-    ds = vaex.ml.datasets.load_iris()
+def test_catboost(df_iris):
+    ds = df_iris
     ds_train, ds_test = ds.ml.train_test_split(test_size=0.2, verbose=False)
     features = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
     booster = vaex.ml.catboost.CatBoostModel(num_boost_round=10,
@@ -105,12 +105,12 @@ def test_catboost_batch_training():
     assert list(weights_booster.booster.get_feature_importance()) != list(batch_booster.booster.get_feature_importance())
 
 
-def test_catboost_numerical_validation():
-    ds = vaex.ml.datasets.load_iris()
+def test_catboost_numerical_validation(df_iris):
+    ds = df_iris
     features = ['sepal_width', 'petal_length', 'sepal_length', 'petal_width']
 
     # Vanilla catboost
-    dtrain = cb.Pool(ds[features].values, label=ds.class_.values)
+    dtrain = cb.Pool(ds[features].values, label=ds.class_.to_numpy())
     cb_bst = cb.train(params=params_multiclass, dtrain=dtrain, num_boost_round=3)
     cb_pred = cb_bst.predict(dtrain, prediction_type='Probability')
 
@@ -124,8 +124,8 @@ def test_catboost_numerical_validation():
                               err_msg='The predictions of vaex.ml.catboost do not match those of pure catboost')
 
 
-def test_lightgbm_serialize(tmpdir):
-    ds = vaex.ml.datasets.load_iris()
+def test_lightgbm_serialize(tmpdir, df_iris):
+    ds = df_iris
     features = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
     target = 'class_'
 
@@ -161,9 +161,9 @@ def test_catboost_validation_set():
     assert booster.booster.best_iteration_ is not None
 
 
-def test_catboost_pipeline():
+def test_catboost_pipeline(df_example):
     # read data
-    ds = vaex.example()
+    ds = df_example
     # train test splot
     train, test = ds.ml.train_test_split(verbose=False)
     # add virtual columns
4 changes: 2 additions & 2 deletions tests/ml/cluster_test.py
@@ -10,8 +10,8 @@
 features = ['petal_width/2', 'petal_length/5']
 
 
-def test_serialize():
-    df = vaex.ml.datasets.load_iris()
+def test_serialize(df_iris):
+    df = df_iris
     kmeans = vaex.ml.cluster.KMeans(n_clusters=3, features=features, init='random', random_state=42, max_iter=1)
     kmeans.fit(df)
 
36 changes: 36 additions & 0 deletions tests/ml/conftest.py
@@ -0,0 +1,36 @@
+import pytest
+import vaex
+
+
+@pytest.fixture(scope='session')
+def df_iris_original():
+    return vaex.ml.datasets.load_iris()
+
+
+@pytest.fixture(scope='session')
+def df_iris(df_iris_original, df_factory):
+    return df_factory(**df_iris_original.to_dict())
+
+
+@pytest.fixture(scope='session')
+def df_iris_1e5_original():
+    return vaex.ml.datasets.load_iris_1e5()
+
+
+@pytest.fixture(scope='session')
+def df_iris_1e5(df_iris_1e5_original, df_factory):
+    return df_factory(**df_iris_1e5_original.to_dict())
+
+
+
+
+@pytest.fixture(scope='session')
+def df_titanic_original():
+    return vaex.ml.datasets.load_titanic()
+
+
+@pytest.fixture(scope='session')
+def df_titanic(df_titanic_original, df_factory):
+    return df_factory(**df_titanic_original.to_dict())
+
+
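Because `df_factory` itself is parametrized over numpy and Arrow backends, every test that takes one of these fixtures runs once per backend. A hypothetical consumer showing the intended usage:

```python
# Hypothetical test (not in the commit): pytest runs it twice, once with a
# numpy-backed iris dataframe and once with an Arrow-backed one.
def test_iris_backends(df_iris):
    assert len(df_iris) == 150
    assert 'class_' in df_iris.get_column_names()
```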
28 changes: 13 additions & 15 deletions tests/ml/lightgbm_test.py
@@ -37,8 +37,8 @@


 @pytest.mark.skipif(sys.version_info < (3, 6), reason="requires python3.6 or higher")
-def test_light_gbm_virtual_columns():
-    ds = vaex.ml.datasets.load_iris()
+def test_light_gbm_virtual_columns(df_iris):
+    ds = df_iris
     ds['x'] = ds.sepal_length * 1
     ds['y'] = ds.sepal_width * 1
     ds['w'] = ds.petal_length * 1
@@ -53,8 +53,8 @@ def test_light_gbm_virtual_columns():


 @pytest.mark.skipif(sys.version_info < (3, 6), reason="requires python3.6 or higher")
-def test_lightgbm():
-    ds = vaex.ml.datasets.load_iris()
+def test_lightgbm(df_iris):
+    ds = df_iris
     ds_train, ds_test = ds.ml.train_test_split(test_size=0.2, verbose=False)
     features = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
     features = _ensure_strings_from_expressions(features)
@@ -72,8 +72,8 @@ def test_lightgbm():


 @pytest.mark.skipif(sys.version_info < (3, 6), reason="requires python3.6 or higher")
-def test_lightgbm_serialize(tmpdir):
-    ds = vaex.ml.datasets.load_iris()
+def test_lightgbm_serialize(tmpdir, df_iris):
+    ds = df_iris
     features = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
     target = 'class_'
 
@@ -90,13 +90,13 @@ def test_lightgbm_serialize(tmpdir):


 @pytest.mark.skipif(sys.version_info < (3, 6), reason="requires python3.6 or higher")
-def test_lightgbm_numerical_validation():
-    ds = vaex.ml.datasets.load_iris()
+def test_lightgbm_numerical_validation(df_iris):
+    ds = df_iris
     features = ['sepal_width', 'petal_length', 'sepal_length', 'petal_width']
 
     # Vanilla lightgbm
     X = np.array(ds[features])
-    dtrain = lgb.Dataset(X, label=ds.class_.values)
+    dtrain = lgb.Dataset(X, label=ds.class_.to_numpy())
     lgb_bst = lgb.train(params, dtrain, 3)
     lgb_pred = lgb_bst.predict(X)
 
@@ -109,9 +109,8 @@ def test_lightgbm_numerical_validation():


 @pytest.mark.skipif(sys.version_info < (3, 6), reason="requires python3.6 or higher")
-def test_lightgbm_validation_set():
-    # read data
-    ds = vaex.example()
+def test_lightgbm_validation_set(df_example):
+    ds = df_example
     # Train and test split
     train, test = ds.ml.train_test_split(verbose=False)
     # Define the training featuress
@@ -134,9 +133,8 @@ def test_lightgbm_validation_set():


 @pytest.mark.skipif(sys.version_info < (3, 6), reason="requires python3.6 or higher")
-def test_lightgbm_pipeline():
-    # read data
-    ds = vaex.example()
+def test_lightgbm_pipeline(df_example):
+    ds = df_example
     # train test splot
     train, test = ds.ml.train_test_split(verbose=False)
     # add virtual columns
8 changes: 4 additions & 4 deletions tests/ml/linear_model_test.py
@@ -9,8 +9,8 @@


 @pytest.mark.skipif(sys.version_info < (3, 6), reason="requires python3.6 or higher")
-def test_linear_model():
-    ds = vaex.ml.datasets.load_iris()
+def test_linear_model(df_iris):
+    ds = df_iris
     m1 = vaex.ml.linear_model.LinearRegression(features=['petal_width'], binned=False)
     m1.fit(ds, 'petal_length')
     # print(m.coef_, m.intercept_)
@@ -23,8 +23,8 @@ def test_linear_model():

 @pytest.mark.skipif(sys.version_info < (3, 6), reason="requires python3.6 or higher")
 @pytest.mark.skip(reason="This will fail: produces wrong answer")
-def test_logit():
-    ds = vaex.ml.datasets.load_iris()
+def test_logit(df_iris):
+    ds = df_iris
     ds.categorize(ds.class_, labels='0 1 2 3'.split(), inplace=True)
     m1 = vaex.ml.linear_model.LogisticRegression(features=features, binned=False)
     m1.fit(ds, 'class_')
