Skip to content

Commit

Permalink
join, more solutions
Browse files Browse the repository at this point in the history
  • Loading branch information
jangorecki committed May 12, 2019
1 parent b2ebe34 commit 753c1a8
Show file tree
Hide file tree
Showing 8 changed files with 335 additions and 67 deletions.
173 changes: 173 additions & 0 deletions dask/join-dask.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
#!/usr/bin/env python

print("# join-dask.py", flush=True)

import os
import gc
import timeit
import pandas as pd
import dask as dk
import dask.dataframe as dd

exec(open("./helpers.py").read())

ver = dk.__version__
git = dk.__git_revision__
task = "join"
solution = "dask"
fun = ".merge"
cache = "TRUE"

data_name = os.environ['SRC_JN_LOCAL']
src_jn_x = os.path.join("data", data_name+".csv")
y_data_name = join_to_tbls(data_name)
src_jn_y = [os.path.join("data", y_data_name[0]+".csv"), os.path.join("data", y_data_name[1]+".csv"), os.path.join("data", y_data_name[2]+".csv")]
if len(src_jn_y) != 3:
raise Exception("Something went wrong in preparing files used for join")

print("loading datasets " + data_name + ", " + y_data_name[0] + ", " + y_data_name[2] + ", " + y_data_name[2], flush=True)

x = dd.read_csv(src_jn_x, na_filter=False, dtype={'id1':'category', 'id2':'category', 'id3':'category'}).persist()
small = dd.read_csv(src_jn_y[2], na_filter=False, dtype={'id1':'category', 'id2':'category', 'id3':'category'}).persist()
medium = dd.read_csv(src_jn_y[1], na_filter=False, dtype={'id1':'category', 'id2':'category', 'id3':'category'}).persist()
big = dd.read_csv(src_jn_y[0], na_filter=False, dtype={'id1':'category', 'id2':'category', 'id3':'category'}).persist()

in_rows = len(x)
print(in_rows, flush=True)
print(len(big.index), flush=True)
print(len(medium.index), flush=True)
print(len(small.index), flush=True)

print("joining...", flush=True)

question = "small inner on int" # q1
gc.collect()
t_start = timeit.default_timer()
ans = x.merge(small, on='id4').compute()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans['v1_x'].sum(), ans['v1_y'].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt)
del ans
gc.collect()
t_start = timeit.default_timer()
ans = x.merge(small, on='id4').compute()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans['v1_x'].sum(), ans['v1_y'].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt)
print(ans.head(3), flush=True)
print(ans.tail(3), flush=True)
del ans

question = "medium inner on int" # q2
gc.collect()
t_start = timeit.default_timer()
ans = x.merge(medium, on='id4').compute()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans['v1_x'].sum(), ans['v1_y'].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt)
del ans
gc.collect()
t_start = timeit.default_timer()
ans = x.merge(medium, on='id4').compute()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans['v1_x'].sum(), ans['v1_y'].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt)
print(ans.head(3), flush=True)
print(ans.tail(3), flush=True)
del ans

question = "medium outer on int" # q3
gc.collect()
t_start = timeit.default_timer()
ans = x.merge(medium, how='left', on='id4').compute()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans['v1_x'].sum(), ans['v1_y'].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt)
del ans
gc.collect()
t_start = timeit.default_timer()
ans = x.merge(medium, how='left', on='id4').compute()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans['v1_x'].sum(), ans['v1_y'].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt)
print(ans.head(3), flush=True)
print(ans.tail(3), flush=True)
del ans

question = "medium inner on factor" # q4
gc.collect()
t_start = timeit.default_timer()
ans = x.merge(medium, on='id1').compute()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans['v1_x'].sum(), ans['v1_y'].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt)
del ans
gc.collect()
t_start = timeit.default_timer()
ans = x.merge(medium, on='id1').compute()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans['v1_x'].sum(), ans['v1_y'].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt)
print(ans.head(3), flush=True)
print(ans.tail(3), flush=True)
del ans

question = "big inner on int" # q5
gc.collect()
t_start = timeit.default_timer()
ans = x.merge(big, on='id4').compute()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans['v1_x'].sum(), ans['v1_y'].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt)
del ans
gc.collect()
t_start = timeit.default_timer()
ans = x.merge(big, on='id4').compute()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans['v1_x'].sum(), ans['v1_y'].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt)
print(ans.head(3), flush=True)
print(ans.tail(3), flush=True)
del ans

exit(0)
4 changes: 2 additions & 2 deletions datatable/join-datatable.R
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ print(tail(ans, 3))
rm(ans)

question = "medium outer on int" # q3
t = system.time(print(dim(ans<-medium[DT, on=.(id4), nomatch=NA])))[["elapsed"]]
t = system.time(print(dim(ans<-medium[DT, on=.(id4)])))[["elapsed"]]
m = memory_usage()
chkt = system.time(chk<-ans[, .(sum(v1), sum(i.v1))])[["elapsed"]]
write.log(run=1L, task=task, data=data_name, in_rows=nrow(DT), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt)
rm(ans)
t = system.time(print(dim(ans<-medium[DT, on=.(id4), nomatch=NA])))[["elapsed"]]
t = system.time(print(dim(ans<-medium[DT, on=.(id4)])))[["elapsed"]]
m = memory_usage()
chkt = system.time(chk<-ans[, .(sum(v1), sum(i.v1))])[["elapsed"]]
write.log(run=2L, task=task, data=data_name, in_rows=nrow(DT), question=question, out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt)
Expand Down
4 changes: 2 additions & 2 deletions launcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ timeout = timeout[run_tasks, on="task", nomatch=NULL] # filter for env var RUN_T
stopifnot(nrow(timeout)==1L)

solution = rbindlist(list(
dask = list(task=c("groupby")),
dask = list(task=c("groupby","join")),
data.table = list(task=c("groupby","join")),
dplyr = list(task=c("groupby","join")),
juliadf = list(task=c("groupby")),
modin = list(task=c()),
pandas = list(task=c("groupby","join")),
pydatatable = list(task=c("groupby")),
pydatatable = list(task=c("groupby")), # join after https://github.com/h2oai/datatable/issues/1080
spark = list(task=c("groupby")),
clickhouse = list(task=c("groupby"))
), idcol="solution")
Expand Down
26 changes: 8 additions & 18 deletions pandas/join-pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@
question = "small inner on int" # q1
gc.collect()
t_start = timeit.default_timer()
ans = x.merge(small, how='inner', on='id4')
ans.reset_index(inplace=True)
ans = x.merge(small, on='id4')
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
Expand All @@ -59,8 +58,7 @@
del ans
gc.collect()
t_start = timeit.default_timer()
ans = x.merge(small, how='inner', on='id4')
ans.reset_index(inplace=True)
ans = x.merge(small, on='id4')
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
Expand All @@ -75,8 +73,7 @@
question = "medium inner on int" # q2
gc.collect()
t_start = timeit.default_timer()
ans = x.merge(medium, how='inner', on='id4')
ans.reset_index(inplace=True)
ans = x.merge(medium, on='id4')
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
Expand All @@ -87,8 +84,7 @@
del ans
gc.collect()
t_start = timeit.default_timer()
ans = x.merge(medium, how='inner', on='id4')
ans.reset_index(inplace=True)
ans = x.merge(medium, on='id4')
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
Expand All @@ -104,7 +100,6 @@
gc.collect()
t_start = timeit.default_timer()
ans = x.merge(medium, how='left', on='id4')
ans.reset_index(inplace=True)
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
Expand All @@ -116,7 +111,6 @@
gc.collect()
t_start = timeit.default_timer()
ans = x.merge(medium, how='left', on='id4')
ans.reset_index(inplace=True)
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
Expand All @@ -131,8 +125,7 @@
question = "medium inner on factor" # q4
gc.collect()
t_start = timeit.default_timer()
ans = x.merge(medium, how='inner', on='id1')
ans.reset_index(inplace=True)
ans = x.merge(medium, on='id1')
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
Expand All @@ -143,8 +136,7 @@
del ans
gc.collect()
t_start = timeit.default_timer()
ans = x.merge(medium, how='inner', on='id1')
ans.reset_index(inplace=True)
ans = x.merge(medium, on='id1')
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
Expand All @@ -159,8 +151,7 @@
question = "big inner on int" # q5
gc.collect()
t_start = timeit.default_timer()
ans = x.merge(big, how='inner', on='id4')
ans.reset_index(inplace=True)
ans = x.merge(big, on='id4')
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
Expand All @@ -171,8 +162,7 @@
del ans
gc.collect()
t_start = timeit.default_timer()
ans = x.merge(big, how='inner', on='id4')
ans.reset_index(inplace=True)
ans = x.merge(big, on='id4')
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
Expand Down
Loading

0 comments on commit 753c1a8

Please sign in to comment.