
Commit 398a4a1

kunalgosar authored and devin-petersohn committed
Adding reproducible benchmarks (#26)
* add read_csv benchmarks
* add to_csv benchmark
* set custom log location
* bug fix
* Adding an arithmetic benchmark
* adding in scripts to run benchmarks
* cleaning log file name
* specify variable number of runs
* adding future imports
* Join and merge benchmark
* Adding join and merge benchmark
* Updating run file
* Addressing comments
* ray.wait
* clean up join benchmark
* add timeout to benchmarks and fix join
* small fix
* add pandas benchmarking scripts
* fixing pandas benchmarks
* small fix
* Add rw_benchmark and parsing
* add groupby benchmarks
* revert gitignore
* remove duplicate file
* Add plots and fix flake (#2)
* Add some plotting
* Fix flake8
* small fixes
* rename file
* small fixes
* Fix indexing with _blk_to_series
* rename log file
* bug fix
* ray.wait on groupby
* create plots
* Add transpose operation
* remove accidently commited file
* modify benchmarks
* cleanup
* update gitignore
* cleanup
* remove parsing notebook
* revert blocks to series changes
1 parent e11135b · commit 398a4a1

17 files changed: 717 additions & 0 deletions
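
Each benchmark script below imports a time_logger context manager from a utils module that is not among the files shown on this page. A minimal sketch of such a helper, assuming it logs elapsed wall-clock time through the standard logging module (the commit's actual implementation may differ):

    # Hypothetical sketch of the time_logger helper the benchmarks import;
    # the actual utils.py in this commit is not shown in this view.
    from contextlib import contextmanager
    import logging
    import time


    @contextmanager
    def time_logger(name):
        # Log wall-clock time for the enclosed block under the given label.
        start = time.time()
        try:
            yield
        finally:
            logging.info("%s; Time: %s seconds", name, time.time() - start)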

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -113,6 +113,7 @@ venv.bak/
 .idea/**/usage.statistics.xml
 .idea/**/dictionaries
 .idea/**/shelf
+*.DS_Store
 
 # Sensitive or high-churn files
 .idea/**/dataSources/

benchmarks/.gitignore

Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
*.csv
*.png

benchmarks/arithmetic_benchmark.py

Lines changed: 62 additions & 0 deletions
@@ -0,0 +1,62 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import logging
import argparse
import ray
import os
import modin.pandas as pd

from utils import time_logger


parser = argparse.ArgumentParser(description='arithmetic benchmark')
parser.add_argument('--path', dest='path', help='path to the csv data file')
parser.add_argument('--logfile', dest='logfile', help='path to the log file')
args = parser.parse_args()
file = args.path
file_size = os.path.getsize(file)

if not os.path.exists(os.path.split(args.logfile)[0]):
    os.makedirs(os.path.split(args.logfile)[0])

logging.basicConfig(filename=args.logfile, level=logging.INFO)

# Materialize the frame before timing: wait on every Ray block partition.
df = pd.read_csv(file)
blocks = df._block_partitions.flatten().tolist()
ray.wait(blocks, len(blocks))

with time_logger("Transpose: {}; Size: {} bytes".format(file, file_size)):
    # Wait on the transposed frame's block partitions so the transpose
    # actually executes inside the timed region.
    blocks = df.T._block_partitions.flatten().tolist()
    ray.wait(blocks, len(blocks))

with time_logger("Sum on axis=0: {}; Size: {} bytes".format(file, file_size)):
    df.sum()

with time_logger("Sum on axis=1: {}; Size: {} bytes".format(file, file_size)):
    df.sum(axis=1)

with time_logger("Median on axis=0: {}; Size: {} bytes"
                 .format(file, file_size)):
    df.median()

with time_logger("Median on axis=1: {}; Size: {} bytes"
                 .format(file, file_size)):
    df.median(axis=1)

with time_logger("nunique on axis=0: {}; Size: {} bytes"
                 .format(file, file_size)):
    df.nunique()

with time_logger("nunique on axis=1: {}; Size: {} bytes"
                 .format(file, file_size)):
    df.nunique(axis=1)

with time_logger("Sum UDF on axis=0: {}; Size: {} bytes"
                 .format(file, file_size)):
    df.apply(lambda df: df.sum())

with time_logger("Sum UDF on axis=1: {}; Size: {} bytes"
                 .format(file, file_size)):
    df.apply(lambda df: df.sum(), axis=1)
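
A typical invocation, assuming one of the data files produced by benchmarks/generate_data.py below (the logs path is an arbitrary choice, not taken from the commit; the script creates the log directory if it is missing):

    python benchmarks/arithmetic_benchmark.py --path benchmarks/data/test-data-100-1000.csv --logfile benchmarks/logs/arithmetic.log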

benchmarks/df_op_benchmark.py

Lines changed: 69 additions & 0 deletions
@@ -0,0 +1,69 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import logging
import argparse
import ray
import os
import modin.pandas as pd

from utils import time_logger
import numpy as np

parser = argparse.ArgumentParser(description='dataframe op benchmark')
parser.add_argument('--path', dest='path', help='path to the csv data file')
parser.add_argument('--logfile', dest='logfile', help='path to the log file')
args = parser.parse_args()
file = args.path
file_size = os.path.getsize(file)

logging.basicConfig(filename=args.logfile, level=logging.INFO)

df = pd.read_csv(file)
blocks = df._block_partitions.flatten().tolist()
ray.wait(blocks, len(blocks))

num_rows, num_cols = df.shape
new_row = np.random.randint(0, 100, size=num_cols)
new_col = np.random.randint(0, 100, size=num_rows)


def rand_row_loc():
    return np.random.randint(0, num_rows)


def rand_col_loc():
    return np.random.randint(0, num_cols)


# row/col r/w; read results are discarded, only access time is measured
with time_logger("read a column: {}; Size: {} bytes".format(file, file_size)):
    df.iloc[:, rand_col_loc()]

with time_logger("read a row: {}; Size: {} bytes".format(file, file_size)):
    df.iloc[rand_row_loc(), :]

with time_logger("write a column: {}; Size: {} bytes".format(file, file_size)):
    df.iloc[:, rand_col_loc()] = new_col

with time_logger("write a row: {}; Size: {} bytes".format(file, file_size)):
    df.iloc[rand_row_loc(), :] = new_row

# element r/w
with time_logger("read an element: {}; Size: {} bytes".format(file,
                                                               file_size)):
    df.iloc[rand_row_loc(), rand_col_loc()]

with time_logger("write an element: {}; Size: {} bytes".format(
        file, file_size)):
    df.iloc[rand_row_loc(), rand_col_loc()] = np.random.randint(0, 100)

# appending
with time_logger("append a row: {}; Size: {} bytes".format(file, file_size)):
    df.append(pd.Series(new_row), ignore_index=True)

with time_logger("append a column: {}; Size: {} bytes".format(file,
                                                               file_size)):
    df['new'] = new_col

benchmarks/generate_data.py

Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import modin.pandas as pd
import numpy as np
import os

num_rows = [100, 10000, 100000, 150000, 200000, 350000, 500000]
num_cols = [1000]

path_to_data = "benchmarks/data/"
if not os.path.exists(path_to_data):
    os.makedirs(path_to_data)

for r in num_rows:
    for c in num_cols:
        df = pd.DataFrame(np.random.randint(0, 100, size=(r, c)))
        df.to_csv(path_to_data + "test-data-{}-{}.csv".format(r, c))

# Files for multi df tests
num_rows = [100, 1000, 100000, 1000000]
num_cols = [1000]

path_to_data = "benchmarks/data/multi/"
if not os.path.exists(path_to_data):
    os.makedirs(path_to_data)

for r in num_rows:
    for c in num_cols:
        df = pd.DataFrame(np.random.randint(0, 100, size=(r, c)))
        df.to_csv(path_to_data + "test-data-{}-{}.csv".format(r, c))
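
The commit message mentions scripts to run the benchmarks and a run file; those are among the 17 changed files not shown on this page. A hypothetical runner that sweeps the generated files through one of the scripts above might look like this (the file layout and log path are illustrative assumptions, not taken from the commit):

    # Hypothetical runner sketch; the actual run scripts in this commit
    # are not shown here.
    import glob
    import subprocess

    for csv_file in sorted(glob.glob("benchmarks/data/test-data-*.csv")):
        subprocess.check_call([
            "python", "benchmarks/arithmetic_benchmark.py",
            "--path", csv_file,
            "--logfile", "benchmarks/logs/arithmetic.log",
        ])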

benchmarks/groupby_benchmark.py

Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import logging
import argparse
import ray
import os
import modin.pandas as pd

from utils import time_logger


parser = argparse.ArgumentParser(description='groupby benchmark')
parser.add_argument('--path', dest='path', help='path to the csv data file')
parser.add_argument('--logfile', dest='logfile', help='path to the log file')
args = parser.parse_args()
file = args.path
file_size = os.path.getsize(file)

if not os.path.exists(os.path.split(args.logfile)[0]):
    os.makedirs(os.path.split(args.logfile)[0])

logging.basicConfig(filename=args.logfile, level=logging.INFO)

df = pd.read_csv(file)
blocks = df._block_partitions.flatten().tolist()
ray.wait(blocks, len(blocks))

# The first timed block includes building the groupby object; the second
# reuses it, so only the mean aggregation is measured there.
with time_logger("Groupby + sum aggregation on axis=0: {}; Size: {} bytes"
                 .format(file, file_size)):
    df_groupby = df.groupby('1')
    blocks = df_groupby.sum()._block_partitions.flatten().tolist()
    ray.wait(blocks, len(blocks))

with time_logger("Groupby mean on axis=0: {}; Size: {} bytes"
                 .format(file, file_size)):
    blocks = df_groupby.mean()._block_partitions.flatten().tolist()
    ray.wait(blocks, len(blocks))
benchmarks/io_benchmark.py

Lines changed: 29 additions & 0 deletions
@@ -0,0 +1,29 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import logging
import argparse
import os
import ray
import modin.pandas as pd

from utils import time_logger


parser = argparse.ArgumentParser(description='read_csv benchmark')
parser.add_argument('--path', dest='path', help='path to the csv file')
parser.add_argument('--logfile', dest='logfile', help='path to the log file')
args = parser.parse_args()
file = args.path
file_size = os.path.getsize(file)

if not os.path.exists(os.path.split(args.logfile)[0]):
    os.makedirs(os.path.split(args.logfile)[0])

logging.basicConfig(filename=args.logfile, level=logging.INFO)

with time_logger("Read csv file: {}; Size: {} bytes".format(file, file_size)):
    df = pd.read_csv(file)
    blocks = df._block_partitions.flatten().tolist()
    ray.wait(blocks, len(blocks))
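
Every Modin script here ends its timed regions with the same two lines: flatten the private _block_partitions grid into a list of Ray object IDs, then ray.wait on all of them, so the lazily scheduled work actually finishes inside the timer. A small helper consolidating the pattern, as an illustration (wait_on_all is a name introduced here, not part of the commit):

    # Illustrative helper, not part of the commit: block until every Ray
    # partition backing a Modin DataFrame has been computed.
    import ray


    def wait_on_all(df):
        blocks = df._block_partitions.flatten().tolist()
        ray.wait(blocks, num_returns=len(blocks))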

benchmarks/join_merge_benchmark.py

Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import logging
import argparse
import ray
import os
import modin.pandas as pd

from utils import time_logger


parser = argparse.ArgumentParser(description='join and merge benchmark')
parser.add_argument('--left', dest='left', help='path to the left csv data '
                                                'file')
parser.add_argument('--right', dest='right', help='path to the right csv data '
                                                  'file')
parser.add_argument('--logfile', dest='logfile', help='path to the log file')
args = parser.parse_args()
file_left = args.left
file_size_left = os.path.getsize(file_left)

file_right = args.right
file_size_right = os.path.getsize(file_right)

if not os.path.exists(os.path.split(args.logfile)[0]):
    os.makedirs(os.path.split(args.logfile)[0])

logging.basicConfig(filename=args.logfile, level=logging.INFO)

df_left = pd.read_csv(file_left)
df_right = pd.read_csv(file_right)

blocks = df_left._block_partitions.flatten().tolist()
ray.wait(blocks, len(blocks))
blocks = df_right._block_partitions.flatten().tolist()
ray.wait(blocks, len(blocks))

with time_logger("Inner Join: {} & {}; Left Size: {} bytes; Right Size: {} "
                 "bytes".format(file_left, file_right, file_size_left,
                                file_size_right)):
    result = df_left.join(df_right, how="inner", lsuffix='left_')
    # Wait on all result partitions so the full join completes inside
    # the timer.
    blocks = result._block_partitions.flatten().tolist()
    ray.wait(blocks, len(blocks))

with time_logger("Outer Join: {} & {}; Left Size: {} bytes; Right Size: {} "
                 "bytes".format(file_left, file_right, file_size_left,
                                file_size_right)):
    result = df_left.join(df_right, how="outer", lsuffix='left_')
    blocks = result._block_partitions.flatten().tolist()
    ray.wait(blocks, len(blocks))

with time_logger("Inner Merge: {} & {}; Left Size: {} bytes; Right Size: {} "
                 "bytes".format(file_left, file_right, file_size_left,
                                file_size_right)):
    result = df_left.merge(df_right, how="inner",
                           left_index=True, right_index=True)
    blocks = result._block_partitions.flatten().tolist()
    ray.wait(blocks, len(blocks))

with time_logger("Outer Merge: {} & {}; Left Size: {} bytes; Right Size: {} "
                 "bytes".format(file_left, file_right, file_size_left,
                                file_size_right)):
    result = df_left.merge(df_right, how="outer",
                           left_index=True, right_index=True)
    blocks = result._block_partitions.flatten().tolist()
    ray.wait(blocks, len(blocks))
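
The next file's name is cut off in this view; it mirrors arithmetic_benchmark.py using stock pandas in place of modin.pandas, and appears to be one of the pandas benchmarking scripts the commit message mentions, serving as the single-node baseline.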
Lines changed: 58 additions & 0 deletions
@@ -0,0 +1,58 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import logging
import argparse
import os
import pandas as pd

from utils import time_logger


parser = argparse.ArgumentParser(description='arithmetic benchmark')
parser.add_argument('--path', dest='path', help='path to the csv data file')
parser.add_argument('--logfile', dest='logfile', help='path to the log file')
args = parser.parse_args()
file = args.path
file_size = os.path.getsize(file)

if not os.path.exists(os.path.split(args.logfile)[0]):
    os.makedirs(os.path.split(args.logfile)[0])

logging.basicConfig(filename=args.logfile, level=logging.INFO)

df = pd.read_csv(file)

with time_logger("Transpose: {}; Size: {} bytes".format(file, file_size)):
    df.T

with time_logger("Sum on axis=0: {}; Size: {} bytes".format(file, file_size)):
    df.sum()

with time_logger("Sum on axis=1: {}; Size: {} bytes".format(file, file_size)):
    df.sum(axis=1)

with time_logger("Median on axis=0: {}; Size: {} bytes"
                 .format(file, file_size)):
    df.median()

with time_logger("Median on axis=1: {}; Size: {} bytes"
                 .format(file, file_size)):
    df.median(axis=1)

with time_logger("nunique on axis=0: {}; Size: {} bytes"
                 .format(file, file_size)):
    df.nunique()

with time_logger("nunique on axis=1: {}; Size: {} bytes"
                 .format(file, file_size)):
    df.nunique(axis=1)

with time_logger("Sum UDF on axis=0: {}; Size: {} bytes"
                 .format(file, file_size)):
    df.apply(lambda df: df.sum())

with time_logger("Sum UDF on axis=1: {}; Size: {} bytes"
                 .format(file, file_size)):
    df.apply(lambda df: df.sum(), axis=1)
