From abff00eaaefcd75344c53fffd09a1423636c695d Mon Sep 17 00:00:00 2001 From: Brian Rowe Date: Thu, 15 Jun 2023 10:58:02 -0700 Subject: [PATCH] Make sure nmost combine uses correct memctx --- Changelog.md | 1 + extension/src/nmost.rs | 27 +++++++++++++++------------ extension/src/nmost/max_float.rs | 14 ++++++++++---- extension/src/nmost/max_int.rs | 14 ++++++++++---- extension/src/nmost/max_time.rs | 14 ++++++++++---- extension/src/nmost/min_float.rs | 14 ++++++++++---- extension/src/nmost/min_int.rs | 14 ++++++++++---- extension/src/nmost/min_time.rs | 14 ++++++++++---- 8 files changed, 76 insertions(+), 36 deletions(-) diff --git a/Changelog.md b/Changelog.md index 417af100..3106cfeb 100644 --- a/Changelog.md +++ b/Changelog.md @@ -9,6 +9,7 @@ This changelog should be updated as part of a PR if the work is worth noting (mo #### New experimental features #### Bug fixes +- [#761](https://github.com/timescale/timescaledb-toolkit/pull/761): Make sure nmost combine uses correct memctx #### Other notable changes diff --git a/extension/src/nmost.rs b/extension/src/nmost.rs index 019c4452..3114cc68 100644 --- a/extension/src/nmost.rs +++ b/extension/src/nmost.rs @@ -119,20 +119,23 @@ fn nmost_rollup_trans_function( fn nmost_trans_combine( first: Option>>, second: Option>>, + fcinfo: pg_sys::FunctionCallInfo, ) -> Option>> { - match (first, second) { - (None, None) => None, - (None, Some(only)) | (Some(only), None) => unsafe { - Internal::new(only.clone()).to_inner() - }, - (Some(a), Some(b)) => { - let mut a = a.clone(); - // This could be made more efficient by iterating in the appropriate order with an early exit, but would requiring ordering the other heap - for entry in b.heap.iter() { - a.new_entry(*entry); + unsafe { + in_aggregate_context(fcinfo, || { + match (first, second) { + (None, None) => None, + (None, Some(only)) | (Some(only), None) => Internal::new(only.clone()).to_inner(), + (Some(a), Some(b)) => { + let mut a = a.clone(); + // This could be made more efficient by iterating in the appropriate order with an early exit, but would requiring ordering the other heap + for entry in b.heap.iter() { + a.new_entry(*entry); + } + Internal::new(a).to_inner() + } } - unsafe { Internal::new(a).to_inner() } - } + }) } } diff --git a/extension/src/nmost/max_float.rs b/extension/src/nmost/max_float.rs index 48ab8e66..90f5ae3e 100644 --- a/extension/src/nmost/max_float.rs +++ b/extension/src/nmost/max_float.rs @@ -82,10 +82,16 @@ pub fn max_n_float_rollup_trans( } #[pg_extern(immutable, parallel_safe)] -pub fn max_n_float_combine(state1: Internal, state2: Internal) -> Option { - nmost_trans_combine(unsafe { state1.to_inner::() }, unsafe { - state2.to_inner::() - }) +pub fn max_n_float_combine( + state1: Internal, + state2: Internal, + fcinfo: pg_sys::FunctionCallInfo, +) -> Option { + nmost_trans_combine( + unsafe { state1.to_inner::() }, + unsafe { state2.to_inner::() }, + fcinfo, + ) .internal() } diff --git a/extension/src/nmost/max_int.rs b/extension/src/nmost/max_int.rs index e23f995f..f7d12d4b 100644 --- a/extension/src/nmost/max_int.rs +++ b/extension/src/nmost/max_int.rs @@ -76,10 +76,16 @@ pub fn max_n_int_rollup_trans( } #[pg_extern(immutable, parallel_safe)] -pub fn max_n_int_combine(state1: Internal, state2: Internal) -> Option { - nmost_trans_combine(unsafe { state1.to_inner::() }, unsafe { - state2.to_inner::() - }) +pub fn max_n_int_combine( + state1: Internal, + state2: Internal, + fcinfo: pg_sys::FunctionCallInfo, +) -> Option { + nmost_trans_combine( + unsafe { state1.to_inner::() }, + unsafe { state2.to_inner::() }, + fcinfo, + ) .internal() } diff --git a/extension/src/nmost/max_time.rs b/extension/src/nmost/max_time.rs index 87a90af5..f43738cc 100644 --- a/extension/src/nmost/max_time.rs +++ b/extension/src/nmost/max_time.rs @@ -77,10 +77,16 @@ pub fn max_n_time_rollup_trans( } #[pg_extern(immutable, parallel_safe)] -pub fn max_n_time_combine(state1: Internal, state2: Internal) -> Option { - nmost_trans_combine(unsafe { state1.to_inner::() }, unsafe { - state2.to_inner::() - }) +pub fn max_n_time_combine( + state1: Internal, + state2: Internal, + fcinfo: pg_sys::FunctionCallInfo, +) -> Option { + nmost_trans_combine( + unsafe { state1.to_inner::() }, + unsafe { state2.to_inner::() }, + fcinfo, + ) .internal() } diff --git a/extension/src/nmost/min_float.rs b/extension/src/nmost/min_float.rs index 8a274f06..473de8ca 100644 --- a/extension/src/nmost/min_float.rs +++ b/extension/src/nmost/min_float.rs @@ -81,10 +81,16 @@ pub fn min_n_float_rollup_trans( } #[pg_extern(immutable, parallel_safe)] -pub fn min_n_float_combine(state1: Internal, state2: Internal) -> Option { - nmost_trans_combine(unsafe { state1.to_inner::() }, unsafe { - state2.to_inner::() - }) +pub fn min_n_float_combine( + state1: Internal, + state2: Internal, + fcinfo: pg_sys::FunctionCallInfo, +) -> Option { + nmost_trans_combine( + unsafe { state1.to_inner::() }, + unsafe { state2.to_inner::() }, + fcinfo, + ) .internal() } diff --git a/extension/src/nmost/min_int.rs b/extension/src/nmost/min_int.rs index e50783f9..11e8ec26 100644 --- a/extension/src/nmost/min_int.rs +++ b/extension/src/nmost/min_int.rs @@ -68,10 +68,16 @@ pub fn min_n_int_rollup_trans( } #[pg_extern(immutable, parallel_safe)] -pub fn min_n_int_combine(state1: Internal, state2: Internal) -> Option { - nmost_trans_combine(unsafe { state1.to_inner::() }, unsafe { - state2.to_inner::() - }) +pub fn min_n_int_combine( + state1: Internal, + state2: Internal, + fcinfo: pg_sys::FunctionCallInfo, +) -> Option { + nmost_trans_combine( + unsafe { state1.to_inner::() }, + unsafe { state2.to_inner::() }, + fcinfo, + ) .internal() } diff --git a/extension/src/nmost/min_time.rs b/extension/src/nmost/min_time.rs index c924d57f..57423ecc 100644 --- a/extension/src/nmost/min_time.rs +++ b/extension/src/nmost/min_time.rs @@ -68,10 +68,16 @@ pub fn min_n_time_rollup_trans( } #[pg_extern(immutable, parallel_safe)] -pub fn min_n_time_combine(state1: Internal, state2: Internal) -> Option { - nmost_trans_combine(unsafe { state1.to_inner::() }, unsafe { - state2.to_inner::() - }) +pub fn min_n_time_combine( + state1: Internal, + state2: Internal, + fcinfo: pg_sys::FunctionCallInfo, +) -> Option { + nmost_trans_combine( + unsafe { state1.to_inner::() }, + unsafe { state2.to_inner::() }, + fcinfo, + ) .internal() }