rolling functions #9

Closed
wants to merge 49 commits into from
5eb8512
rolling functions
jangorecki Apr 24, 2023
16f6d5d
Merge branch 'master' into rollfun
jangorecki Jun 17, 2023
67b7fbf
rollfun task scope polishing, DT implementation
jangorecki Jun 21, 2023
554eedc
comments in datagen
jangorecki Jun 21, 2023
ff52fd8
define udf in q10
jangorecki Jun 22, 2023
7e265ef
rollfun set up, dt and dplyr for now
jangorecki Jun 24, 2023
29ada82
dt to dplyr validation
jangorecki Jun 25, 2023
0df2fbd
Merge branch 'master' into rollfun
jangorecki Jul 2, 2023
b6dd51e
rollfun CI
jangorecki Jul 2, 2023
045b7d5
rollfun questions amended
jangorecki Jul 2, 2023
ec3ebc0
comments and readme
jangorecki Jul 2, 2023
6366a17
Merge branch 'master' into rollfun
jangorecki Jul 18, 2023
96a2eb0
run conf back to master
jangorecki Jul 18, 2023
7ef67c2
pandas rollfun
jangorecki Jul 20, 2023
572085b
done
jangorecki Jul 20, 2023
57e04b2
enable pandas rollfun config
jangorecki Jul 20, 2023
c82a962
use standard R repo for GH Actions
jangorecki Jul 20, 2023
860211d
fix temporary objects in pandass rollfun script
jangorecki Jul 20, 2023
6e6e6f9
frollmin not yet implemented in DT
jangorecki Jul 21, 2023
b91a66a
rollfun pandas script execute
jangorecki Jul 21, 2023
0c92c59
fix chk format for pandas rollfun
jangorecki Jul 23, 2023
b97cb34
duckdb-latest rollfun
jangorecki Jul 23, 2023
d92fd88
fix missing nr and nc
jangorecki Jul 23, 2023
01542d8
nicely close db connection again
jangorecki Jul 23, 2023
e904bdd
duckdb rollfun q8 q9 fix
jangorecki Jul 23, 2023
5e2618e
rollfun report
jangorecki Jul 24, 2023
088e89b
spark rollfun
jangorecki Jul 24, 2023
0da07f9
spark rollfun add missing window size
jangorecki Jul 24, 2023
6c4cd5f
CI smaller data
jangorecki Jul 24, 2023
253d5c6
Merge branch 'master' into rollfun
jangorecki Jul 24, 2023
a18c7c5
missing change to nrow CI for rollfun
jangorecki Jul 24, 2023
e064aab
spark rollfun disable median
jangorecki Jul 25, 2023
fac19b0
spark rollfun fixes
jangorecki Jul 25, 2023
657cee0
duckdb rollfun workaround for partial window
jangorecki Jul 29, 2023
3b60271
q10 update r2 rather than v1, duckdb rollfun
jangorecki Jul 30, 2023
af5991f
q6 update colnames, duckdb rollfun
jangorecki Jul 30, 2023
3eb8394
history report, rollfun
jangorecki Jul 30, 2023
241f593
spark rollfun workaround for partial window
jangorecki Jul 30, 2023
05e1660
rollfun validation post fixes
jangorecki Jul 31, 2023
edea136
dt rollfun q8 q9 needs exact
jangorecki Jul 31, 2023
3c114e5
DT q8 q9 uses algo=fast again as roundoff is way less than tolerance
jangorecki Aug 4, 2023
9b7e57c
readme rollfun
jangorecki Aug 4, 2023
076b573
cleanup dev code
jangorecki Aug 4, 2023
546eb28
report for rollfun
jangorecki Aug 5, 2023
5bb0922
workaround for using branch rather than master
jangorecki Aug 6, 2023
b104f3a
more pkgs required for dplyr script
jangorecki Aug 9, 2023
1fa7420
rollfun timeout exceptions after aws run
jangorecki Aug 12, 2023
03dac72
improve how pretty handles edge cases, seen in R1_1e6_NA_0_1 advanced
jangorecki Aug 12, 2023
e07185e
update DT for added rollmin and rollmedian
jangorecki Sep 26, 2023
4 changes: 4 additions & 0 deletions .github/workflows/regression.yml
@@ -55,6 +55,10 @@ jobs:
shell: bash
run: python3 _utils/prep_solutions.py --task=join && source path.env && TEST_RUN=true ./run.sh

- name: Run mini Rollfun benchmark
shell: bash
run: python3 _utils/prep_solutions.py --task=rollfun && source path.env && TEST_RUN=true ./run.sh

- name: Validate benchmark results
shell: bash
run: ./_utils/validate_no_errors.sh
1 change: 1 addition & 0 deletions README.md
@@ -11,6 +11,7 @@ Contribution and feedback are very welcome!
- [x] groupby
- [x] join
- [x] groupby2014
- [x] rollfun

# Solutions

98 changes: 96 additions & 2 deletions _benchplot/benchplot-dict.R
@@ -249,7 +249,7 @@ groupby.query.exceptions = {list(
"arrow" = list("Expression row_number() <= 2L not supported in Arrow; pulling data into R" = "max v1 - min v2 by id3", "Expression cor(v1, v2, ... is not supported in arrow; pulling data into R" = "regression v1 v2 by id2 id4"),
"duckdb" = list(),
"duckdb-latest" = list(),
"datafusion" = list(),
"datafusion" = list()
)}
groupby.data.exceptions = {list( # exceptions as of run 1575727624
"data.table" = {list(
@@ -468,7 +468,7 @@ join.data.exceptions = {list(
"J1_1e9_NA_5_0","J1_1e9_NA_0_1") # q1 r1
)},
"polars" = {list(
"out of memory" = c("J1_1e9_NA_0_0","J1_1e9_NA_5_0","J1_1e9_NA_0_1"),
"out of memory" = c("J1_1e9_NA_0_0","J1_1e9_NA_5_0","J1_1e9_NA_0_1")
)},
"arrow" = {list(
"out of memory" = c("J1_1e9_NA_0_0","J1_1e9_NA_5_0","J1_1e9_NA_0_1", "J1_1e8_NA_0_0", "J1_1e8_NA_5_0", "J1_1e8_NA_0_1" )#,
@@ -529,3 +529,97 @@ groupby2014.data.exceptions = {list(
)}
)}
groupby2014.exceptions = task.exceptions(groupby2014.query.exceptions, groupby2014.data.exceptions)

# rollfun ----

rollfun_q_title_fun = function(x) {
stopifnot(c("question","iquestion","out_rows","out_cols","in_rows") %in% names(x),
uniqueN(x, by="iquestion")==nrow(x))
x[, sprintf("Query %s: \"%s\"",
iquestion, as.character(question)),
by = "iquestion"]$V1
}
rollfun.syntax.dict = {list(
"data.table" = {c(
"mean" = "frollmean(x$v1, w)",
"window small" = "frollmean(x$v1, wsmall)",
"window big" = "frollmean(x$v1, wbig)",
"min" = "frollmin(x$v1, w)",
"median" = "frollmedian(x$v1, w)",
"multiroll" = "frollmean(list(x$v1, x$v2), c(w-50L, w+50L))",
"weighted" = "",
"uneven dense" = "frollmean(x$v1, frolladapt(x$id2, w), adaptive=TRUE)",
"uneven sparse" = "frollmean(x$v1, frolladapt(x$id3, w), adaptive=TRUE)",
"regression" = ""
)},
"dplyr" = {c(
"mean" = "slide_mean(x$v1, before=w-1L, complete=TRUE)",
"window small" = "slide_mean(x$v1, before=wsmall-1L, complete=TRUE)",
"window big" = "slide_mean(x$v1, before=wbig-1L, complete=TRUE)",
"min" = "slide_min(x$v1, before=w-1L, complete=TRUE)",
"median" = "",
"multiroll" = "list(slide_mean(x$v1, before=w-51L, complete=TRUE), slide_mean(x$v1, before=w+49L, complete=TRUE), slide_mean(x$v2, before=w-51L, complete=TRUE), slide_mean(x$v2, before=w+49L, complete=TRUE))",
"weighted" = "",
"uneven dense" = "slide_index_mean(x$v1, i=x$id2, before=w-1L, complete=TRUE)",
"uneven sparse" = "slide_index_mean(x$v1, i=x$id3, before=w-1L, complete=TRUE)",
"regression" = ""
)},
"pandas" = {c(
"mean" = "x['v1'].rolling(w).mean()",
"window small" = "x['v1'].rolling(wsmall).mean()",
"window big" = "x['v1'].rolling(wbig).mean()",
"min" = "x['v1'].rolling(w).min()",
"median" = "x['v1'].rolling(w).median()",
"multiroll" = "pd.concat([x[['v1','v2']].rolling(w-50).mean().reset_index(drop=True), x[['v1','v2']].rolling(w+50).mean().reset_index(drop=True)], axis=1)",
"weighted" = "",
"uneven dense" = "{y}.rolling('{w}s').mean()",
"uneven sparse" = "{y}.rolling('{w}s').mean()",
"regression" = ""
)},
"spark" = {c(
"mean" = "select avg(v1) over (order by id1 rows between {w-1} preceding and current row) as v1 from x",
"window small" = "select avg(v1) over (order by id1 rows between {wsmall-1} preceding and current row) as v1 from x",
"window big" = "select avg(v1) over (order by id1 rows between {wbig-1} preceding and current row) as v1 from x",
"min" = "select min(v1) over (order by id1 rows between {w-1} preceding and current row) as v1 from x",
"median" = "select median(v1) over (order by id1 rows between {w-1} preceding and current row) as v1 from x",
"multiroll" = "select avg(v1) over small as v1_small, avg(v1) over big as v1_big, avg(v2) over small as v2_small, avg(v2) over big as v2_big from x window small as (order by id1 rows between {w-51} preceding and current row), big as (order by id1 rows between {w+49} preceding and current row)",
"weighted" = "",
"uneven dense" = "select avg(v1) over (order by id2 range between {w-1} preceding and current row) as v1 from x",
"uneven sparse" = "select avg(v1) over (order by id3 range between {w-1} preceding and current row) as v1 from x",
"regression" = ""
)},
"duckdb-latest" = {c(
"mean" = "SELECT avg(v1) OVER (ORDER BY id1 ROWS BETWEEN {w-1} PRECEDING AND CURRENT ROW) AS v1 FROM x",
"window small" = "SELECT avg(v1) OVER (ORDER BY id1 ROWS BETWEEN {wsmall-1} PRECEDING AND CURRENT ROW) AS v1 FROM x",
"window big" = "SELECT avg(v1) OVER (ORDER BY id1 ROWS BETWEEN {wbig-1} PRECEDING AND CURRENT ROW) AS v1 FROM x",
"min" = "SELECT min(v1) OVER (ORDER BY id1 ROWS BETWEEN {w-1} PRECEDING AND CURRENT ROW) AS v1 FROM x",
"median" = "SELECT median(v1) OVER (ORDER BY id1 ROWS BETWEEN {w-1} PRECEDING AND CURRENT ROW) AS v1 FROM x",
"multiroll" = "SELECT avg(v1) OVER small AS v1_small, avg(v1) OVER big AS v1_big, avg(v2) OVER small AS v2_small, avg(v2) OVER big AS v2_big FROM x WINDOW small AS (ORDER BY id1 ROWS BETWEEN w-51 PRECEDING AND CURRENT ROW), big AS (ORDER BY id1 ROWS BETWEEN w+49 PRECEDING AND CURRENT ROW)",
"weighted" = "",
"uneven dense" = "SELECT avg(v1) OVER (ORDER BY id2 RANGE BETWEEN {w-1} PRECEDING AND CURRENT ROW) AS v1 FROM x",
"uneven sparse" = "SELECT avg(v1) OVER (ORDER BY id3 RANGE BETWEEN {w-1} PRECEDING AND CURRENT ROW) AS v1 FROM x",
"regression" = "SELECT regr_r2(v2, v1) OVER (ORDER BY id1 ROWS BETWEEN {w-1} PRECEDING AND CURRENT ROW) AS r2 FROM x"
)}
)}
rollfun.query.exceptions = {list(
"data.table" = list("not yet implemented" = "weighted", "not yet implemented" = "regression"),
"dplyr" = list("not yet implemented" = "median", "not yet implemented" = "weighted", "not yet implemented" = "regression"),
"pandas" = list("not yet implemented" = "weighted", "not yet implemented" = "regression"),
"spark" = list("not yet implemented" = "median", "not yet implemented" = "weighted", "not yet implemented" = "regression"),
"duckdb-latest" = list("not yet implemented" = "weighted")
)}
rollfun.data.exceptions = {list(
"data.table" = {list(
)},
"dplyr" = {list(
)},
"pandas" = {list(
)},
"spark" = {list(
"timeout" = c("R1_1e7_NA_0_1", "R1_1e8_NA_0_1")
)},
"duckdb-latest" = {list(
"timeout" = c("R1_1e8_NA_0_1")
)}
)}
rollfun.exceptions = task.exceptions(rollfun.query.exceptions, rollfun.data.exceptions)
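The syntax dictionary above maps each rollfun question to per-solution code. As an illustrative sketch of what the pandas entries compute (toy made-up values, not the benchmark's generated data), the basic fixed-window questions and the "uneven" index-based variant behave like this:

```python
import pandas as pd

# toy stand-in for the benchmark's v1 column (values are made up)
x = pd.DataFrame({"v1": [1.0, 2.0, 3.0, 4.0, 6.0]})
w = 3

# fixed-size windows: the first w-1 results are NaN until a full window exists
mean_w = x["v1"].rolling(w).mean()
min_w = x["v1"].rolling(w).min()
assert mean_w.isna().sum() == w - 1
assert mean_w.iloc[2] == 2.0   # mean of 1, 2, 3
assert min_w.iloc[4] == 3.0    # min of 3, 4, 6

# "uneven" variant: an index column is promoted to a datetime index so the
# window spans a value range ("{w}s") rather than a row count
y = x.set_index(pd.to_datetime([1, 2, 4, 7, 8], unit="s"))
mean_t = y["v1"].rolling(f"{w}s").mean()
assert mean_t.iloc[0] == 1.0   # offset windows default to min_periods=1
```

The row-count and offset windows differ in their partial-window handling, which is why several solution scripts in this PR carry explicit workarounds for partial windows.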
3 changes: 2 additions & 1 deletion _benchplot/benchplot.R
@@ -365,7 +365,8 @@ benchplot = function(
}
margins(nsolutions, pending=pending)
x[na_time_sec==FALSE, "max_time" := max(c(time1, time2)), by=c("solution","question")]
lim_x = tail(xlab_labels(max(c(0, x$max_time), na.rm=TRUE)), n=1L)
trunc5 = function(x) trunc(x*1e5)/1e5
lim_x = tail(xlab_labels(trunc5(max(c(0, x$max_time), na.rm=TRUE))), n=1L)
if (lim_x == 0) stop("internal error: lim x is c(0,0), this should be already escaped at the beginning with 'sum(x$na_time_sec)==nrow(x)'")
# get bars Y coordinates, positions only, plot later in bar1
all_y_bars = barplot(rep(NA_real_, length(pad)), horiz=TRUE, xlim=c(0, lim_x), axes=FALSE, xpd=FALSE)
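The `trunc5` helper added in this hunk truncates (rather than rounds) the maximum time before computing the axis limit, so a value such as 0.9999999 cannot round up and push the limit to the next label. A sketch of the same idea in Python (the name simply mirrors the R helper):

```python
import math

def trunc5(x: float) -> float:
    # drop everything past the 5th decimal place; no rounding occurs
    return math.trunc(x * 1e5) / 1e5

assert trunc5(1.234567) == 1.23456
assert trunc5(0.9999999) == 0.99999   # round() would give 1.0
```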
5 changes: 4 additions & 1 deletion _control/data.csv
@@ -20,4 +20,7 @@ join,J1_1e7_NA_0_1,1e7,NA,0,1,1
join,J1_1e8_NA_0_0,1e8,NA,0,0,1
join,J1_1e8_NA_5_0,1e8,NA,5,0,1
join,J1_1e8_NA_0_1,1e8,NA,0,1,1
join,J1_1e9_NA_0_0,1e9,NA,0,0,1
join,J1_1e9_NA_0_0,1e9,NA,0,0,1
rollfun,R1_1e6_NA_0_1,1e6,NA,0,1,1
rollfun,R1_1e7_NA_0_1,1e7,NA,0,1,1
rollfun,R1_1e8_NA_0_1,1e8,NA,0,1,1
10 changes: 10 additions & 0 deletions _control/questions.csv
@@ -19,3 +19,13 @@ groupby2014,sum v1 by id1:id2,basic
groupby2014,sum v1 mean v3 by id3,basic
groupby2014,mean v1:v3 by id4,basic
groupby2014,sum v1:v3 by id6,basic
rollfun,mean,basic
rollfun,window small,basic
rollfun,window big,basic
rollfun,min,basic
rollfun,median,basic
rollfun,multiroll,advanced
rollfun,weighted,advanced
rollfun,uneven dense,advanced
rollfun,uneven sparse,advanced
rollfun,regression,advanced
5 changes: 5 additions & 0 deletions _control/solutions.csv
@@ -2,16 +2,20 @@ solution,task
data.table,groupby
data.table,join
data.table,groupby2014
data.table,rollfun
dplyr,groupby
dplyr,join
dplyr,groupby2014
dplyr,rollfun
pandas,groupby
pandas,join
pandas,groupby2014
pandas,rollfun
pydatatable,groupby
pydatatable,join
spark,groupby
spark,join
spark,rollfun
dask,groupby
dask,join
juliadf,groupby
@@ -28,5 +32,6 @@ duckdb,groupby
duckdb,join
duckdb-latest,groupby
duckdb-latest,join
duckdb-latest,rollfun
datafusion,groupby
datafusion,join
3 changes: 3 additions & 0 deletions _control/timeout.csv
@@ -8,3 +8,6 @@ join,1e9,360
groupby2014,1e7,60
groupby2014,1e8,120
groupby2014,1e9,180
rollfun,1e6,60
rollfun,1e7,120
rollfun,1e8,180
35 changes: 35 additions & 0 deletions _data/rollfun-datagen.R
@@ -0,0 +1,35 @@
# Rscript _data/rollfun-datagen.R 1e6 0 0 1
# Rscript _data/rollfun-datagen.R 1e7 0 0 1
# Rscript _data/rollfun-datagen.R 1e8 0 0 1

args = commandArgs(TRUE)

pretty_sci = function(x) {
tmp<-strsplit(as.character(x), "+", fixed=TRUE)[[1L]]
if(length(tmp)==1L) {
paste0(substr(tmp, 1L, 1L), "e", nchar(tmp)-1L)
} else if(length(tmp)==2L){
paste0(tmp[1L], as.character(as.integer(tmp[2L])))
}
}

library(data.table)
N=as.integer(args[1L]); K=as.integer(args[2L]); nas=as.integer(args[3L]); sort=as.integer(args[4L])
stopifnot(nas==0L, sort==1L) ## timeseries data always sorted
set.seed(108)
cat(sprintf("Producing data of %s rows, %s NAs ratio, %s sort flag\n", pretty_sci(N), nas, sort))
DT = list()
DT[["id1"]] = seq.int(N) ## index, do we need it as POSIXct/IDate?
## uneven idx
DT[["id2"]] = sort(sample(N*1.1, N)) ## index dense
DT[["id3"]] = sort(sample(N*2, N)) ## index sparse
DT[["v1"]] = cumprod(rnorm(N, 1, 0.005)) ## more risky asset
DT[["v2"]] = cumprod(rnorm(N, 1, 0.001)) ## less risky asset
DT[["weights"]] = rnorm(n=N, m=1, sd=0.1)

setDT(DT)
file = sprintf("R1_%s_NA_%s_%s.csv", pretty_sci(N), nas, sort)
cat(sprintf("Writing data to %s\n", file))
fwrite(DT, file)
cat(sprintf("Data written to %s, quitting\n", file))
if (!interactive()) quit("no", status=0)
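The `pretty_sci` helper in the generator collapses R's scientific notation (e.g. `1e+06`) into the short `1e6` form used in data file names like `R1_1e6_NA_0_1.csv`. A rough Python equivalent of that intent (a sketch, not a transliteration of the R code):

```python
def pretty_sci(n: int) -> str:
    # render a round count such as 1000000 as "1e6" for use in file names
    mantissa, exp = f"{n:e}".split("e")        # e.g. "1.000000", "+06"
    mantissa = mantissa.rstrip("0").rstrip(".")
    return f"{mantissa}e{int(exp)}"

assert pretty_sci(10**6) == "1e6"
assert pretty_sci(2 * 10**7) == "2e7"
```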
2 changes: 2 additions & 0 deletions _launcher/solution.R
@@ -133,6 +133,8 @@ data.desc = function(task, nrow, k, na, sort) {
prefix = "J1"
} else if (task=="groupby2014") {
prefix = "G0"
} else if (task=="rollfun") {
prefix = "R1"
} else {
stop("undefined task in solution.R data.desc function")
}