diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index c69205be..f672c812 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -101,7 +101,7 @@ jobs:
         if: ${{ matrix.python-version == '3.10' && matrix.toolchain == 'stable' }}
         run: |
           source venv/bin/activate
-          flake8 --exclude venv --ignore=E501,W503
+          flake8 --exclude venv,benchmarks/db-benchmark --ignore=E501,W503
           black --line-length 79 --diff --check .

       - name: Run tests
diff --git a/benchmarks/db-benchmark/README.md b/benchmarks/db-benchmark/README.md
index 93293b0d..8ce45344 100644
--- a/benchmarks/db-benchmark/README.md
+++ b/benchmarks/db-benchmark/README.md
@@ -27,6 +27,6 @@ DataFusion's Python bindings.
 Run the following from root of this project.

 ```bash
-$ docker build -t db-benchmark -f benchmarks/db-benchmark/db-benchmark.dockerfile .
-$ docker run --privileged -it db-benchmark
+docker build -t db-benchmark -f benchmarks/db-benchmark/db-benchmark.dockerfile .
+docker run --privileged -it db-benchmark
 ```
diff --git a/benchmarks/db-benchmark/db-benchmark.dockerfile b/benchmarks/db-benchmark/db-benchmark.dockerfile
index 2876b5b6..d8842b25 100644
--- a/benchmarks/db-benchmark/db-benchmark.dockerfile
+++ b/benchmarks/db-benchmark/db-benchmark.dockerfile
@@ -108,6 +108,7 @@ RUN #./duckdb/setup-duckdb.sh
 # END OF SETUP

 RUN python3 -m pip install --upgrade pandas
+RUN python3 -m pip install --upgrade polars psutil
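+# (assumption) psutil is what the _helpers memory_usage() helper uses to
+# report mem_gb, and polars is preinstalled so the polars runs referenced
+# in run-bench.sh can execute inside the same image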
 RUN python3 -m pip install --upgrade datafusion

 # Now add our solution
diff --git a/benchmarks/db-benchmark/groupby-datafusion.py b/benchmarks/db-benchmark/groupby-datafusion.py
index 76dd38fe..2c35259e 100644
--- a/benchmarks/db-benchmark/groupby-datafusion.py
+++ b/benchmarks/db-benchmark/groupby-datafusion.py
@@ -19,14 +19,20 @@ import gc
 import timeit
 import datafusion as df
-from datafusion import functions as f
-from datafusion import col
+from datafusion import (
+    col,
+    functions as f,
+    RuntimeConfig,
+    SessionConfig,
+    SessionContext,
+)
+import pyarrow
 from pyarrow import csv as pacsv

 print("# groupby-datafusion.py", flush=True)

-# exec(open("./_helpers/helpers.py").read())
+exec(open("./_helpers/helpers.py").read())


 def ans_shape(batches):
@@ -40,8 +46,12 @@ def ans_shape(batches):
     return rows, cols


-# ver = df.__version__
-ver = "7.0.0"
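+# helper that prints the physical plan before collecting; note that the
+# print happens inside each timed region, so it adds a small constant
+# overhead to every query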
+def execute(df):
+    print(df.execution_plan().display_indent())
+    return df.collect()
+
+
+ver = df.__version__
 git = ""
 task = "groupby"
 solution = "datafusion"
@@ -49,16 +59,47 @@ def ans_shape(batches):
 cache = "TRUE"
 on_disk = "FALSE"

+# experimental - support running with both DataFrame and SQL APIs
+sql = True
+
 data_name = os.environ["SRC_DATANAME"]
 src_grp = os.path.join("data", data_name + ".csv")
 print("loading dataset %s" % src_grp, flush=True)

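+# read the numeric columns as int32/float64 rather than letting the CSV
+# reader infer int64 (assumption: the generated benchmark values fit in
+# int32); the string id columns are still dictionary-encoded via
+# auto_dict_encode below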
+schema = pyarrow.schema(
+    [
+        ("id4", pyarrow.int32()),
+        ("id5", pyarrow.int32()),
+        ("id6", pyarrow.int32()),
+        ("v1", pyarrow.int32()),
+        ("v2", pyarrow.int32()),
+        ("v3", pyarrow.float64()),
+    ]
+)
+
 data = pacsv.read_csv(
-    src_grp, convert_options=pacsv.ConvertOptions(auto_dict_encode=True)
+    src_grp,
+    convert_options=pacsv.ConvertOptions(
+        auto_dict_encode=True, column_types=schema
+    ),
 )
 print("dataset loaded")

-ctx = df.SessionContext()
+# create a session context with explicit runtime and config settings
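+# (assumptions: the 64 GiB fair spill pool suits the benchmark machine, and
+# the OS disk manager lets large aggregations spill to disk instead of
+# failing; repartitioning and batch coalescing are disabled as tuning
+# choices for this workload)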
+runtime = (
+    RuntimeConfig()
+    .with_disk_manager_os()
+    .with_fair_spill_pool(64 * 1024 * 1024 * 1024)
+)
+config = (
+    SessionConfig()
+    .with_repartition_joins(False)
+    .with_repartition_aggregations(False)
+    .set("datafusion.execution.coalesce_batches", "false")
+)
+ctx = SessionContext(config, runtime)
+print(ctx)
+
 ctx.register_record_batches("x", [data.to_batches()])
 print("registered record batches")
 # cols = ctx.sql("SHOW columns from x")
@@ -72,50 +113,107 @@ def ans_shape(batches):
 question = "sum v1 by id1"  # q1
 gc.collect()
 t_start = timeit.default_timer()
-ans = ctx.sql("SELECT id1, SUM(v1) AS v1 FROM x GROUP BY id1").collect()
+if sql:
+    df = ctx.sql("SELECT id1, SUM(v1) AS v1 FROM x GROUP BY id1")
+else:
+    df = ctx.table("x").aggregate(
+        [f.col("id1")], [f.sum(f.col("v1")).alias("v1")]
+    )
+ans = execute(df)
+
 shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
 t = timeit.default_timer() - t_start
 print(f"q1: {t}")
-# m = memory_usage()
+m = memory_usage()
 t_start = timeit.default_timer()
 df = ctx.create_dataframe([ans])
 chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0]
 chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=in_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()

 question = "sum v1 by id1:id2"  # q2
 gc.collect()
 t_start = timeit.default_timer()
-ans = ctx.sql(
-    "SELECT id1, id2, SUM(v1) AS v1 FROM x GROUP BY id1, id2"
-).collect()
+if sql:
+    df = ctx.sql("SELECT id1, id2, SUM(v1) AS v1 FROM x GROUP BY id1, id2")
+else:
+    df = ctx.table("x").aggregate(
+        [f.col("id1"), f.col("id2")], [f.sum(f.col("v1")).alias("v1")]
+    )
+ans = execute(df)
 shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
 t = timeit.default_timer() - t_start
 print(f"q2: {t}")
-# m = memory_usage()
+m = memory_usage()
 t_start = timeit.default_timer()
 df = ctx.create_dataframe([ans])
 chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0]
 chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=in_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()

 question = "sum v1 mean v3 by id3"  # q3
 gc.collect()
 t_start = timeit.default_timer()
-ans = ctx.sql(
-    "SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM x GROUP BY id3"
-).collect()
+if sql:
+    df = ctx.sql(
+        "SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM x GROUP BY id3"
+    )
+else:
+    df = ctx.table("x").aggregate(
+        [f.col("id3")],
+        [
+            f.sum(f.col("v1")).alias("v1"),
+            f.avg(f.col("v3")).alias("v3"),
+        ],
+    )
+ans = execute(df)
 shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
 t = timeit.default_timer() - t_start
 print(f"q3: {t}")
-# m = memory_usage()
+m = memory_usage()
 t_start = timeit.default_timer()
 df = ctx.create_dataframe([ans])
 chk = (
@@ -125,7 +223,25 @@ def ans_shape(batches):
     .to_numpy()[0]
 )
 chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=in_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
@@ -136,10 +252,10 @@ def ans_shape(batches):
     "SELECT id4, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM x GROUP BY id4"
 ).collect()
 shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
 t = timeit.default_timer() - t_start
 print(f"q4: {t}")
-# m = memory_usage()
+m = memory_usage()
 t_start = timeit.default_timer()
 df = ctx.create_dataframe([ans])
 chk = (
@@ -149,7 +265,25 @@ def ans_shape(batches):
     .to_numpy()[0]
 )
 chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=in_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
@@ -160,10 +294,10 @@ def ans_shape(batches):
     "SELECT id6, SUM(v1) AS v1, SUM(v2) AS v2, SUM(v3) AS v3 FROM x GROUP BY id6"
 ).collect()
 shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
 t = timeit.default_timer() - t_start
 print(f"q5: {t}")
-# m = memory_usage()
+m = memory_usage()
 t_start = timeit.default_timer()
 df = ctx.create_dataframe([ans])
 chk = (
@@ -173,7 +307,25 @@ def ans_shape(batches):
     .to_numpy()[0]
 )
 chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=in_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
@@ -184,10 +336,10 @@ def ans_shape(batches):
     "SELECT id4, id5, approx_percentile_cont(v3, .5) AS median_v3, stddev(v3) AS stddev_v3 FROM x GROUP BY id4, id5"
 ).collect()
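+# note: approx_percentile_cont computes an approximate median, so the q6
+# chk value may differ slightly from solutions that compute an exact median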
 shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
 t = timeit.default_timer() - t_start
 print(f"q6: {t}")
-# m = memory_usage()
+m = memory_usage()
 t_start = timeit.default_timer()
 df = ctx.create_dataframe([ans])
 chk = (
@@ -197,7 +349,25 @@ def ans_shape(batches):
     .to_numpy()[0]
 )
 chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=in_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
@@ -208,15 +378,33 @@ def ans_shape(batches):
     "SELECT id3, MAX(v1) - MIN(v2) AS range_v1_v2 FROM x GROUP BY id3"
 ).collect()
 shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
 t = timeit.default_timer() - t_start
 print(f"q7: {t}")
-# m = memory_usage()
+m = memory_usage()
 t_start = timeit.default_timer()
 df = ctx.create_dataframe([ans])
 chk = df.aggregate([], [f.sum(col("range_v1_v2"))]).collect()[0].column(0)[0]
 chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=in_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
@@ -227,15 +415,33 @@ def ans_shape(batches):
     "SELECT id6, v3 from (SELECT id6, v3, row_number() OVER (PARTITION BY id6 ORDER BY v3 DESC) AS row FROM x) t WHERE row <= 2"
 ).collect()
 shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
 t = timeit.default_timer() - t_start
 print(f"q8: {t}")
-# m = memory_usage()
+m = memory_usage()
 t_start = timeit.default_timer()
 df = ctx.create_dataframe([ans])
 chk = df.aggregate([], [f.sum(col("v3"))]).collect()[0].column(0)[0]
 chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=in_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
@@ -244,15 +450,33 @@ def ans_shape(batches):
 t_start = timeit.default_timer()
 ans = ctx.sql("SELECT corr(v1, v2) as corr FROM x GROUP BY id2, id4").collect()
 shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
 t = timeit.default_timer() - t_start
 print(f"q9: {t}")
-# m = memory_usage()
+m = memory_usage()
 t_start = timeit.default_timer()
 df = ctx.create_dataframe([ans])
 chk = df.aggregate([], [f.sum(col("corr"))]).collect()[0].column(0)[0]
 chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=in_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
@@ -263,10 +487,10 @@ def ans_shape(batches):
     "SELECT id1, id2, id3, id4, id5, id6, SUM(v3) as v3, COUNT(*) AS cnt FROM x GROUP BY id1, id2, id3, id4, id5, id6"
 ).collect()
 shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
 t = timeit.default_timer() - t_start
 print(f"q10: {t}")
-# m = memory_usage()
+m = memory_usage()
 t_start = timeit.default_timer()
 df = ctx.create_dataframe([ans])
 chk = (
@@ -276,7 +500,25 @@ def ans_shape(batches):
     .to_numpy()[0]
 )
 chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=in_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
diff --git a/benchmarks/db-benchmark/join-datafusion.py b/benchmarks/db-benchmark/join-datafusion.py
index 8843b55c..602cee69 100755
--- a/benchmarks/db-benchmark/join-datafusion.py
+++ b/benchmarks/db-benchmark/join-datafusion.py
@@ -26,26 +26,7 @@
 print("# join-datafusion.py", flush=True)

-# exec(open("./_helpers/helpers.py").read())
-
-
-def join_to_tbls(data_name):
-    x_n = int(float(data_name.split("_")[1]))
-    y_n = [
-        "{:.0e}".format(x_n / 1e6),
-        "{:.0e}".format(x_n / 1e3),
-        "{:.0e}".format(x_n),
-    ]
-    y_n = [
-        y_n[0].replace("+0", ""),
-        y_n[1].replace("+0", ""),
-        y_n[2].replace("+0", ""),
-    ]
-    return [
-        data_name.replace("NA", y_n[0]),
-        data_name.replace("NA", y_n[1]),
-        data_name.replace("NA", y_n[2]),
-    ]
+exec(open("./_helpers/helpers.py").read())
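+# (assumption) helpers.py provides join_to_tbls, memory_usage, make_chk and
+# write_log, replacing the local join_to_tbls definition removed above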


 def ans_shape(batches):
@@ -59,7 +40,7 @@ def ans_shape(batches):
     return rows, cols


-ver = "6.0.0"
+ver = df.__version__
 task = "join"
 git = ""
 solution = "datafusion"
@@ -84,13 +65,16 @@ def ans_shape(batches):
     + ", "
     + y_data_name[0]
     + ", "
-    + y_data_name[2]
+    + y_data_name[1]
     + ", "
     + y_data_name[2],
     flush=True,
 )

 ctx = df.SessionContext()
+print(ctx)
+
+# TODO we should be applying projections to these table reads to create relations of different sizes

 x_data = pacsv.read_csv(
     src_jn_x, convert_options=pacsv.ConvertOptions(auto_dict_encode=True)
@@ -133,8 +117,26 @@ def ans_shape(batches):
 df = ctx.create_dataframe([ans])
 chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0]
 chkt = timeit.default_timer() - t_start
-# m = memory_usage()
-# write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+m = memory_usage()
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=x_data.num_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
@@ -156,8 +158,26 @@ def ans_shape(batches):
     .column(0)[0]
 )
 chkt = timeit.default_timer() - t_start
-# m = memory_usage()
-# write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+m = memory_usage()
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=x_data.num_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
@@ -179,8 +199,26 @@ def ans_shape(batches):
     .column(0)[0]
 )
 chkt = timeit.default_timer() - t_start
-# m = memory_usage()
-# write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+m = memory_usage()
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=x_data.num_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
@@ -202,8 +240,26 @@ def ans_shape(batches):
     .column(0)[0]
 )
 chkt = timeit.default_timer() - t_start
-# m = memory_usage()
-# write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+m = memory_usage()
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=x_data.num_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
@@ -225,8 +281,26 @@ def ans_shape(batches):
     .column(0)[0]
 )
 chkt = timeit.default_timer() - t_start
-# m = memory_usage()
-# write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+m = memory_usage()
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=x_data.num_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
diff --git a/benchmarks/db-benchmark/run-bench.sh b/benchmarks/db-benchmark/run-bench.sh
index 2c308092..36a6087d 100755
--- a/benchmarks/db-benchmark/run-bench.sh
+++ b/benchmarks/db-benchmark/run-bench.sh
@@ -17,5 +17,11 @@ # under the License.

 set -e
+#SRC_DATANAME=G1_1e7_1e2_0_0 python3 /db-benchmark/polars/groupby-polars.py
 SRC_DATANAME=G1_1e7_1e2_0_0 python3 /db-benchmark/datafusion-python/groupby-datafusion.py
-#SRC_DATANAME=J1_1e7_NA_0_0 python3 /db-benchmark/datafusion-python/join-datafusion.py
+
+# joins need more work still
+#SRC_DATANAME=G1_1e7_1e2_0_0 python3 /db-benchmark/datafusion-python/join-datafusion.py
+#SRC_DATANAME=G1_1e7_1e2_0_0 python3 /db-benchmark/polars/join-polars.py
+
+cat time.csv
diff --git a/dev/python_lint.sh b/dev/python_lint.sh
index 94934629..3bc67fb1 100755
--- a/dev/python_lint.sh
+++ b/dev/python_lint.sh
@@ -22,5 +22,5 @@ set -e

 source venv/bin/activate
-flake8 --exclude venv --ignore=E501,W503
+flake8 --exclude venv,benchmarks/db-benchmark --ignore=E501,W503
 black --line-length 79 .
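Taken together, the patch is exercised end-to-end roughly like this (a sketch based on the README and run-bench.sh changes above; it assumes the image's entrypoint invokes run-bench.sh and that the write_log helper appends one row per query to time.csv):

```bash
# build the benchmark image from the repo root, then run it; on success the
# container prints per-query timings and finishes with `cat time.csv`
docker build -t db-benchmark -f benchmarks/db-benchmark/db-benchmark.dockerfile .
docker run --privileged -it db-benchmark
```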