Skip to content

Commit

Permalink
fix: record indtuples in multiprogress environment (tensorchord#54)
Browse files Browse the repository at this point in the history
Signed-off-by: usamoi <usamoi@outlook.com>
  • Loading branch information
usamoi authored Nov 8, 2024
1 parent b30d85c commit 82a3d3f
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 31 deletions.
5 changes: 2 additions & 3 deletions src/algorithm/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ pub trait HeapRelation {
}

/// Progress sink accepted by `build` through its `R: Reporter` bound.
///
/// NOTE(review): this span is a rendered diff hunk, so signatures from before
/// and after the change appear side by side. The `usize` lines look like the
/// removed side and the `u64` line like the added side — confirm against the
/// repository before treating this as a compilable trait.
pub trait Reporter {
/// Receives the total tuple count (`usize` variant; presumably the removed line).
fn tuples_total(&mut self, tuples_total: usize);
/// Receives the count of tuples processed so far (presumably removed from the
/// trait; an inherent `PgReporter::tuples_done` appears in `src/index/am.rs`).
fn tuples_done(&mut self, tuples_done: usize);
/// Receives the total tuple count (`u64` variant; presumably the added line).
fn tuples_total(&mut self, tuples_total: u64);
}

pub fn build<T: HeapRelation, R: Reporter>(
Expand All @@ -46,7 +45,7 @@ pub fn build<T: HeapRelation, R: Reporter>(
external_build.clone(),
),
RabbitholeBuildOptions::Internal(internal_build) => {
let mut tuples_total = 0_usize;
let mut tuples_total = 0_u64;
let samples = {
let mut rand = rand::thread_rng();
let max_number_of_samples = internal_build.nlist.saturating_mul(256);
Expand Down
75 changes: 47 additions & 28 deletions src/index/am.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,31 @@ pub unsafe extern "C" fn amcostestimate(
}
}

/// Field-less handle that forwards index-build progress counters to
/// PostgreSQL through `pgstat_progress_update_param`.
#[derive(Debug, Clone)]
struct PgReporter {}

impl PgReporter {
    /// Publishes `tuples_done` under `PROGRESS_CREATEIDX_TUPLES_DONE`.
    ///
    /// This is an inherent method rather than part of [`Reporter`]; it is
    /// called directly on a `PgReporter` value.
    fn tuples_done(&mut self, tuples_done: u64) {
        use pgrx::pg_sys::{pgstat_progress_update_param, PROGRESS_CREATEIDX_TUPLES_DONE};

        // FFI call into PostgreSQL; both arguments are cast to whatever
        // integer types the binding declares.
        // NOTE(review): assumes the caller runs inside a backend where
        // progress reporting is active — confirm.
        unsafe {
            pgstat_progress_update_param(PROGRESS_CREATEIDX_TUPLES_DONE as _, tuples_done as _)
        };
    }
}

impl Reporter for PgReporter {
    /// Publishes `tuples_total` under `PROGRESS_CREATEIDX_TUPLES_TOTAL`.
    fn tuples_total(&mut self, tuples_total: u64) {
        use pgrx::pg_sys::{pgstat_progress_update_param, PROGRESS_CREATEIDX_TUPLES_TOTAL};

        // FFI call into PostgreSQL; both arguments are cast to whatever
        // integer types the binding declares.
        // NOTE(review): assumes the caller runs inside a backend where
        // progress reporting is active — confirm.
        unsafe {
            pgstat_progress_update_param(PROGRESS_CREATEIDX_TUPLES_TOTAL as _, tuples_total as _)
        };
    }
}

#[pgrx::pg_guard]
pub unsafe extern "C" fn ambuild(
heap: pgrx::pg_sys::Relation,
Expand Down Expand Up @@ -205,26 +230,6 @@ pub unsafe extern "C" fn ambuild(
self.opfamily
}
}
#[derive(Debug, Clone)]
pub struct PgReporter {}
impl Reporter for PgReporter {
fn tuples_total(&mut self, tuples_total: usize) {
unsafe {
pgrx::pg_sys::pgstat_progress_update_param(
pgrx::pg_sys::PROGRESS_CREATEIDX_TUPLES_TOTAL as _,
tuples_total as _,
);
}
}
fn tuples_done(&mut self, tuples_done: usize) {
unsafe {
pgrx::pg_sys::pgstat_progress_update_param(
pgrx::pg_sys::PROGRESS_CREATEIDX_TUPLES_DONE as _,
tuples_done as _,
);
}
}
}
let (vector_options, rabbithole_options) = unsafe { am_options::options(index) };
let opfamily = unsafe { am_options::opfamily(index) };
let heap_relation = Heap {
Expand Down Expand Up @@ -252,7 +257,7 @@ pub unsafe extern "C" fn ambuild(
index_info,
leader.tablescandesc,
leader.rabbitholeshared,
true,
Some(reporter),
);
leader.wait();
let nparticipants = leader.nparticipants;
Expand All @@ -271,17 +276,17 @@ pub unsafe extern "C" fn ambuild(
pgrx::pg_sys::ConditionVariableCancelSleep();
}
} else {
let mut tuples_done = 0;
reporter.tuples_done(tuples_done);
let mut indtuples = 0;
reporter.tuples_done(indtuples);
heap_relation.traverse(true, |(payload, vector)| {
algorithm::insert::insert(
index_relation.clone(),
payload,
vector,
opfamily.distance_kind(),
);
tuples_done += 1;
reporter.tuples_done(tuples_done);
indtuples += 1;
reporter.tuples_done(indtuples);
});
}
unsafe { pgrx::pgbox::PgBox::<pgrx::pg_sys::IndexBuildResult>::alloc0().into_pg() }
Expand All @@ -301,6 +306,7 @@ struct RabbitholeShared {

/* Mutable state */
nparticipantsdone: i32,
indtuples: u64,
}

fn is_mvcc_snapshot(snapshot: *mut pgrx::pg_sys::SnapshotData) -> bool {
Expand Down Expand Up @@ -405,6 +411,7 @@ impl RabbitholeLeader {
workersdonecv: std::mem::zeroed(),
mutex: std::mem::zeroed(),
nparticipantsdone: 0,
indtuples: 0,
});
pgrx::pg_sys::ConditionVariableInit(&raw mut (*rabbitholeshared).workersdonecv);
pgrx::pg_sys::SpinLockInit(&raw mut (*rabbitholeshared).mutex);
Expand Down Expand Up @@ -508,7 +515,7 @@ pub unsafe extern "C" fn rabbithole_parallel_build_main(
index_info,
tablescandesc,
rabbitholeshared,
false,
None,
);
}

Expand All @@ -524,7 +531,7 @@ unsafe fn parallel_build(
index_info: *mut pgrx::pg_sys::IndexInfo,
tablescandesc: *mut pgrx::pg_sys::ParallelTableScanDescData,
rabbitholeshared: *mut RabbitholeShared,
progress: bool,
mut reporter: Option<PgReporter>,
) {
#[derive(Debug, Clone)]
pub struct Heap {
Expand Down Expand Up @@ -606,13 +613,25 @@ unsafe fn parallel_build(
opfamily,
scan,
};
heap_relation.traverse(progress, |(payload, vector)| {
heap_relation.traverse(reporter.is_some(), |(payload, vector)| {
algorithm::insert::insert(
index_relation.clone(),
payload,
vector,
opfamily.distance_kind(),
);
unsafe {
let indtuples;
{
pgrx::pg_sys::SpinLockAcquire(&raw mut (*rabbitholeshared).mutex);
(*rabbitholeshared).indtuples += 1;
indtuples = (*rabbitholeshared).indtuples;
pgrx::pg_sys::SpinLockRelease(&raw mut (*rabbitholeshared).mutex);
}
if let Some(reporter) = reporter.as_mut() {
reporter.tuples_done(indtuples);
}
}
});

unsafe {
Expand Down

0 comments on commit 82a3d3f

Please sign in to comment.