IMPALA-5310: Part 2: Add SAMPLED_NDV() function.
Adds a new SAMPLED_NDV() aggregate function that is
intended to be used in COMPUTE STATS TABLESAMPLE.
This patch only adds the function itself. Integration
with COMPUTE STATS will come in a separate patch.

SAMPLED_NDV() estimates the number of distinct values (NDV)
based on a sample of data and the corresponding sampling rate.
The main idea is to collect several x/y data points where x is
the number of rows and y is the corresponding NDV estimate.
These data points are used to fit an objective function to the
data such that the true NDV can be extrapolated.
The aggregate function maintains a fixed number of HyperLogLog
intermediates to compute the x/y points.
Several objective functions are fit and the best-fit one is
used for extrapolation.
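
To make the curve-fitting idea concrete, the sketch below is a minimal
standalone illustration, not the patch's code (which fits linear,
logarithmic, power, and sigmoid shapes via MPFIT and keeps the best one):
it fits f(x) = a + b * log(x) to the (row_count, ndv) points in closed
form and evaluates the fitted curve at the extrapolated full row count.

    #include <cmath>
    #include <cstdio>
    #include <vector>

    // Fit ndv ~= a + b * log(rows) by ordinary least squares (closed form
    // for two parameters), then evaluate at rows_in_sample / sample_perc.
    double ExtrapolateNdv(const std::vector<double>& rows,
                          const std::vector<double>& ndvs, double sample_perc) {
      const int n = rows.size();
      double mean_z = 0.0, mean_y = 0.0;
      for (int i = 0; i < n; ++i) {
        mean_z += std::log(rows[i]);
        mean_y += ndvs[i];
      }
      mean_z /= n;
      mean_y /= n;
      double szz = 0.0, szy = 0.0;
      for (int i = 0; i < n; ++i) {
        double dz = std::log(rows[i]) - mean_z;
        szz += dz * dz;
        szy += dz * (ndvs[i] - mean_y);
      }
      double b = szy / szz;
      double a = mean_y - b * mean_z;
      double full_rows = rows.back() / sample_perc;  // extrapolated row count
      return a + b * std::log(full_rows);
    }

    int main() {
      // Toy (row_count, ndv) points such as the rolling-window merge produces.
      std::vector<double> rows = {1000, 2000, 3000, 4000};
      std::vector<double> ndvs = {950, 1800, 2500, 3100};
      std::printf("extrapolated NDV: %.0f\n", ExtrapolateNdv(rows, ndvs, 0.1));
      return 0;
    }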

Adds the MPFIT C library to perform curve fitting:
https://www.physics.wisc.edu/~craigm/idl/cmpfit.html

The library is a C port of the original Fortran MINPACK code. SciPy
uses the Fortran version of the library for curve fitting.
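
For orientation, a hypothetical direct use of the library's C API might look
like the sketch below, assuming the mp_func callback and mpfit() entry point
documented at the link above; the patch itself accesses the library through a
wrapper in be/src/util/mpfit-util.h (ObjectiveFunction::LmsFit).

    #include <cstring>
    #include "mpfit.h"  // from be/src/thirdparty/mpfit

    // The points to fit; passed to the callback via mpfit()'s private pointer.
    struct FitData { int n; const double* x; const double* y; };

    // mp_func callback: fill 'deviates' with the residuals y[i] - f(x[i]);
    // MPFIT minimizes their sum of squares. Here f is a line p[0] + p[1]*x.
    static int LinearResiduals(int m, int npar, double* p, double* deviates,
                               double** derivs, void* private_data) {
      const FitData* d = static_cast<const FitData*>(private_data);
      for (int i = 0; i < m; ++i) {
        deviates[i] = d->y[i] - (p[0] + p[1] * d->x[i]);
      }
      return 0;  // a negative return value would abort the fit
    }

    // Returns the final sum of squared residuals, or -1 on failure.
    // 'params' holds the initial guess on entry, the fitted values on return.
    double FitLine(const double* x, const double* y, int n, double params[2]) {
      FitData data = {n, x, y};
      mp_result result;
      std::memset(&result, 0, sizeof(result));
      // nullptr for per-parameter constraints and config selects the defaults.
      int status =
          mpfit(LinearResiduals, n, 2, params, nullptr, nullptr, &data, &result);
      return status > 0 ? result.bestnorm : -1.0;
    }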

Testing:
- added functional tests
- core/hdfs run passed

Change-Id: Ia51d56ee67ec6073e92f90bebb4005484138b820
Reviewed-on: http://gerrit.cloudera.org:8080/8569
Reviewed-by: Alex Behm <alex.behm@cloudera.com>
Tested-by: Impala Public Jenkins
Alex Behm authored and Impala Public Jenkins committed Dec 12, 2017
1 parent e57c77f commit 0936e32
Showing 19 changed files with 3,976 additions and 12 deletions.
81 changes: 81 additions & 0 deletions LICENSE.txt
@@ -946,3 +946,84 @@ Some portions of this module are derived from code from FastHash
OTHER DEALINGS IN THE SOFTWARE.

--------------------------------------------------------------------------------

be/src/thirdparty/mpfit

MPFIT: A MINPACK-1 Least Squares Fitting Library in C

Original public domain version by B. Garbow, K. Hillstrom, J. More'
(Argonne National Laboratory, MINPACK project, March 1980)
Copyright (1999) University of Chicago
(see below)

Translation to C Language by S. Moshier (moshier.net)
(no restrictions placed on distribution)

Enhancements and packaging by C. Markwardt
(comparable to IDL fitting routine MPFIT
see http://cow.physics.wisc.edu/~craigm/idl/idl.html)
Copyright (C) 2003, 2004, 2006, 2007 Craig B. Markwardt

This software is provided as is without any warranty whatsoever.
Permission to use, copy, modify, and distribute modified or
unmodified copies is granted, provided this copyright and disclaimer
are included unchanged.


Source code derived from MINPACK must have the following disclaimer
text provided.

===========================================================================
Minpack Copyright Notice (1999) University of Chicago. All rights reserved

Redistribution and use in source and binary forms, with or
without modification, are permitted provided that the
following conditions are met:

1. Redistributions of source code must retain the above
copyright notice, this list of conditions and the following
disclaimer.

2. Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following
disclaimer in the documentation and/or other materials
provided with the distribution.

3. The end-user documentation included with the
redistribution, if any, must include the following
acknowledgment:

"This product includes software developed by the
University of Chicago, as Operator of Argonne National
Laboratory."

Alternately, this acknowledgment may appear in the software
itself, if and wherever such third-party acknowledgments
normally appear.

4. WARRANTY DISCLAIMER. THE SOFTWARE IS SUPPLIED "AS IS"
WITHOUT WARRANTY OF ANY KIND. THE COPYRIGHT HOLDER, THE
UNITED STATES, THE UNITED STATES DEPARTMENT OF ENERGY, AND
THEIR EMPLOYEES: (1) DISCLAIM ANY WARRANTIES, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO ANY IMPLIED WARRANTIES
OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE
OR NON-INFRINGEMENT, (2) DO NOT ASSUME ANY LEGAL LIABILITY
OR RESPONSIBILITY FOR THE ACCURACY, COMPLETENESS, OR
USEFULNESS OF THE SOFTWARE, (3) DO NOT REPRESENT THAT USE OF
THE SOFTWARE WOULD NOT INFRINGE PRIVATELY OWNED RIGHTS, (4)
DO NOT WARRANT THAT THE SOFTWARE WILL FUNCTION
UNINTERRUPTED, THAT IT IS ERROR-FREE OR THAT ANY ERRORS WILL
BE CORRECTED.

5. LIMITATION OF LIABILITY. IN NO EVENT WILL THE COPYRIGHT
HOLDER, THE UNITED STATES, THE UNITED STATES DEPARTMENT OF
ENERGY, OR THEIR EMPLOYEES: BE LIABLE FOR ANY INDIRECT,
INCIDENTAL, CONSEQUENTIAL, SPECIAL OR PUNITIVE DAMAGES OF
ANY KIND OR NATURE, INCLUDING BUT NOT LIMITED TO LOSS OF
PROFITS OR LOSS OF DATA, FOR ANY REASON WHATSOEVER, WHETHER
SUCH LIABILITY IS ASSERTED ON THE BASIS OF CONTRACT, TORT
(INCLUDING NEGLIGENCE OR STRICT LIABILITY), OR OTHERWISE,
EVEN IF ANY OF SAID PARTIES HAS BEEN WARNED OF THE
POSSIBILITY OF SUCH LOSS OR DAMAGES.

--------------------------------------------------------------------------------
4 changes: 4 additions & 0 deletions NOTICE.txt
@@ -13,3 +13,7 @@ Project for use in the OpenSSL Toolkit (http://www.openssl.org/)
This product includes cryptographic software written by Eric Young
(eay@cryptsoft.com). This product includes software written by Tim
Hudson (tjh@cryptsoft.com).

This product includes software developed by the University of Chicago,
as Operator of Argonne National Laboratory.
Copyright (C) 1999 University of Chicago. All rights reserved.
209 changes: 206 additions & 3 deletions be/src/exprs/aggregate-functions-ir.cc
@@ -17,31 +17,33 @@

#include "exprs/aggregate-functions.h"

#include <math.h>
#include <algorithm>
#include <map>
#include <sstream>
#include <utility>

#include <boost/random/ranlux.hpp>
#include <boost/random/uniform_int.hpp>

#include "codegen/impala-ir.h"
#include "common/logging.h"
#include "exprs/anyval-util.h"
#include "exprs/hll-bias.h"
#include "runtime/decimal-value.inline.h"
#include "runtime/runtime-state.h"
#include "runtime/string-value.inline.h"
#include "runtime/timestamp-value.h"
#include "runtime/timestamp-value.inline.h"
#include "exprs/anyval-util.h"
#include "exprs/hll-bias.h"
#include "util/mpfit-util.h"

#include "common/names.h"

using boost::uniform_int;
using boost::ranlux64_3;
using std::make_pair;
using std::map;
using std::min_element;
using std::nth_element;
using std::pop_heap;
using std::push_heap;
@@ -1480,6 +1482,186 @@ BigIntVal AggregateFunctions::HllFinalize(FunctionContext* ctx, const StringVal&
return estimate;
}

/// Intermediate aggregation state for the SampledNdv() function.
/// Stores NUM_HLL_BUCKETS buckets of the form <row_count, hll_state>.
/// The 'row_count' keeps track of how many input rows were aggregated into that
/// bucket, and the 'hll_state' is an intermediate aggregation state of HyperLogLog.
/// See the header comments on the SampledNdv() function for more details.
class SampledNdvState {
public:
/// Empirically determined number of HLL buckets. Power of two for fast modulo.
static const uint32_t NUM_HLL_BUCKETS = 32;

/// A bucket contains an update count and an HLL intermediate state.
static constexpr int64_t BUCKET_SIZE = sizeof(int64_t) + AggregateFunctions::HLL_LEN;
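/// (With Impala's HLL_LEN of 1024 bytes this is 1032 bytes per bucket, so the
/// whole intermediate state is roughly 33 KB.)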

/// Sampling percent, which was given as the second argument to SampledNdv().
/// Stored here to avoid existing issues with passing constant arguments to all
/// aggregation phases and because we convert the sampling percent argument from
/// decimal to double. See IMPALA-6179.
double sample_perc;

/// Counts the number of Update() calls. Used for determining which bucket to update.
int64_t total_row_count;

/// Array of buckets.
struct {
int64_t row_count;
uint8_t hll[AggregateFunctions::HLL_LEN];
} buckets[NUM_HLL_BUCKETS];
};

void AggregateFunctions::SampledNdvInit(FunctionContext* ctx, StringVal* dst) {
// Uses a preallocated FIXED_UDA_INTERMEDIATE intermediate value.
DCHECK_EQ(dst->len, sizeof(SampledNdvState));
memset(dst->ptr, 0, sizeof(SampledNdvState));

DoubleVal* sample_perc = reinterpret_cast<DoubleVal*>(ctx->GetConstantArg(1));
if (sample_perc == nullptr) return;
// Guaranteed by the FE.
DCHECK(!sample_perc->is_null);
DCHECK_GE(sample_perc->val, 0.0);
DCHECK_LE(sample_perc->val, 1.0);
SampledNdvState* state = reinterpret_cast<SampledNdvState*>(dst->ptr);
state->sample_perc = sample_perc->val;
}

/// Incorporate the 'src' into one of the intermediate HLLs, which will be used by
/// Finalize() to generate a set of the (x,y) data points.
template <typename T>
void AggregateFunctions::SampledNdvUpdate(FunctionContext* ctx, const T& src,
const DoubleVal& sample_perc, StringVal* dst) {
SampledNdvState* state = reinterpret_cast<SampledNdvState*>(dst->ptr);
int64_t bucket_idx = state->total_row_count % SampledNdvState::NUM_HLL_BUCKETS;
StringVal hll_dst = StringVal(state->buckets[bucket_idx].hll, HLL_LEN);
HllUpdate(ctx, src, &hll_dst);
++state->buckets[bucket_idx].row_count;
++state->total_row_count;
}

void AggregateFunctions::SampledNdvMerge(FunctionContext* ctx, const StringVal& src,
StringVal* dst) {
SampledNdvState* src_state = reinterpret_cast<SampledNdvState*>(src.ptr);
SampledNdvState* dst_state = reinterpret_cast<SampledNdvState*>(dst->ptr);
for (int i = 0; i < SampledNdvState::NUM_HLL_BUCKETS; ++i) {
StringVal src_hll = StringVal(src_state->buckets[i].hll, HLL_LEN);
StringVal dst_hll = StringVal(dst_state->buckets[i].hll, HLL_LEN);
HllMerge(ctx, src_hll, &dst_hll);
dst_state->buckets[i].row_count += src_state->buckets[i].row_count;
}
// Total count. Not really needed after Update() but kept for sanity checking.
dst_state->total_row_count += src_state->total_row_count;
// Propagate sampling percent to Finalize().
dst_state->sample_perc = src_state->sample_perc;
}

BigIntVal AggregateFunctions::SampledNdvFinalize(FunctionContext* ctx,
const StringVal& src) {
SampledNdvState* state = reinterpret_cast<SampledNdvState*>(src.ptr);

// Generate 'num_points' data points with x=row_count and y=ndv_estimate. These points
// are used to fit a function for the NDV growth and estimate the real NDV.
constexpr int num_points =
SampledNdvState::NUM_HLL_BUCKETS * SampledNdvState::NUM_HLL_BUCKETS;
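  // (32 buckets x 32 window positions in the loop below = 1024 points.)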
int64_t counts[num_points] = { 0 };
int64_t ndvs[num_points] = { 0 };

int64_t min_ndv = numeric_limits<int64_t>::max();
int64_t min_count = numeric_limits<int64_t>::max();
// We have a fixed number of HLL intermediates to generate data points. Any unique
// subset of intermediates can be combined to create a new data point. It was
// empirically determined that 'num_points' data points are typically sufficient and
// diminishing returns from generating additional data points.
// The generation method below was chosen for its simplicity. It successively merges
// buckets in a rolling window of size NUM_HLL_BUCKETS. Repeating the last data point
// where all buckets are merged biases the curve fitting to hit that data point, which
// makes sense because that's likely the most accurate one. The number of data points
// is sufficient for reasonable accuracy.
int pidx = 0;
for (int i = 0; i < SampledNdvState::NUM_HLL_BUCKETS; ++i) {
uint8_t merged_hll_data[HLL_LEN];
memset(merged_hll_data, 0, HLL_LEN);
StringVal merged_hll(merged_hll_data, HLL_LEN);
int64_t merged_count = 0;
for (int j = 0; j < SampledNdvState::NUM_HLL_BUCKETS; ++j) {
int bucket_idx = (i + j) % SampledNdvState::NUM_HLL_BUCKETS;
merged_count += state->buckets[bucket_idx].row_count;
counts[pidx] = merged_count;
StringVal hll = StringVal(state->buckets[bucket_idx].hll, HLL_LEN);
HllMerge(ctx, hll, &merged_hll);
ndvs[pidx] = HllFinalEstimate(merged_hll.ptr, HLL_LEN);
++pidx;
}
min_count = std::min(min_count, state->buckets[i].row_count);
min_ndv = std::min(min_ndv, ndvs[i * SampledNdvState::NUM_HLL_BUCKETS]);
}
// Based on the point-generation method above, the last elements represent the data
// point where all buckets are merged.
int64_t max_count = counts[num_points - 1];
int64_t max_ndv = ndvs[num_points - 1];

// Scale all values to [0,1] since some objective functions require it (e.g., Sigmoid).
double count_scale = max_count - min_count;
double ndv_scale = max_ndv - min_ndv;
if (count_scale == 0) count_scale = 1.0;
if (ndv_scale == 0) ndv_scale = 1.0;
double scaled_counts[num_points];
double scaled_ndvs[num_points];
for (int i = 0; i < num_points; ++i) {
scaled_counts[i] = counts[i] / count_scale;
scaled_ndvs[i] = ndvs[i] / ndv_scale;
}

// List of objective functions. Curve fitting will select the best values for the
// parameters a, b, c, d.
vector<ObjectiveFunction> ndv_fns;
// Linear function: f(x) = a + b * x
ndv_fns.push_back(ObjectiveFunction("LIN", 2,
[](double x, const double* params) -> double {
return params[0] + params[1] * x;
}
));
// Logarithmic function: f(x) = a + b * log(x)
ndv_fns.push_back(ObjectiveFunction("LOG", 2,
[](double x, const double* params) -> double {
return params[0] + params[1] * log(x);
}
));
// Power function: f(x) = a + b * pow(x, c)
ndv_fns.push_back(ObjectiveFunction("POW", 3,
[](double x, const double* params) -> double {
return params[0] + params[1] * pow(x, params[2]);
}
));
// Sigmoid function: f(x) = a + b * (c / (c + pow(d, -x)))
ndv_fns.push_back(ObjectiveFunction("SIG", 4,
[](double x, const double* params) -> double {
return params[0] + params[1] * (params[2] / (params[2] + pow(params[3], -x)));
}
));

// Perform least mean squares fitting on all objective functions.
vector<ObjectiveFunction> valid_ndv_fns;
for (ObjectiveFunction& f: ndv_fns) {
if (f.LmsFit(scaled_counts, scaled_ndvs, num_points)) {
valid_ndv_fns.push_back(std::move(f));
}
}

// Select the best-fit function for estimating the NDV.
auto best_fit_fn = min_element(valid_ndv_fns.begin(), valid_ndv_fns.end(),
[](const ObjectiveFunction& a, const ObjectiveFunction& b) -> bool {
return a.GetError() < b.GetError();
}
);
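  // Note: this assumes at least one fit succeeded; if 'valid_ndv_fns' were
  // empty, 'best_fit_fn' would be the end iterator.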

// Compute the extrapolated NDV based on the extrapolated row count.
double extrap_count = max_count / state->sample_perc;
double scaled_extrap_count = extrap_count / count_scale;
double scaled_extrap_ndv = best_fit_fn->GetY(scaled_extrap_count);
return round(scaled_extrap_ndv * ndv_scale);
}

// An implementation of a simple single pass variance algorithm. A standard UDA must
// be single pass (i.e. does not scan the table more than once), so the most canonical
// two pass approach is not practical.
@@ -2140,6 +2322,27 @@ template void AggregateFunctions::HllUpdate(
template void AggregateFunctions::HllUpdate(
FunctionContext*, const DecimalVal&, StringVal*);

template void AggregateFunctions::SampledNdvUpdate(
FunctionContext*, const BooleanVal&, const DoubleVal&, StringVal*);
template void AggregateFunctions::SampledNdvUpdate(
FunctionContext*, const TinyIntVal&, const DoubleVal&, StringVal*);
template void AggregateFunctions::SampledNdvUpdate(
FunctionContext*, const SmallIntVal&, const DoubleVal&, StringVal*);
template void AggregateFunctions::SampledNdvUpdate(
FunctionContext*, const IntVal&, const DoubleVal&, StringVal*);
template void AggregateFunctions::SampledNdvUpdate(
FunctionContext*, const BigIntVal&, const DoubleVal&, StringVal*);
template void AggregateFunctions::SampledNdvUpdate(
FunctionContext*, const FloatVal&, const DoubleVal&, StringVal*);
template void AggregateFunctions::SampledNdvUpdate(
FunctionContext*, const DoubleVal&, const DoubleVal&, StringVal*);
template void AggregateFunctions::SampledNdvUpdate(
FunctionContext*, const StringVal&, const DoubleVal&, StringVal*);
template void AggregateFunctions::SampledNdvUpdate(
FunctionContext*, const TimestampVal&, const DoubleVal&, StringVal*);
template void AggregateFunctions::SampledNdvUpdate(
FunctionContext*, const DecimalVal&, const DoubleVal&, StringVal*);

template void AggregateFunctions::KnuthVarUpdate(
FunctionContext*, const TinyIntVal&, StringVal*);
template void AggregateFunctions::KnuthVarUpdate(
17 changes: 17 additions & 0 deletions be/src/exprs/aggregate-functions.h
@@ -204,6 +204,23 @@ class AggregateFunctions {
/// estimates.
static uint64_t HllFinalEstimate(const uint8_t* buckets, int32_t num_buckets);

/// Estimates the number of distinct values (NDV) based on a sample of data and the
/// corresponding sampling rate. The main idea of this function is to collect several
/// (x,y) data points where x is the number of rows and y is the corresponding NDV
/// estimate. These data points are used to fit an objective function to the data such
/// that the true NDV can be extrapolated.
/// This aggregate function maintains a fixed number of HyperLogLog intermediates.
/// The Update() phase updates the intermediates in a round-robin fashion.
/// The Merge() phase combines the corresponding intermediates.
/// The Finalize() phase generates (x,y) data points, performs curve fitting, and
/// computes the estimated true NDV.
static void SampledNdvInit(FunctionContext*, StringVal* dst);
template <typename T>
static void SampledNdvUpdate(FunctionContext*, const T& src,
const DoubleVal& sample_perc, StringVal* dst);
static void SampledNdvMerge(FunctionContext*, const StringVal& src, StringVal* dst);
static BigIntVal SampledNdvFinalize(FunctionContext*, const StringVal& src);

/// Knuth's variance algorithm, more numerically stable than canonical stddev
/// algorithms; reference implementation:
/// http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Online_algorithm