Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: collect to list (non-windowed) (primitive/strings/booleans) #569

Merged
merged 13 commits into from
Jul 31, 2023
Prev Previous commit
Next Next commit
Allow to indicate unlimited size
  • Loading branch information
jordanrfrazier committed Jul 31, 2023
commit 5bdcbfae2e2b87abae128a960b129c81214f70f5
1 change: 0 additions & 1 deletion crates/sparrow-compiler/src/functions/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ pub(super) fn register(registry: &mut Registry) {
.with_implementation(Implementation::Instruction(InstOp::Index))
.set_internal();

// TODO: Make MAX default to something?
registry
.register("collect<T: any>(const max: i64, input: T, window: window = null) -> list<T>")
.with_implementation(Implementation::Instruction(InstOp::Collect))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub struct CollectBooleanEvaluator {
///
/// Once the max size is reached, the front will be popped and the new
/// value pushed to the back.
max: i64,
max: usize,
input: ValueRef,
tick: ValueRef,
duration: ValueRef,
Expand All @@ -36,7 +36,13 @@ impl EvaluatorFactory for CollectBooleanEvaluator {
};

let max = match info.args[0].value_ref.literal_value() {
Some(ScalarValue::Int64(Some(v))) => *v,
Some(ScalarValue::Int64(Some(v))) if *v <= 0 => {
anyhow::bail!("unexpected value of `max` -- must be > 0")
}
Some(ScalarValue::Int64(Some(v))) => *v as usize,
// If a user specifies `max = null`, we fall back to `usize::MAX` as the
// limit, which effectively makes the buffer "unlimited".
Some(ScalarValue::Int64(None)) => usize::MAX,
Some(other) => anyhow::bail!("expected i64 for max parameter, saw {:?}", other),
None => anyhow::bail!("expected literal value for max parameter"),
};
Expand Down Expand Up @@ -91,7 +97,7 @@ impl CollectBooleanEvaluator {
izip!(entity_indices.values(), input).for_each(|(entity_index, input)| {
let entity_index = *entity_index as usize;

self.token.add_value(self.max as usize, entity_index, input);
self.token.add_value(self.max, entity_index, input);
let cur_list = self.token.state(entity_index);

list_builder.append_value(cur_list.clone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ where
///
/// Once the max size is reached, the front will be popped and the new
/// value pushed to the back.
max: i64,
max: usize,
input: ValueRef,
tick: ValueRef,
duration: ValueRef,
Expand All @@ -51,7 +51,13 @@ where
};

let max = match info.args[0].value_ref.literal_value() {
Some(ScalarValue::Int64(Some(v))) => *v,
Some(ScalarValue::Int64(Some(v))) if *v <= 0 => {
anyhow::bail!("unexpected value of `max` -- must be > 0")
}
Some(ScalarValue::Int64(Some(v))) => *v as usize,
// If a user specifies `max = null`, we fall back to `usize::MAX` as the
// limit, which effectively makes the buffer "unlimited".
Some(ScalarValue::Int64(None)) => usize::MAX,
Some(other) => anyhow::bail!("expected i64 for max parameter, saw {:?}", other),
None => anyhow::bail!("expected literal value for max parameter"),
};
Expand Down Expand Up @@ -114,7 +120,7 @@ where
izip!(entity_indices.values(), input).for_each(|(entity_index, input)| {
let entity_index = *entity_index as usize;

self.token.add_value(self.max as usize, entity_index, input);
self.token.add_value(self.max, entity_index, input);
let cur_list = self.token.state(entity_index);

list_builder.append_value(cur_list.clone());
Expand Down
14 changes: 10 additions & 4 deletions crates/sparrow-instructions/src/evaluators/list/collect_string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub struct CollectStringEvaluator {
///
/// Once the max size is reached, the front will be popped and the new
/// value pushed to the back.
max: i64,
max: usize,
input: ValueRef,
tick: ValueRef,
duration: ValueRef,
Expand All @@ -35,8 +35,14 @@ impl EvaluatorFactory for CollectStringEvaluator {
other => anyhow::bail!("expected list result type, saw {:?}", other),
};

let max = match &info.args[0].value_ref.literal_value() {
Some(ScalarValue::Int64(Some(v))) => *v,
let max = match info.args[0].value_ref.literal_value() {
Some(ScalarValue::Int64(Some(v))) if *v <= 0 => {
anyhow::bail!("unexpected value of `max` -- must be > 0")
}
Some(ScalarValue::Int64(Some(v))) => *v as usize,
// If a user specifies `max = null`, we fall back to `usize::MAX` as the
// limit, which effectively makes the buffer "unlimited".
Some(ScalarValue::Int64(None)) => usize::MAX,
Some(other) => anyhow::bail!("expected i64 for max parameter, saw {:?}", other),
None => anyhow::bail!("expected literal value for max parameter"),
};
Expand Down Expand Up @@ -92,7 +98,7 @@ impl CollectStringEvaluator {
let entity_index = *entity_index as usize;

self.token
.add_value(self.max as usize, entity_index, input.map(|s| s.to_owned()));
.add_value(self.max, entity_index, input.map(|s| s.to_owned()));
let cur_list = self.token.state(entity_index);

list_builder.append_value(cur_list.clone());
Expand Down
21 changes: 21 additions & 0 deletions crates/sparrow-main/tests/e2e/collect_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,27 @@ pub(crate) async fn collect_data_fixture() -> DataFixture {
.unwrap()
}

#[tokio::test]
async fn test_collect_with_null_max() {
insta::assert_snapshot!(QueryFixture::new("{ f1: Collect.n | collect(max = null) | index(0), f2: Collect.b | collect(max = null) | index(0), f3: Collect.s | collect(max = null) | index(0) }").run_to_csv(&collect_data_fixture().await).await.unwrap(), @r###"
_time,_subsort,_key_hash,_key,f1,f2,f3
1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,0,true,hEllo
1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,0,true,hEllo
1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A,0,true,hEllo
1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,0,true,hEllo
1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A,0,true,hEllo
1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,5,false,h
1996-12-21T00:41:57.000000000,9223372036854775808,2867199309159137213,B,5,false,h
1996-12-21T00:42:57.000000000,9223372036854775808,2867199309159137213,B,5,false,h
1996-12-21T00:43:57.000000000,9223372036854775808,2867199309159137213,B,5,false,h
1996-12-21T00:44:57.000000000,9223372036854775808,2867199309159137213,B,5,false,h
1996-12-22T00:44:57.000000000,9223372036854775808,2521269998124177631,C,1,true,g
1996-12-22T00:45:57.000000000,9223372036854775808,2521269998124177631,C,1,true,g
1996-12-22T00:46:57.000000000,9223372036854775808,2521269998124177631,C,1,true,g
1996-12-22T00:47:57.000000000,9223372036854775808,2521269998124177631,C,1,true,g
"###);
}

#[tokio::test]
async fn test_collect_to_list_i64() {
insta::assert_snapshot!(QueryFixture::new("{ f1: Collect.n | collect(10) | index(0) }").run_to_csv(&collect_data_fixture().await).await.unwrap(), @r###"
Expand Down