Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace the half-lock with helping strategy #50

Merged
merged 19 commits into from
Jan 3, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Helping: Remove alignment reqs on T::Base
The pointer is passed through an indirection "box" thing and these are
forced to be aligned.
  • Loading branch information
vorner committed Jan 1, 2021
commit b504c8d760d0d6f3cb972bf2de52dc47fb79d47d
187 changes: 134 additions & 53 deletions src/debt/helping.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
//! Slots and global/thread local data for the Helping strategy.

use std::cell::Cell;
use std::sync::atomic::AtomicUsize;
use std::ptr;
use std::sync::atomic::Ordering::*;
use std::sync::atomic::{AtomicPtr, AtomicUsize};

use super::Debt;
use crate::RefCnt;

pub(crate) const REPLACEMENT_TAG: usize = 0b01;
pub(crate) const GEN_TAG: usize = 0b10;
pub(crate) const TAG_MASK: usize = 0b11;
pub(crate) const IDLE: usize = 0;

/// Thread local data for the helping strategy.
#[derive(Default)]
Expand All @@ -18,11 +20,35 @@ pub(super) struct Local {
generation: Cell<usize>,
}

/// The slots for the helping strategy.
// Make sure the pointers have 2 empty bits. Always.
#[derive(Default)]
#[repr(align(4))]
struct Handover(AtomicUsize);

/// The slots for the helping strategy.
pub(super) struct Slots {
control: AtomicUsize,
slot: Debt,
active_addr: AtomicUsize,
handover: Handover,
space_offer: AtomicPtr<Handover>,
}

impl Default for Slots {
fn default() -> Self {
Slots {
control: AtomicUsize::new(IDLE),
slot: Debt::default(),
// Doesn't matter yet
active_addr: AtomicUsize::new(0),
// Also doesn't matter
handover: Handover::default(),
// Here we would like it to point to our handover. But for that we need to be in place
// in RAM (effectively pinned, though we use older Rust than Pin, possibly?), so not
// yet. See init().
space_offer: AtomicPtr::new(ptr::null_mut()),
}
}
}

impl Slots {
Expand All @@ -37,71 +63,126 @@ impl Slots {
local.generation.set(gen);
let discard = gen == 0;
let gen = gen | GEN_TAG;
// We will sync by the write to the debt. But we also sync the value of the previous
// We will sync by the write to the control. But we also sync the value of the previous
// generation/released slot. That way we may re-confirm in the writer that the reader is
// not in between here and the compare_exchange below with a stale gen (eg. if we are in
// here, the re-confirm there will load the NO_DEPT and we are fine).
self.active_addr.store(ptr, Release);

// We could do load and store separately as we are the only ones allowed to
// overwrite a NO_DEPT, but we actually need the SeqCst to be a read-write
// operation in here (we need both the release and acquire part of it).
let ok = self
.slot
.0
.compare_exchange(Debt::NONE, gen, SeqCst, Relaxed)
.is_ok();
assert!(ok, "Run out of slots");
// We are the only ones allowed to do the IDLE -> * transition and we never leave it in
// anything else after an transaction, so this is OK. But we still need a load-store SeqCst
// operation here :-(.
let prev = self.control.swap(gen, SeqCst);
debug_assert_eq!(IDLE, prev, "Left control in wrong state");

(gen, discard)
}

pub(super) fn help<R, T>(&self, gen: usize, storage_addr: usize, replacement: &R) -> bool
pub(super) fn help<R, T>(&self, who: &Self, storage_addr: usize, replacement: &R)
where
T: RefCnt,
R: Fn() -> T,
{
// The reader is trying to claim the slot right now. Let's have a look at the address where
// the data should come from and help the reader out.
let active_addr = self.active_addr.load(Acquire);
if active_addr != storage_addr {
// Re-confirm the gen matches. That way with the above active_addr
// load and Acquire we make sure the active_addr is not newer than
// the gen and therefore we are not missing a place where we need
// to help (eg. that Acquire makes sure the gen catches up with
// it).

// -> The same means it really is bothering some other ArcSwap than the one we are
// interested in, which means it is solved.
//
// -> different: Something changed under our hands and we are not sure what happened.
// Not solved, retry the whole thing.
return self.slot.0.load(Relaxed) == gen;
debug_assert_eq!(IDLE, self.control.load(Relaxed));
let mut control = who.control.load(Acquire);
loop {
match control & TAG_MASK {
// Nothing to help with
IDLE if control == IDLE => break,
// Someone has already helped out with that, so we have nothing to do here
REPLACEMENT_TAG => break,
// Something is going on, let's have a better look.
GEN_TAG => {
debug_assert!(
!ptr::eq(self, who),
"Refusing to help myself, makes no sense"
);
// Get the address that other thread is trying to load from. By that acquire,
// we also sync the control into our thread once more and reconfirm that the
// value of the active_addr is in between two same instances, therefore up to
// date to it.
let active_addr = who.active_addr.load(Acquire);
if active_addr != storage_addr {
let new_control = who.control.load(Acquire);
if new_control == control {
// The other thread is doing something, but to some other ArcSwap, so
// we don't care.
break;
} else {
// The control just changed under our hands, we don't know what to
// trust, so retry.
control = new_control;
continue;
}
}

// Now we know this work is for us. Try to create a replacement and offer it.
// This actually does a full-featured load under the hood, but we are currently
// idle and the load doesn't re-enter write.
let replacement = replacement();
let replace_addr = T::as_ptr(&replacement) as usize;
let their_space = who.space_offer.load(Acquire);
// Relaxed is fine, our own thread and nobody but us writes in here.
let my_space = self.space_offer.load(Relaxed);
// Relaxed is fine, we'll sync by the next compare-exchange. If we don't, the
// value won't ever be read anyway.
unsafe {
(*my_space).0.store(replace_addr, Relaxed);
}
let space_addr = (my_space as usize) | REPLACEMENT_TAG;
match who
.control
.compare_exchange(control, space_addr, AcqRel, Acquire)
{
Ok(_) => {
// We have successfully sent our replacement out (Release) and got
// their space in return (Acquire on that load above).
self.space_offer.store(their_space, Release);
// The ref count went with it, so forget about it here.
T::into_ptr(replacement);
// We have successfully helped out, so we are done.
break;
}
Err(new_control) => {
// Something has changed in between. Let's try again, nothing changed
// (the replacement will get dropped at the end of scope, we didn't do
// anything with the spaces, etc.
control = new_control;
}
}
}
_ => unreachable!("Invalid control value {:X}", control),
}
}
// Get a replacement value and try to donate it.
let replacement = replacement();
let replace_addr = T::as_ptr(&replacement) as usize;
let replace_addr = replace_addr | REPLACEMENT_TAG;
if self
.slot
.0
// Release the value we send there. TODO: Do we need the Acquire
// there?
//
// Relaxed on failure: Basically, nothing happened anywhere, all
// data stayed with us and we are going to retry this loop from
// scratch.
.compare_exchange_weak(gen, replace_addr, AcqRel, Relaxed)
.is_ok()
{
// OK, it went it
T::into_ptr(replacement);
return true;
}

pub(super) fn init(&mut self) {
*self.space_offer.get_mut() = &mut self.handover;
}

pub(super) fn confirm(&self, gen: usize, ptr: usize) -> Result<(), usize> {
// Put the slot there and consider it acquire of a „lock“. For that we need swap, not store
// only.
let prev = self.slot.0.swap(ptr as usize, AcqRel);
debug_assert_eq!(Debt::NONE, prev);

// Confirm by writing to the control (or discover that we got helped). We stop anyone else
// from helping by setting it to IDLE.
let control = self.control.swap(IDLE, AcqRel);
if control == gen {
// Nobody interfered, we have our debt in place and can proceed.
Ok(())
} else {
// Someone put a replacement in there.
debug_assert_eq!(control & TAG_MASK, REPLACEMENT_TAG);
let handover = (control & !TAG_MASK) as *mut Handover;
let replacement = unsafe { &*handover }.0.load(Acquire);
self.space_offer.store(handover, Release);
// Note we've left the debt in place. The caller should pay it back (without ever
// taking advantage of it) to make sure any extra is actually dropped (it is possible
// someone provided the replacement *and* paid the debt and we need just one of them).
Err(replacement)
}
// else -> replacement is dropped.
// Also, loop once more because the current slot did *not* get
// resolved. Retry and see if the reader already got what it wanted or
// try creating a new replacement.
false
}
}

Expand Down
39 changes: 27 additions & 12 deletions src/debt/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@ impl Node {
// If that didn't work, create a new one and prepend to the list.
.unwrap_or_else(|| {
let node = Box::leak(Box::new(Node::default()));
// Not shared between threads yet, so ordinary write would be fine too.
node.in_use.store(NODE_USED, Relaxed);
node.helping.init();
// We don't want to read any data in addition to the head, Relaxed is fine
// here.
//
Expand Down Expand Up @@ -174,14 +173,6 @@ impl Node {
pub(crate) fn helping_slot(&self) -> &Debt {
self.helping.slot()
}

pub(super) fn help<R, T>(&self, gen: usize, storage_addr: usize, replacement: &R) -> bool
where
T: RefCnt,
R: Fn() -> T,
{
self.helping.help(gen, storage_addr, replacement)
}
}

/// A wrapper around a node pointer, to un-claim the node on thread shutdown.
Expand Down Expand Up @@ -240,7 +231,7 @@ impl LocalNode {
node.fast.get_debt(ptr, &self.fast)
}

pub(crate) fn new_helping(&self, ptr: usize) -> (&'static Debt, usize) {
pub(crate) fn new_helping(&self, ptr: usize) -> usize {
let node = &self.node.get().expect("LocalNode::with ensures it is set");
assert_eq!(node.in_use.load(Relaxed), NODE_USED);
let (gen, discard) = node.helping.get_debt(ptr, &self.helping);
Expand All @@ -250,7 +241,31 @@ impl LocalNode {
node.start_cooldown();
self.node.take();
}
(node.helping_slot(), gen)
gen
}

pub(crate) fn confirm_helping(
&self,
gen: usize,
ptr: usize,
) -> Result<&'static Debt, (&'static Debt, usize)> {
let node = &self.node.get().expect("LocalNode::with ensures it is set");
let slot = node.helping_slot();
node.helping
.confirm(gen, ptr)
.map(|()| slot)
.map_err(|repl| (slot, repl))
}

// FIXME: Do we need replacement be passed to us? We do pass T and the address... And we could
// pass storage_addr as an actual pointer or ref?
pub(super) fn help<R, T>(&self, who: &Node, storage_addr: usize, replacement: &R)
where
T: RefCnt,
R: Fn() -> T,
{
let node = &self.node.get().expect("LocalNode::with ensures it is set");
node.helping.help(&who.helping, storage_addr, replacement)
}
}

Expand Down
Loading