# benchmark_column_performance.py (from a fork of mrpowers-io/quinn)
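"""Benchmark five ways of collecting a single DataFrame column into a Python list.

Each strategy in ``config`` below is timed with ``auto_timeit`` against parquet
datasets of several sizes, and the per-run timings are written to JSON files
under results/.
"""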
from __future__ import annotations

import json
import timeit
from pathlib import Path


def auto_timeit(
    stmt: str = "pass",
    setup: str = "pass",
    min_runtime_seconds: int = 2,
) -> list[float]:
    """Automatically determine the number of runs needed to reach a minimum total runtime."""
    min_runs = 5
    # Time a single run first to estimate the cost of one execution.
    print(f"Running {stmt} 1 time...")
    t = timeit.repeat(stmt, setup, repeat=1, number=1)
    print(f"First run: {t[0]:.2f} seconds")
    if t[0] >= min_runtime_seconds:
        return t
    # Scale the run count so the total runtime reaches min_runtime_seconds,
    # with a floor of min_runs so fast statements still get several samples.
    expected_runs_needed = int((min_runtime_seconds // t[0]) + 1)
    expected_runs_needed = max(expected_runs_needed, min_runs)
    expected_runtime = t[0] * expected_runs_needed
    print(f"Running {stmt} {expected_runs_needed} times.")
    print(f"Expected runtime: {expected_runtime:.2f} seconds...")
    return timeit.repeat(stmt, setup, repeat=expected_runs_needed, number=1)
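

# A minimal usage sketch for auto_timeit, kept as a comment so nothing runs
# outside the benchmark loop below; the statement timed here is illustrative,
# not part of the benchmark suite:
#
#     runtimes = auto_timeit(stmt="sum(range(1_000_000))", min_runtime_seconds=2)
#     print(f"{len(runtimes)} runs, fastest: {min(runtimes):.4f}s")
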
def get_result(
    test_name: str,
    dataset: dict,
    expr: str,
    min_runtime_seconds: int,
) -> None:
    """Run a test and save the results to a JSON file."""
    # The setup string runs inside timeit, so the SparkSession is built and
    # the parquet dataset is loaded before any timing starts.
    setup = f"""import timeit
import pyspark.sql.functions as F
from pyspark.sql import DataFrame, SparkSession

builder = (
    SparkSession.builder.appName("MyApp")
    .config("spark.executor.memory", "10G")
    .config("spark.driver.memory", "25G")
    .config("spark.sql.shuffle.partitions", "2")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
)
spark = builder.getOrCreate()
{dataset['name']} = spark.read.parquet('benchmarks/data/mvv_{dataset['name']}')
"""
    # Swap the generic "df" placeholder for this dataset's DataFrame name.
    stmt = expr.replace("df", dataset["name"])
    result = auto_timeit(stmt, setup, min_runtime_seconds)
    summary = {
        "test_name": test_name,
        "dataset": dataset["name"],
        "dataset_size": dataset["size"],
        "runtimes": result,
    }
    result_path = f"results/{test_name}_{dataset['name']}.json"
    with Path(__file__).parent.joinpath(result_path).open(mode="w") as f:
        json.dump(summary, f, indent=4)
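

# Five strategies for collecting the 'mvv' column into a Python list. The
# literal name "df" in each expr is a placeholder that get_result swaps for
# the dataset's DataFrame name before timing.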
config = {
    "toPandas": {"expr": "list(df.select('mvv').toPandas()['mvv'])"},
    "flatmap": {"expr": "df.select('mvv').rdd.flatMap(lambda x: x).collect()"},
    "map": {"expr": "df.select('mvv').rdd.map(lambda row: row[0]).collect()"},
    "collectlist": {"expr": "[row[0] for row in df.select('mvv').collect()]"},
    "localIterator": {"expr": "[r[0] for r in df.select('mvv').toLocalIterator()]"},
}

DATASETS = {
    "large": {"name": "large", "size": 100_000_000, "min_runtime_seconds": 1200},
    "medium": {"name": "medium", "size": 10_000_000, "min_runtime_seconds": 360},
    "small": {"name": "small", "size": 100_000, "min_runtime_seconds": 20},
    "xsmall": {"name": "xsmall", "size": 1_000, "min_runtime_seconds": 20},
}
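
# This script expects parquet inputs at benchmarks/data/mvv_<name>, each with
# an 'mvv' column. A minimal generation sketch, commented out so the benchmark
# itself stays read-only; the single-integer-column schema is an assumption,
# not the original datasets' documented layout:
#
#     from pyspark.sql import SparkSession
#     spark = SparkSession.builder.getOrCreate()
#     for d in DATASETS.values():
#         df = spark.range(d["size"]).withColumnRenamed("id", "mvv")
#         df.write.parquet(f"benchmarks/data/mvv_{d['name']}")
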
# Run every strategy against every dataset size.
for test_name, test_config in config.items():
    print(f"======================{test_name}======================")
    for dataset in DATASETS.values():
        print(f"TESTING DATASET {dataset['name']} [n={dataset['size']:,}]")
        get_result(
            test_name=test_name,
            dataset=dataset,
            expr=test_config["expr"],
            min_runtime_seconds=dataset["min_runtime_seconds"],
        )
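
# Sketch of reading the saved runtimes back for a quick comparison (assumes
# the loop above has populated results/; the summary format is illustrative):
#
#     for path in sorted(Path(__file__).parent.glob("results/*.json")):
#         summary = json.loads(path.read_text())
#         print(f"{summary['test_name']:>15} {summary['dataset']:>7}: "
#               f"{min(summary['runtimes']):.3f}s fastest of {len(summary['runtimes'])} runs")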