Skip to content

Commit

Permalink
fix: optimize insertions in building when lists = 1 (#106)
Browse files Browse the repository at this point in the history
Signed-off-by: usamoi <usamoi@outlook.com>
Co-authored-by: Keming <kemingy94@gmail.com>
  • Loading branch information
usamoi and kemingy authored Nov 21, 2024
1 parent 9833035 commit c4ac1c0
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 92 deletions.
22 changes: 11 additions & 11 deletions src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const _: () = assert!(align_of::<Page>() == pgrx::pg_sys::MAXIMUM_ALIGNOF as usi
const _: () = assert!(size_of::<Page>() == pgrx::pg_sys::BLCKSZ as usize);

impl Page {
pub fn init_mut(this: &mut MaybeUninit<Self>, tracking_freespace: bool) -> &mut Self {
pub fn init_mut(this: &mut MaybeUninit<Self>) -> &mut Self {
unsafe {
pgrx::pg_sys::PageInit(
this.as_mut_ptr() as pgrx::pg_sys::Page,
Expand All @@ -33,8 +33,7 @@ impl Page {
);
(&raw mut (*this.as_mut_ptr()).opaque).write(Opaque {
next: u32::MAX,
tracking_freespace,
fast_forward: u32::MAX,
skip: u32::MAX,
});
}
let this = unsafe { MaybeUninit::assume_init_mut(this) };
Expand All @@ -59,7 +58,7 @@ impl Page {
assert!(self.header.pd_upper as usize <= size_of::<Self>());
let lower = self.header.pd_lower as usize;
let upper = self.header.pd_upper as usize;
assert!(lower < upper);
assert!(lower <= upper);
((lower - offset_of!(PageHeaderData, pd_linp)) / size_of::<ItemIdData>()) as u16
}
pub fn get(&self, i: u16) -> Option<&[u8]> {
Expand Down Expand Up @@ -159,8 +158,7 @@ impl Page {
#[repr(C, align(8))]
pub struct Opaque {
pub next: u32,
pub tracking_freespace: bool,
pub fast_forward: u32,
pub skip: u32,
}

const _: () = assert!(align_of::<Opaque>() == pgrx::pg_sys::MAXIMUM_ALIGNOF as usize);
Expand Down Expand Up @@ -195,6 +193,7 @@ pub struct BufferWriteGuard {
page: NonNull<Page>,
state: *mut pgrx::pg_sys::GenericXLogState,
id: u32,
tracking_freespace: bool,
}

impl BufferWriteGuard {
Expand All @@ -215,7 +214,7 @@ impl Drop for BufferWriteGuard {
if std::thread::panicking() {
pgrx::pg_sys::GenericXLogAbort(self.state);
} else {
if self.get().get_opaque().tracking_freespace {
if self.tracking_freespace {
pgrx::pg_sys::RecordPageWithFreeSpace(
self.raw,
self.id,
Expand Down Expand Up @@ -257,7 +256,7 @@ impl Relation {
BufferReadGuard { buf, page, id }
}
}
pub fn write(&self, id: u32) -> BufferWriteGuard {
pub fn write(&self, id: u32, tracking_freespace: bool) -> BufferWriteGuard {
assert!(id != u32::MAX, "no such page");
unsafe {
use pgrx::pg_sys::{
Expand All @@ -284,6 +283,7 @@ impl Relation {
page: page.cast(),
state,
id,
tracking_freespace,
}
}
}
Expand All @@ -310,13 +310,14 @@ impl Relation {
.cast::<MaybeUninit<Page>>(),
)
.expect("failed to get page");
Page::init_mut(page.as_mut(), tracking_freespace);
Page::init_mut(page.as_mut());
BufferWriteGuard {
raw: self.raw,
buf,
page: page.cast(),
state,
id: pgrx::pg_sys::BufferGetBlockNumber(buf),
tracking_freespace,
}
}
}
Expand All @@ -327,8 +328,7 @@ impl Relation {
if id == u32::MAX {
return None;
}
let write = self.write(id);
assert!(write.get().get_opaque().tracking_freespace);
let write = self.write(id, true);
if write.get().freespace() < freespace as _ {
// the free space is recorded incorrectly
pgrx::pg_sys::RecordPageWithFreeSpace(
Expand Down
8 changes: 2 additions & 6 deletions src/vchordrq/algorithm/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,7 @@ pub fn build<T: HeapRelation, R: Reporter>(
};
let mut meta = Tape::create(&relation, false);
assert_eq!(meta.first(), 0);
let mut forwards = Tape::<std::convert::Infallible>::create(&relation, false);
assert_eq!(forwards.first(), 1);
let mut vectors = Tape::create(&relation, true);
assert_eq!(vectors.first(), 2);
let mut pointer_of_means = Vec::<Vec<(u32, u16)>>::new();
for i in 0..structures.len() {
let mut level = Vec::new();
Expand Down Expand Up @@ -125,13 +122,11 @@ pub fn build<T: HeapRelation, R: Reporter>(
}
pointer_of_firsts.push(level);
}
forwards.head.get_mut().get_opaque_mut().fast_forward = vectors.first();
meta.push(&MetaTuple {
dims,
height_of_root: structures.len() as u32,
is_residual,
vectors_first: vectors.first(),
forwards_first: forwards.first(),
mean: pointer_of_means.last().unwrap()[0],
first: pointer_of_firsts.last().unwrap()[0],
});
Expand Down Expand Up @@ -336,7 +331,8 @@ struct Tape<'a, T> {

impl<'a, T> Tape<'a, T> {
fn create(relation: &'a Relation, tracking_freespace: bool) -> Self {
let head = relation.extend(tracking_freespace);
let mut head = relation.extend(tracking_freespace);
head.get_mut().get_opaque_mut().skip = head.id();
let first = head.id();
Self {
relation,
Expand Down
126 changes: 61 additions & 65 deletions src/vchordrq/algorithm/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@ use base::search::Pointer;
use std::cmp::Reverse;
use std::collections::BinaryHeap;

pub fn insert(relation: Relation, payload: Pointer, vector: Vec<f32>, distance_kind: DistanceKind) {
pub fn insert(
relation: Relation,
payload: Pointer,
vector: Vec<f32>,
distance_kind: DistanceKind,
in_building: bool,
) {
let meta_guard = relation.read(0);
let meta_tuple = meta_guard
.get()
Expand All @@ -38,50 +44,14 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec<f32>, distance_k
chain,
})
.unwrap();
chain = Some('chain: {
if let Some(mut write) = relation.search(tuple.len()) {
let i = write.get_mut().alloc(&tuple).unwrap();
break 'chain (write.id(), i);
}
let mut current = relation.read(1).get().get_opaque().fast_forward;
let mut changed = false;
loop {
let read = relation.read(current);
let flag = 'flag: {
if read.get().freespace() as usize >= tuple.len() {
break 'flag true;
}
if read.get().get_opaque().next == u32::MAX {
break 'flag true;
}
false
};
if flag {
drop(read);
let mut write = relation.write(current);
if let Some(i) = write.get_mut().alloc(&tuple) {
break 'chain (current, i);
}
if write.get().get_opaque().next == u32::MAX {
if changed {
relation.write(1).get_mut().get_opaque_mut().fast_forward =
write.id();
}
let mut extend = relation.extend(true);
write.get_mut().get_opaque_mut().next = extend.id();
if let Some(i) = extend.get_mut().alloc(&tuple) {
break 'chain (extend.id(), i);
} else {
panic!("a tuple cannot even be fit in a fresh page");
}
}
current = write.get().get_opaque().next;
} else {
current = read.get().get_opaque().next;
}
changed = true;
}
});
chain = Some(append(
relation.clone(),
meta_tuple.vectors_first,
&tuple,
true,
true,
true,
));
}
chain.unwrap()
};
Expand Down Expand Up @@ -168,7 +138,7 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec<f32>, distance_k
} else {
rabitq::code(dims, &vector)
};
let h0_tuple = rkyv::to_bytes::<_, 8192>(&Height0Tuple {
let tuple = rkyv::to_bytes::<_, 8192>(&Height0Tuple {
mean: h0_vector,
payload: h0_payload,
dis_u_2: code.dis_u_2,
Expand All @@ -178,38 +148,64 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec<f32>, distance_k
t: code.t(),
})
.unwrap();
let first = list.0;
append(relation, list.0, &tuple, false, in_building, in_building);
}

fn append(
relation: Relation,
first: u32,
tuple: &[u8],
tracking_freespace: bool,
skipping_traversal: bool,
updating_skip: bool,
) -> (u32, u16) {
if tracking_freespace {
if let Some(mut write) = relation.search(tuple.len()) {
let i = write.get_mut().alloc(tuple).unwrap();
return (write.id(), i);
}
}
assert!(first != u32::MAX);
let mut current = first;
loop {
let read = relation.read(current);
let flag = 'flag: {
if read.get().freespace() as usize >= h0_tuple.len() {
break 'flag true;
}
if read.get().get_opaque().next == u32::MAX {
break 'flag true;
}
false
};
if flag {
if read.get().freespace() as usize >= tuple.len()
|| read.get().get_opaque().next == u32::MAX
{
drop(read);
let mut write = relation.write(current);
if write.get_mut().alloc(&h0_tuple).is_some() {
return;
let mut write = relation.write(current, tracking_freespace);
if let Some(i) = write.get_mut().alloc(tuple) {
return (current, i);
}
if write.get().get_opaque().next == u32::MAX {
let mut extend = relation.extend(false);
let mut extend = relation.extend(tracking_freespace);
write.get_mut().get_opaque_mut().next = extend.id();
if extend.get_mut().alloc(&h0_tuple).is_some() {
return;
drop(write);
if let Some(i) = extend.get_mut().alloc(tuple) {
let result = (extend.id(), i);
drop(extend);
if updating_skip {
let mut past = relation.write(first, tracking_freespace);
let skip = &mut past.get_mut().get_opaque_mut().skip;
assert!(*skip != u32::MAX);
*skip = std::cmp::max(*skip, result.0);
}
return result;
} else {
panic!("a tuple cannot even be fit in a fresh page");
}
}
current = write.get().get_opaque().next;
if skipping_traversal && current == first && write.get().get_opaque().skip != first {
current = write.get().get_opaque().skip;
} else {
current = write.get().get_opaque().next;
}
} else {
current = read.get().get_opaque().next;
if skipping_traversal && current == first && read.get().get_opaque().skip != first {
current = read.get().get_opaque().skip;
} else {
current = read.get().get_opaque().next;
}
}
}
}
1 change: 0 additions & 1 deletion src/vchordrq/algorithm/tuples.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ pub struct MetaTuple {
pub height_of_root: u32,
pub is_residual: bool,
pub vectors_first: u32,
pub forwards_first: u32,
// raw vector
pub mean: (u32, u16),
// for meta tuple, it's pointers to next level
Expand Down
4 changes: 2 additions & 2 deletions src/vchordrq/algorithm/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn(Pointer) -
let mut current = first;
while current != u32::MAX {
delay();
let mut h0_guard = relation.write(current);
let mut h0_guard = relation.write(current, false);
let mut reconstruct_removes = Vec::new();
for i in 1..=h0_guard.get().len() {
let h0_tuple = h0_guard
Expand Down Expand Up @@ -90,7 +90,7 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn(Pointer) -
};
if flag {
drop(read);
let mut write = relation.write(current);
let mut write = relation.write(current, true);
for i in 1..=write.get().len() {
let Some(vector_tuple) = write.get().get(i) else {
continue;
Expand Down
3 changes: 3 additions & 0 deletions src/vchordrq/index/am.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ pub unsafe extern "C" fn ambuild(
payload,
vector,
opfamily.distance_kind(),
true,
);
indtuples += 1;
reporter.tuples_done(indtuples);
Expand Down Expand Up @@ -617,6 +618,7 @@ unsafe fn parallel_build(
payload,
vector,
opfamily.distance_kind(),
true,
);
unsafe {
let indtuples;
Expand Down Expand Up @@ -705,6 +707,7 @@ pub unsafe extern "C" fn aminsert(
pointer,
vector.into_vec(),
opfamily.distance_kind(),
false,
);
}
false
Expand Down
2 changes: 1 addition & 1 deletion src/vchordrqfscan/algorithm/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ pub fn build<T: HeapRelation, R: Reporter>(
}
pointer_of_firsts.push(level);
}
forwards.head.get_mut().get_opaque_mut().fast_forward = vectors.first();
forwards.head.get_mut().get_opaque_mut().skip = vectors.first();
meta.push(&MetaTuple {
dims,
height_of_root: structures.len() as u32,
Expand Down
Loading

0 comments on commit c4ac1c0

Please sign in to comment.