-
Notifications
You must be signed in to change notification settings - Fork 280
/
Copy pathtest_sparkml_vector_assembler.py
53 lines (39 loc) · 1.79 KB
/
test_sparkml_vector_assembler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
"""
Tests Spark-ML VectorAssembler
"""
import unittest
import warnings
import numpy as np
import torch
from sklearn.datasets import load_iris
from packaging.version import Version, parse
from hummingbird.ml._utils import sparkml_installed, pandas_installed
from hummingbird.ml import convert
if sparkml_installed():
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml.feature import VectorAssembler
spark = SparkSession.builder.master("local[*]").config("spark.driver.bindAddress", "127.0.0.1").getOrCreate()
sql = SQLContext(spark)
if pandas_installed():
import pandas as pd
class TestSparkMLVectorAssembler(unittest.TestCase):
# Test VectorAssembler
@unittest.skipIf((not sparkml_installed()) or (not pandas_installed()), reason="Spark-ML test requires pyspark and pandas")
def test_vectorassembler_converter(self):
iris = load_iris()
features = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
pd_df = pd.DataFrame(data=np.c_[iris["data"], iris["target"]], columns=features + ["target"])[
["sepal_length", "sepal_width", "petal_length", "petal_width"]
]
df = sql.createDataFrame(pd_df)
model = VectorAssembler(inputCols=features, outputCol="features")
test_df = df
torch_model = convert(model, "torch", test_df)
self.assertTrue(torch_model is not None)
spark_output = model.transform(test_df).toPandas()
spark_output["features"] = spark_output["features"].map(lambda x: np.array(x.toArray()))
spark_output_np = spark_output["features"].to_numpy()
torch_output_np = torch_model.transform(pd_df)
np.testing.assert_allclose(np.vstack(spark_output_np), torch_output_np, rtol=1e-06, atol=1e-06)
if __name__ == "__main__":
unittest.main()