Skip to content

RUST-1384 Move all Ctx actions to a dedicated worker thread #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 10 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
re-impl progress
  • Loading branch information
abr-egn committed Aug 1, 2022
commit 8d1909b4f36606b031e9060c67c85b6de7c552d6
109 changes: 59 additions & 50 deletions mongocrypt/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,22 +361,20 @@ impl Algorithm {
}

pub struct Ctx {
handle: JoinHandle<()>,
handle: Option<JoinHandle<()>>,
send: Sender<CtxAction>,
// TODO: impl drop
}

impl HasStatus for Ctx {
unsafe fn native_status(&self, status: *mut sys::mongocrypt_status_t) {
let _ = self.try_native_status(status);
}

fn error(&self) -> Error {
let out = Status::new();
if let Err(e) = self.try_native_status(*out.native()) {
return e;
impl Drop for Ctx {
fn drop(&mut self) {
let (send, recv) = oneshot();
if self.send.send(CtxAction::Done(send)).is_ok() {
let _ = recv.recv();
}
if let Some(handle) = self.handle.take() {
handle.join();
}
out.as_error()
}
}

Expand All @@ -389,13 +387,12 @@ impl Ctx {
let handle = thread::spawn(move || {
Self::worker(crypt_ptr, recv)
});
let ctx = Ctx { handle, send };
let init = ctx.run(|ctx_ptr| {
let builder = CtxBuilder::borrow(ctx_ptr);
let ctx = Ctx { handle: Some(handle), send };
ctx.run(|local| {
let builder = CtxBuilder::borrow(local.0);
f(builder)
});
//let builder = CtxBuilder::steal(unsafe { sys::mongocrypt_ctx_new(crypt.0) });
todo!()
})??;
Ok(ctx)
}

fn worker(crypt: AssertSendPtr, actions: Receiver<CtxAction>) {
Expand All @@ -405,58 +402,63 @@ impl Ctx {
);
while let Ok(action) = actions.recv() {
match action {
CtxAction::Done => return,
CtxAction::Done(send) => {
let _ = send.send(());
return;
}
CtxAction::Fn(f) => f(*ctx.borrow()),
}
}
}

fn run<T: 'static + Send>(&self, f: impl FnOnce(*mut sys::mongocrypt_ctx_t) -> T + Send + 'static) -> Result<T> {
fn run<T: 'static + Send>(&self, f: impl FnOnce(LocalCtx) -> T + Send + 'static) -> Result<T> {
let (send, recv) = oneshot();
self.send.send(CtxAction::Fn(Box::new(|ctx_ptr| {
// If the receiver is closed, what happens here doesn't matter.
let _ = send.send(f(ctx_ptr));
let _ = send.send(f(LocalCtx(ctx_ptr)));
}))).map_err(thread_err)?;
recv.recv().map_err(thread_err)
}

fn try_native_status(&self, status: *mut sys::mongocrypt_status_t) -> Result<()> {
let status = AssertSendPtr::new(status);
self.run(move |ctx_ptr| {
unsafe { sys::mongocrypt_ctx_status(ctx_ptr, status.get()); }
})
fn run_err(&self, f: impl FnOnce(&LocalCtx) -> bool + Send + 'static) -> Result<()> {
self.run(move |local| {
if !f(&local) {
return Err(local.error());
}
Ok(())
})?
}

/// Get the current state of a context.
pub fn state(&self) -> Result<State> {
let s = self.run(|ctx_ptr| {
unsafe { sys::mongocrypt_ctx_state(ctx_ptr) }
})?;
if s == sys::mongocrypt_ctx_state_t_MONGOCRYPT_CTX_ERROR {
return Err(self.error());
}
Ok(State::from_native(s))
self.run(|local| {
let s = unsafe { sys::mongocrypt_ctx_state(local.0) };
if s == sys::mongocrypt_ctx_state_t_MONGOCRYPT_CTX_ERROR {
return Err(local.error());
}
Ok(State::from_native(s))
})?
}

/*
/// Get BSON necessary to run the mongo operation when in `State::NeedMongo*` states.
///
/// The returned value:
/// * for `State::NeedMongoCollinfo` it is a listCollections filter.
/// * for `State::NeedMongoKeys` it is a find filter.
/// * for `State::NeedMongoMarkings` it is a command to send to mongocryptd.
pub fn mongo_op(&self) -> Result<&RawDocument> {
// Safety: `mongocrypt_ctx_mongo_op` updates the passed-in `Binary` to point to a chunk of
// BSON with the same lifetime as the underlying `Ctx`. The `Binary` itself does not own
// the memory, and gets cleaned up at the end of the unsafe block. Lifetime inference on
// the return type binds `op_bytes` to the same lifetime as `&self`, which is the correct
// one.
let op_bytes = unsafe {
let op_bytes = {
let bin = Binary::new();
if !sys::mongocrypt_ctx_mongo_op(*self.inner.borrow(), *bin.native()) {
return Err(self.error());
}
bin.bytes()?
let bin_ptr = AssertSendPtr::new(*bin.native());
self.run_err(move |local| {
unsafe { sys::mongocrypt_ctx_mongo_op(local.0, bin_ptr.get()) }
})?;
// Safety: `mongocrypt_ctx_mongo_op` updates the passed-in `Binary` to point to a chunk of
// BSON with the same lifetime as the underlying `Ctx`. The `Binary` itself does not own
// the memory, and gets cleaned up at the end of the unsafe block. Lifetime inference on
// the return type binds `op_bytes` to the same lifetime as `&self`, which is the correct
// one.
unsafe { bin.bytes()? }
};
rawdoc_view(op_bytes)
}
Expand All @@ -474,14 +476,13 @@ impl Ctx {
/// - For `State::NeedMongoMarkings` it is a reply from mongocryptd.
pub fn mongo_feed(&mut self, reply: &RawDocument) -> Result<()> {
let bin = BinaryRef::new(reply.as_bytes());
unsafe {
if !sys::mongocrypt_ctx_mongo_feed(*self.inner.borrow(), *bin.native()) {
return Err(self.error());
}
}
Ok(())
let bin_ptr = AssertSendPtr::new(unsafe { *bin.native() });
self.run_err(move |local| {
unsafe { sys::mongocrypt_ctx_mongo_feed(local.0, bin_ptr.get()) }
})
}

/*
/// Call when done feeding the reply (or replies) back to the context.
pub fn mongo_done(&mut self) -> Result<()> {
unsafe {
Expand Down Expand Up @@ -549,6 +550,14 @@ impl Ctx {
*/
}

struct LocalCtx(*mut sys::mongocrypt_ctx_t);

impl HasStatus for LocalCtx {
unsafe fn native_status(&self, status: *mut sys::mongocrypt_status_t) {
unsafe { sys::mongocrypt_ctx_status(self.0, status); }
}
}

struct AssertSendPtr(*mut c_void);

impl AssertSendPtr {
Expand All @@ -565,7 +574,7 @@ unsafe impl Send for AssertSendPtr { }

enum CtxAction {
Fn(Box<dyn FnOnce(*mut sys::mongocrypt_ctx_t) + Send>),
Done,
Done(OneshotSender<()>),
}

struct OneshotSender<T>(mpsc::SyncSender<T>);
Expand Down