Skip to content

Commit ec1259a

Browse files
authored
mean na.rm=TRUE uses GForce (#4851)
1 parent 788c585 commit ec1259a

File tree

4 files changed

+142
-97
lines changed

4 files changed

+142
-97
lines changed

NEWS.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
1. `nafill()` now applies `fill=` to the front/back of the vector when `type="locf|nocb"`, [#3594](https://github.com/Rdatatable/data.table/issues/3594). Thanks to @ben519 for the feature request. It also now returns a named object based on the input names. Note that if you are considering joining and then using `nafill(...,type='locf|nocb')` afterwards, please review `roll=`/`rollends=` which should achieve the same result in one step more efficiently. `nafill()` is for when filling-while-joining (i.e. `roll=`/`rollends=`/`nomatch=`) cannot be applied.
1010

11+
2. `mean(na.rm=TRUE)` by group is now GForce optimized, [#4849](https://github.com/Rdatatable/data.table/issues/4849). Thanks to the [h2oai/db-benchmark](https://github.com/h2oai/db-benchmark) project for spotting this issue. The 1 billion row example in the issue shows 48s reduced to 14s. The optimization also applies to type `integer64` resulting in a difference to the `bit64::mean.integer64` method: `data.table` returns a `double` result whereas `bit64` rounds the mean to the nearest integer.
12+
1113
## BUG FIXES
1214

1315
## NOTES

R/data.table.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2856,7 +2856,7 @@ ghead = function(x, n) .Call(Cghead, x, as.integer(n)) # n is not used at the mo
28562856
gtail = function(x, n) .Call(Cgtail, x, as.integer(n)) # n is not used at the moment
28572857
gfirst = function(x) .Call(Cgfirst, x)
28582858
glast = function(x) .Call(Cglast, x)
2859-
gsum = function(x, na.rm=FALSE) .Call(Cgsum, x, na.rm, TRUE) # warnOverflow=TRUE, #986
2859+
gsum = function(x, na.rm=FALSE) .Call(Cgsum, x, na.rm)
28602860
gmean = function(x, na.rm=FALSE) .Call(Cgmean, x, na.rm)
28612861
gprod = function(x, na.rm=FALSE) .Call(Cgprod, x, na.rm)
28622862
gmedian = function(x, na.rm=FALSE) .Call(Cgmedian, x, na.rm)

inst/tests/tests.Rraw

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17263,3 +17263,13 @@ if (identical(x, enc2native(x))) {
1726317263

1726417264
# fintersect now preserves order of first argument like intersect, #4716
1726517265
test(2163, fintersect(data.table(x=c("b", "c", "a")), data.table(x=c("a","c")))$x, c("c", "a"))
17266+
17267+
# mean na.rm=TRUE GForce, #4849
17268+
d = data.table(a=1, b=list(1,2))
17269+
test(2164.1, d[, mean(b), by=a], error="not supported by GForce mean")
17270+
if (test_bit64) {
17271+
d = data.table(a=INT(1,1,2,2), b=as.integer64(c(2,3,4,NA)))
17272+
test(2164.2, d[, mean(b), by=a], data.table(a=INT(1,2), V1=c(2.5, NA)))
17273+
test(2164.3, d[, mean(b, na.rm=TRUE), by=a], data.table(a=INT(1,2), V1=c(2.5, 4)))
17274+
}
17275+

src/gsumm.c

Lines changed: 129 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -337,11 +337,10 @@ void *gather(SEXP x, bool *anyNA)
337337
return gx;
338338
}
339339

340-
SEXP gsum(SEXP x, SEXP narmArg, SEXP warnOverflowArg)
340+
SEXP gsum(SEXP x, SEXP narmArg)
341341
{
342342
if (!isLogical(narmArg) || LENGTH(narmArg)!=1 || LOGICAL(narmArg)[0]==NA_LOGICAL) error(_("na.rm must be TRUE or FALSE"));
343343
const bool narm = LOGICAL(narmArg)[0];
344-
const bool warnOverflow = LOGICAL(warnOverflowArg)[0];
345344
if (inherits(x, "factor")) error(_("sum is not meaningful for factors."));
346345
const int n = (irowslen == -1) ? length(x) : irowslen;
347346
double started = wallclock();
@@ -401,7 +400,7 @@ SEXP gsum(SEXP x, SEXP narmArg, SEXP warnOverflowArg)
401400
//Rprintf(_("gsum int took %.3f\n"), wallclock()-started);
402401
if (overflow) {
403402
UNPROTECT(1); // discard the result with overflow
404-
if (warnOverflow) warning(_("The sum of an integer column for a group was more than type 'integer' can hold so the result has been coerced to 'numeric' automatically for convenience."));
403+
warning(_("The sum of an integer column for a group was more than type 'integer' can hold so the result has been coerced to 'numeric' automatically for convenience."));
405404
ans = PROTECT(allocVector(REALSXP, ngrp));
406405
double *restrict ansp = REAL(ans);
407406
memset(ansp, 0, ngrp*sizeof(double));
@@ -570,113 +569,147 @@ SEXP gsum(SEXP x, SEXP narmArg, SEXP warnOverflowArg)
570569
return(ans);
571570
}
572571

573-
SEXP gmean(SEXP x, SEXP narm)
572+
SEXP gmean(SEXP x, SEXP narmArg)
574573
{
575-
SEXP ans=R_NilValue;
576-
//clock_t start = clock();
577-
if (!isLogical(narm) || LENGTH(narm)!=1 || LOGICAL(narm)[0]==NA_LOGICAL) error(_("na.rm must be TRUE or FALSE"));
578-
if (!isVectorAtomic(x)) error(_("GForce mean can only be applied to columns, not .SD or similar. Likely you're looking for 'DT[,lapply(.SD,mean),by=,.SDcols=]'. See ?data.table."));
579574
if (inherits(x, "factor")) error(_("mean is not meaningful for factors."));
580-
if (!LOGICAL(narm)[0]) {
581-
int protecti=0;
582-
ans = PROTECT(gsum(x, narm, /*#986, warnOverflow=*/ScalarLogical(FALSE))); protecti++;
583-
switch(TYPEOF(ans)) {
584-
case LGLSXP: case INTSXP:
585-
ans = PROTECT(coerceVector(ans, REALSXP)); protecti++;
586-
case REALSXP: {
587-
double *xd = REAL(ans);
588-
for (int i=0; i<ngrp; i++) *xd++ /= grpsize[i]; // let NA propogate
589-
} break;
590-
case CPLXSXP: {
591-
Rcomplex *xd = COMPLEX(ans);
592-
for (int i=0; i<ngrp; i++) {
593-
xd->i /= grpsize[i];
594-
xd->r /= grpsize[i];
595-
xd++;
596-
}
597-
} break;
598-
default :
599-
error(_("Internal error: gsum returned type '%s'. typeof(x) is '%s'"), type2char(TYPEOF(ans)), type2char(TYPEOF(x))); // # nocov
600-
}
601-
UNPROTECT(protecti);
602-
return(ans);
603-
}
604-
// na.rm=TRUE. Similar to gsum, but we need to count the non-NA as well for the divisor
575+
if (!isLogical(narmArg) || LENGTH(narmArg)!=1 || LOGICAL(narmArg)[0]==NA_LOGICAL) error(_("na.rm must be TRUE or FALSE"));
576+
const bool narm = LOGICAL(narmArg)[0];
605577
const int n = (irowslen == -1) ? length(x) : irowslen;
606-
if (nrow != n) error(_("nrow [%d] != length(x) [%d] in %s"), nrow, n, "gsum");
607-
608-
long double *s = calloc(ngrp, sizeof(long double)), *si=NULL; // s = sum; si = sum imaginary just for complex
609-
if (!s) error(_("Unable to allocate %d * %d bytes for sum in gmean na.rm=TRUE"), ngrp, sizeof(long double));
610-
611-
int *c = calloc(ngrp, sizeof(int));
612-
if (!c) error(_("Unable to allocate %d * %d bytes for counts in gmean na.rm=TRUE"), ngrp, sizeof(int));
613-
578+
double started = wallclock();
579+
const bool verbose=GetVerbose();
580+
if (verbose) Rprintf(_("This gmean took (narm=%s) ... "), narm?"TRUE":"FALSE"); // narm=TRUE only at this point
581+
if (nrow != n) error(_("nrow [%d] != length(x) [%d] in %s"), nrow, n, "gmean");
582+
bool anyNA=false;
583+
SEXP ans=R_NilValue;
584+
int protecti=0;
614585
switch(TYPEOF(x)) {
615-
case LGLSXP: case INTSXP: {
616-
const int *xd = INTEGER(x);
617-
for (int i=0; i<n; i++) {
618-
int thisgrp = grp[i];
619-
int ix = (irowslen == -1) ? i : irows[i]-1;
620-
if (xd[ix] == NA_INTEGER) continue;
621-
s[thisgrp] += xd[ix]; // no under/overflow here, s is long double
622-
c[thisgrp]++;
623-
}
624-
} break;
586+
case LGLSXP: case INTSXP:
587+
x = PROTECT(coerceVector(x, REALSXP)); protecti++;
625588
case REALSXP: {
626-
const double *xd = REAL(x);
627-
for (int i=0; i<n; i++) {
628-
int thisgrp = grp[i];
629-
int ix = (irowslen == -1) ? i : irows[i]-1;
630-
if (ISNAN(xd[ix])) continue;
631-
s[thisgrp] += xd[ix];
632-
c[thisgrp]++;
633-
}
634-
} break;
635-
case CPLXSXP: {
636-
const Rcomplex *xd = COMPLEX(x);
637-
si = calloc(ngrp, sizeof(long double));
638-
if (!si) error(_("Unable to allocate %d * %d bytes for si in gmean na.rm=TRUE"), ngrp, sizeof(long double));
639-
for (int i=0; i<n; i++) {
640-
int thisgrp = grp[i];
641-
int ix = (irowslen == -1) ? i : irows[i]-1;
642-
if (ISNAN(xd[ix].r) || ISNAN(xd[ix].i)) continue; // || otherwise we'll need two counts in two c's too?
643-
s[thisgrp] += xd[ix].r;
644-
si[thisgrp] += xd[ix].i;
645-
c[thisgrp]++;
589+
if (INHERITS(x, char_integer64)) {
590+
x = PROTECT(coerceAs(x, /*as=*/ScalarReal(1), /*copyArg=*/ScalarLogical(TRUE))); protecti++;
646591
}
647-
} break;
648-
default:
649-
free(s); free(c); // # nocov because it already stops at gsum, remove nocov if gmean will support a type that gsum wont
650-
error(_("Type '%s' not supported by GForce mean (gmean) na.rm=TRUE. Either add the prefix base::mean(.) or turn off GForce optimization using options(datatable.optimize=1)"), type2char(TYPEOF(x))); // # nocov
651-
}
652-
switch(TYPEOF(x)) {
653-
case LGLSXP: case INTSXP: case REALSXP: {
654-
ans = PROTECT(allocVector(REALSXP, ngrp));
655-
double *ansd = REAL(ans);
656-
for (int i=0; i<ngrp; i++) {
657-
if (c[i]==0) { ansd[i] = R_NaN; continue; } // NaN to follow base::mean
658-
s[i] /= c[i];
659-
ansd[i] = s[i]>DBL_MAX ? R_PosInf : (s[i] < -DBL_MAX ? R_NegInf : (double)s[i]);
592+
const double *restrict gx = gather(x, &anyNA);
593+
ans = PROTECT(allocVector(REALSXP, ngrp)); protecti++;
594+
double *restrict ansp = REAL(ans);
595+
memset(ansp, 0, ngrp*sizeof(double));
596+
if (!narm || !anyNA) {
597+
#pragma omp parallel for num_threads(getDTthreads(highSize, false))
598+
for (int h=0; h<highSize; h++) {
599+
double *restrict _ans = ansp + (h<<shift);
600+
for (int b=0; b<nBatch; b++) {
601+
const int pos = counts[ b*highSize + h ];
602+
const int howMany = ((h==highSize-1) ? (b==nBatch-1?lastBatchSize:batchSize) : counts[ b*highSize + h + 1 ]) - pos;
603+
const double *my_gx = gx + b*batchSize + pos;
604+
const uint16_t *my_low = low + b*batchSize + pos;
605+
for (int i=0; i<howMany; i++) {
606+
_ans[my_low[i]] += my_gx[i]; // let NA propagate when !narm
607+
}
608+
}
609+
}
610+
#pragma omp parallel for num_threads(getDTthreads(ngrp, true))
611+
for (int i=0; i<ngrp; i++) ansp[i] /= grpsize[i];
612+
} else {
613+
// narm==true and anyNA==true
614+
int *restrict nna_counts = calloc(ngrp, sizeof(int));
615+
if (!nna_counts) error(_("Unable to allocate %d * %d bytes for non-NA counts in gmean na.rm=TRUE"), ngrp, sizeof(int));
616+
#pragma omp parallel for num_threads(getDTthreads(highSize, false))
617+
for (int h=0; h<highSize; h++) {
618+
double *restrict _ans = ansp + (h<<shift);
619+
int *restrict _nna = nna_counts + (h<<shift);
620+
for (int b=0; b<nBatch; b++) {
621+
const int pos = counts[ b*highSize + h ];
622+
const int howMany = ((h==highSize-1) ? (b==nBatch-1?lastBatchSize:batchSize) : counts[ b*highSize + h + 1 ]) - pos;
623+
const double *my_gx = gx + b*batchSize + pos;
624+
const uint16_t *my_low = low + b*batchSize + pos;
625+
for (int i=0; i<howMany; i++) {
626+
const double elem = my_gx[i];
627+
if (!ISNAN(elem)) {
628+
_ans[my_low[i]] += elem;
629+
_nna[my_low[i]]++;
630+
}
631+
}
632+
}
633+
}
634+
#pragma omp parallel for num_threads(getDTthreads(ngrp, true))
635+
for (int i=0; i<ngrp; i++) ansp[i] /= nna_counts[i];
636+
free(nna_counts);
660637
}
661638
} break;
662639
case CPLXSXP: {
663-
ans = PROTECT(allocVector(CPLXSXP, ngrp));
664-
Rcomplex *ansd = COMPLEX(ans);
665-
for (int i=0; i<ngrp; i++) {
666-
if (c[i]==0) { ansd[i].r = R_NaN; ansd[i].i = R_NaN; continue; }
667-
s[i] /= c[i];
668-
si[i] /= c[i];
669-
ansd[i].r = s[i] >DBL_MAX ? R_PosInf : (s[i] < -DBL_MAX ? R_NegInf : (double)s[i]);
670-
ansd[i].i = si[i]>DBL_MAX ? R_PosInf : (si[i]< -DBL_MAX ? R_NegInf : (double)si[i]);
640+
const Rcomplex *restrict gx = gather(x, &anyNA);
641+
ans = PROTECT(allocVector(CPLXSXP, ngrp)); protecti++;
642+
Rcomplex *restrict ansp = COMPLEX(ans);
643+
memset(ansp, 0, ngrp*sizeof(Rcomplex));
644+
if (!narm || !anyNA) {
645+
#pragma omp parallel for num_threads(getDTthreads(highSize, false))
646+
for (int h=0; h<highSize; h++) {
647+
Rcomplex *restrict _ans = ansp + (h<<shift);
648+
for (int b=0; b<nBatch; b++) {
649+
const int pos = counts[ b*highSize + h ];
650+
const int howMany = ((h==highSize-1) ? (b==nBatch-1?lastBatchSize:batchSize) : counts[ b*highSize + h + 1 ]) - pos;
651+
const Rcomplex *my_gx = gx + b*batchSize + pos;
652+
const uint16_t *my_low = low + b*batchSize + pos;
653+
for (int i=0; i<howMany; i++) {
654+
_ans[my_low[i]].r += my_gx[i].r; // let NA propagate when !narm
655+
_ans[my_low[i]].i += my_gx[i].i;
656+
}
657+
}
658+
}
659+
#pragma omp parallel for num_threads(getDTthreads(ngrp, true))
660+
for (int i=0; i<ngrp; i++) {
661+
ansp[i].i /= grpsize[i];
662+
ansp[i].r /= grpsize[i];
663+
}
664+
} else {
665+
// narm==true and anyNA==true
666+
int *restrict nna_counts_r = calloc(ngrp, sizeof(int));
667+
int *restrict nna_counts_i = calloc(ngrp, sizeof(int));
668+
if (!nna_counts_r || !nna_counts_i) {
669+
// # nocov start
670+
free(nna_counts_r); // free(NULL) is allowed and does nothing. Avoids repeating the error() call here.
671+
free(nna_counts_i);
672+
error(_("Unable to allocate %d * %d bytes for non-NA counts in gmean na.rm=TRUE"), ngrp, sizeof(int));
673+
// # nocov end
674+
}
675+
#pragma omp parallel for num_threads(getDTthreads(highSize, false))
676+
for (int h=0; h<highSize; h++) {
677+
Rcomplex *restrict _ans = ansp + (h<<shift);
678+
int *restrict _nna_r = nna_counts_r + (h<<shift);
679+
int *restrict _nna_i = nna_counts_i + (h<<shift);
680+
for (int b=0; b<nBatch; b++) {
681+
const int pos = counts[ b*highSize + h ];
682+
const int howMany = ((h==highSize-1) ? (b==nBatch-1?lastBatchSize:batchSize) : counts[ b*highSize + h + 1 ]) - pos;
683+
const Rcomplex *my_gx = gx + b*batchSize + pos;
684+
const uint16_t *my_low = low + b*batchSize + pos;
685+
for (int i=0; i<howMany; i++) {
686+
const Rcomplex elem = my_gx[i];
687+
if (!ISNAN(elem.r)) {
688+
_ans[my_low[i]].r += elem.r;
689+
_nna_r[my_low[i]]++;
690+
}
691+
if (!ISNAN(elem.i)) {
692+
_ans[my_low[i]].i += elem.i;
693+
_nna_i[my_low[i]]++;
694+
}
695+
}
696+
}
697+
}
698+
#pragma omp parallel for num_threads(getDTthreads(ngrp, true))
699+
for (int i=0; i<ngrp; i++) {
700+
ansp[i].r /= nna_counts_r[i];
701+
ansp[i].i /= nna_counts_i[i];
702+
}
703+
free(nna_counts_r);
704+
free(nna_counts_i);
671705
}
672706
} break;
673707
default:
674-
error(_("Internal error: unsupported type at the end of gmean")); // # nocov
708+
error(_("Type '%s' not supported by GForce mean (gmean). Either add the prefix base::mean(.) or turn off GForce optimization using options(datatable.optimize=1)"), type2char(TYPEOF(x)));
675709
}
676-
free(s); free(si); free(c);
677710
copyMostAttrib(x, ans);
678-
// Rprintf(_("this gmean na.rm=TRUE took %8.3f\n"), 1.0*(clock()-start)/CLOCKS_PER_SEC);
679-
UNPROTECT(1);
711+
if (verbose) { Rprintf(_("%.3fs\n"), wallclock()-started); }
712+
UNPROTECT(protecti);
680713
return(ans);
681714
}
682715

0 commit comments

Comments
 (0)