Skip to content

Commit

Permalink
Merge #459
Browse files Browse the repository at this point in the history
459: Th/approx count distinct r=thatzopoulos a=thatzopoulos

Issue : #3

This PR adds a new function `approx_count_distinct`, a method to count the number of distinct elements in a table, such as hyperloglog, that does not require the user to specify the number of buckets/registers. The number of buckets/registers is always set to 32,768

Co-authored-by: Thomas Hatzopoulos <thomas@timescale.com>
  • Loading branch information
bors[bot] and thatzopoulos authored Jul 1, 2022
2 parents 81d9536 + 4eff11f commit dd68ff7
Showing 1 changed file with 122 additions and 2 deletions.
124 changes: 122 additions & 2 deletions extension/src/hyperloglog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,43 @@ pub fn hyperloglog_trans(
fc: pg_sys::FunctionCallInfo,
) -> Option<Internal> {
// let state: Internal = Internal::from_datum();
hyperloglog_trans_inner(unsafe{ state.to_inner() }, size, value, fc).internal()
hyperloglog_trans_inner(
unsafe { state.to_inner() },
size,
value,
fc,
unsafe { pgx::get_getarg_type(fc, 2) },
)
.internal()
}

const APPROX_COUNT_DISTINCT_DEFAULT_SIZE: i32 = 32678;

/// Similar to hyperloglog_trans(), except size is set to a default of 32,678
#[pg_extern(immutable, parallel_safe,schema = "toolkit_experimental")]
pub fn approx_count_distinct_trans(
state: Internal,
// TODO we want to use crate::raw::AnyElement but it doesn't work for some reason...
value: Option<AnyElement>,
fc: pg_sys::FunctionCallInfo,
) -> Option<Internal> {
// let state: Internal = Internal::from_datum();
hyperloglog_trans_inner(
unsafe { state.to_inner() },
APPROX_COUNT_DISTINCT_DEFAULT_SIZE,
value,
fc,
unsafe { pgx::get_getarg_type(fc, 1) },
)
.internal()
}

pub fn hyperloglog_trans_inner(
state: Option<Inner<HyperLogLogTrans>>,
size: i32,
value: Option<AnyElement>,
fc: pg_sys::FunctionCallInfo,
arg_type: pg_sys::Oid
) -> Option<Inner<HyperLogLogTrans>> {
unsafe {
in_aggregate_context(fc, || {
Expand All @@ -66,7 +96,7 @@ pub fn hyperloglog_trans_inner(
though less than 1024 not recommended", size)
}

let typ = pgx::get_getarg_type(fc, 2);
let typ = arg_type;
let collation = get_collation(fc);
let hasher = DatumHashBuilder::from_type_id(typ, collation);
let trans = HyperLogLogTrans {
Expand Down Expand Up @@ -216,6 +246,22 @@ name = "hll_agg",
requires = [hyperloglog_trans, hyperloglog_final, hyperloglog_combine, hyperloglog_serialize, hyperloglog_deserialize],
);

extension_sql!("\n\
CREATE AGGREGATE toolkit_experimental.approx_count_distinct(value AnyElement)\n\
(\n\
stype = internal,\n\
sfunc = toolkit_experimental.approx_count_distinct_trans,\n\
finalfunc = hyperloglog_final,\n\
combinefunc = hyperloglog_combine,\n\
serialfunc = hyperloglog_serialize,\n\
deserialfunc = hyperloglog_deserialize,\n\
parallel = safe\n\
);\n\
",
name = "approx_count_distinct_agg",
requires = [approx_count_distinct_trans, hyperloglog_final, hyperloglog_combine, hyperloglog_serialize, hyperloglog_deserialize],
);

#[pg_extern(immutable, parallel_safe)]
pub fn hyperloglog_union(
state: Internal,
Expand Down Expand Up @@ -456,6 +502,80 @@ mod tests {
});
}

#[pg_test]
// Should have same results as test_hll_distinct_aggregate running with the same number of buckets
fn test_approx_count_distinct_aggregate() {
Spi::execute(|client| {
let text = client
.select(
"SELECT \
toolkit_experimental.approx_count_distinct(v::float)::TEXT \
FROM generate_series(1, 100) v",
None,
None,
)
.first()
.get_one::<String>();

let expected = "(\
version:1,\
log:Sparse(\
num_compressed:100,\
element_type:FLOAT8,\
collation:None,\
compressed_bytes:320,\
precision:15,\
compressed:[\
4,61,17,164,87,15,68,239,255,132,121,35,164,5,74,132,160,\
109,4,177,61,100,68,200,4,144,32,132,118,9,228,190,94,68,\
120,56,36,121,213,200,97,65,3,200,108,96,2,72,128,10,2,100,\
182,161,36,218,115,196,202,145,228,189,224,132,21,63,36,\
88,116,100,162,122,132,139,97,228,245,19,36,242,15,228,115,\
65,164,114,2,8,224,32,2,72,157,130,2,68,232,93,136,105,1,2,\
132,16,59,4,34,46,8,244,104,2,226,240,8,82,159,2,200,225,49,\
2,132,96,9,4,222,195,164,54,22,228,201,59,164,168,27,100,32,\
58,8,76,32,2,36,56,17,136,18,143,4,132,162,156,196,178,22,\
132,119,72,228,213,48,4,26,63,68,28,156,36,151,75,36,19,202,\
164,152,111,164,177,240,98,27,196,254,46,8,138,82,6,164,53,38,\
36,125,151,8,167,213,3,4,167,248,68,183,61,36,149,32,164,112,\
121,164,14,139,100,56,166,164,24,48,8,33,90,2,132,115,89,72,\
100,112,5,196,221,128,228,245,33,4,216,92,8,33,195,6,100,8,54,\
200,74,2,5,200,101,158,3,228,106,110,72,151,98,2,228,38,26,196,\
143,15,36,122,57,200,191,43,2,164,225,186,196,219,46,36,26,146,\
228,129,128,136,6,183,2,4,238,106,200,48,168,2,164,14,13,68,55,\
196,132,208,90,164,50,130,68,58,137,196,3,88,196,71,31\
]\
)\
)";
assert_eq!(text.unwrap(), expected);

let (count, arrow_count) = client
.select("SELECT \
distinct_count(\
toolkit_experimental.approx_count_distinct(v::float)\
), \
toolkit_experimental.approx_count_distinct(v::float)->toolkit_experimental.distinct_count() \
FROM generate_series(1, 100) v", None, None)
.first()
.get_two::<i32, i32>();
assert_eq!(count, Some(100));
assert_eq!(count, arrow_count);

let count2 = client
.select(
&format!(
"SELECT distinct_count('{}')",
expected
),
None,
None,
)
.first()
.get_one::<i32>();
assert_eq!(count2, count);
});
}

#[pg_test]
fn test_hll_byte_io() {
unsafe {
Expand Down

0 comments on commit dd68ff7

Please sign in to comment.