Skip to content

Commit

Permalink
Merge #761
Browse files Browse the repository at this point in the history
761: Make sure nmost combine uses correct memctx r=WireBaron a=WireBaron

Fixes #759

Co-authored-by: Brian Rowe <brian@timescale.com>
  • Loading branch information
bors[bot] and Brian Rowe authored Jun 15, 2023
2 parents 96c29e0 + abff00e commit 04e5a41
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 36 deletions.
1 change: 1 addition & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ This changelog should be updated as part of a PR if the work is worth noting (mo
#### New experimental features

#### Bug fixes
- [#761](https://github.com/timescale/timescaledb-toolkit/pull/761): Make sure nmost combine uses correct memctx

#### Other notable changes

Expand Down
27 changes: 15 additions & 12 deletions extension/src/nmost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,20 +119,23 @@ fn nmost_rollup_trans_function<T: Ord + Copy>(
fn nmost_trans_combine<T: Clone + Ord + Copy>(
first: Option<Inner<NMostTransState<T>>>,
second: Option<Inner<NMostTransState<T>>>,
fcinfo: pg_sys::FunctionCallInfo,
) -> Option<Inner<NMostTransState<T>>> {
match (first, second) {
(None, None) => None,
(None, Some(only)) | (Some(only), None) => unsafe {
Internal::new(only.clone()).to_inner()
},
(Some(a), Some(b)) => {
let mut a = a.clone();
// This could be made more efficient by iterating in the appropriate order with an early exit, but would requiring ordering the other heap
for entry in b.heap.iter() {
a.new_entry(*entry);
unsafe {
in_aggregate_context(fcinfo, || {
match (first, second) {
(None, None) => None,
(None, Some(only)) | (Some(only), None) => Internal::new(only.clone()).to_inner(),
(Some(a), Some(b)) => {
let mut a = a.clone();
// This could be made more efficient by iterating in the appropriate order with an early exit, but would requiring ordering the other heap
for entry in b.heap.iter() {
a.new_entry(*entry);
}
Internal::new(a).to_inner()
}
}
unsafe { Internal::new(a).to_inner() }
}
})
}
}

Expand Down
14 changes: 10 additions & 4 deletions extension/src/nmost/max_float.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,16 @@ pub fn max_n_float_rollup_trans(
}

#[pg_extern(immutable, parallel_safe)]
pub fn max_n_float_combine(state1: Internal, state2: Internal) -> Option<Internal> {
nmost_trans_combine(unsafe { state1.to_inner::<MaxFloatTransType>() }, unsafe {
state2.to_inner::<MaxFloatTransType>()
})
pub fn max_n_float_combine(
state1: Internal,
state2: Internal,
fcinfo: pg_sys::FunctionCallInfo,
) -> Option<Internal> {
nmost_trans_combine(
unsafe { state1.to_inner::<MaxFloatTransType>() },
unsafe { state2.to_inner::<MaxFloatTransType>() },
fcinfo,
)
.internal()
}

Expand Down
14 changes: 10 additions & 4 deletions extension/src/nmost/max_int.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,16 @@ pub fn max_n_int_rollup_trans(
}

#[pg_extern(immutable, parallel_safe)]
pub fn max_n_int_combine(state1: Internal, state2: Internal) -> Option<Internal> {
nmost_trans_combine(unsafe { state1.to_inner::<MaxIntTransType>() }, unsafe {
state2.to_inner::<MaxIntTransType>()
})
pub fn max_n_int_combine(
state1: Internal,
state2: Internal,
fcinfo: pg_sys::FunctionCallInfo,
) -> Option<Internal> {
nmost_trans_combine(
unsafe { state1.to_inner::<MaxIntTransType>() },
unsafe { state2.to_inner::<MaxIntTransType>() },
fcinfo,
)
.internal()
}

Expand Down
14 changes: 10 additions & 4 deletions extension/src/nmost/max_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,16 @@ pub fn max_n_time_rollup_trans(
}

#[pg_extern(immutable, parallel_safe)]
pub fn max_n_time_combine(state1: Internal, state2: Internal) -> Option<Internal> {
nmost_trans_combine(unsafe { state1.to_inner::<MaxTimeTransType>() }, unsafe {
state2.to_inner::<MaxTimeTransType>()
})
pub fn max_n_time_combine(
state1: Internal,
state2: Internal,
fcinfo: pg_sys::FunctionCallInfo,
) -> Option<Internal> {
nmost_trans_combine(
unsafe { state1.to_inner::<MaxTimeTransType>() },
unsafe { state2.to_inner::<MaxTimeTransType>() },
fcinfo,
)
.internal()
}

Expand Down
14 changes: 10 additions & 4 deletions extension/src/nmost/min_float.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,16 @@ pub fn min_n_float_rollup_trans(
}

#[pg_extern(immutable, parallel_safe)]
pub fn min_n_float_combine(state1: Internal, state2: Internal) -> Option<Internal> {
nmost_trans_combine(unsafe { state1.to_inner::<MinFloatTransType>() }, unsafe {
state2.to_inner::<MinFloatTransType>()
})
pub fn min_n_float_combine(
state1: Internal,
state2: Internal,
fcinfo: pg_sys::FunctionCallInfo,
) -> Option<Internal> {
nmost_trans_combine(
unsafe { state1.to_inner::<MinFloatTransType>() },
unsafe { state2.to_inner::<MinFloatTransType>() },
fcinfo,
)
.internal()
}

Expand Down
14 changes: 10 additions & 4 deletions extension/src/nmost/min_int.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,16 @@ pub fn min_n_int_rollup_trans(
}

#[pg_extern(immutable, parallel_safe)]
pub fn min_n_int_combine(state1: Internal, state2: Internal) -> Option<Internal> {
nmost_trans_combine(unsafe { state1.to_inner::<MinIntTransType>() }, unsafe {
state2.to_inner::<MinIntTransType>()
})
pub fn min_n_int_combine(
state1: Internal,
state2: Internal,
fcinfo: pg_sys::FunctionCallInfo,
) -> Option<Internal> {
nmost_trans_combine(
unsafe { state1.to_inner::<MinIntTransType>() },
unsafe { state2.to_inner::<MinIntTransType>() },
fcinfo,
)
.internal()
}

Expand Down
14 changes: 10 additions & 4 deletions extension/src/nmost/min_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,16 @@ pub fn min_n_time_rollup_trans(
}

#[pg_extern(immutable, parallel_safe)]
pub fn min_n_time_combine(state1: Internal, state2: Internal) -> Option<Internal> {
nmost_trans_combine(unsafe { state1.to_inner::<MinTimeTransType>() }, unsafe {
state2.to_inner::<MinTimeTransType>()
})
pub fn min_n_time_combine(
state1: Internal,
state2: Internal,
fcinfo: pg_sys::FunctionCallInfo,
) -> Option<Internal> {
nmost_trans_combine(
unsafe { state1.to_inner::<MinTimeTransType>() },
unsafe { state2.to_inner::<MinTimeTransType>() },
fcinfo,
)
.internal()
}

Expand Down

0 comments on commit 04e5a41

Please sign in to comment.