Skip to content

Commit

Permalink
fix: use unlock notify also on sqlite3_exec (#2021) (#2055)
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam authored Aug 23, 2022
1 parent 5790ffc commit 2d65c5d
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 73 deletions.
39 changes: 22 additions & 17 deletions sqlx-core/src/sqlite/connection/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ use std::ptr;
use std::ptr::NonNull;

use crate::error::Error;
use libsqlite3_sys::{sqlite3, sqlite3_close, sqlite3_exec, sqlite3_last_insert_rowid, SQLITE_OK};
use libsqlite3_sys::{
sqlite3, sqlite3_close, sqlite3_exec, sqlite3_last_insert_rowid, SQLITE_LOCKED_SHAREDCACHE,
SQLITE_OK,
};

use crate::sqlite::SqliteError;
use crate::sqlite::{statement::unlock_notify, SqliteError};

/// Managed handle to the raw SQLite3 database handle.
/// The database handle will be closed when this is dropped and no `ConnectionHandleRef`s exist.
Expand Down Expand Up @@ -61,21 +64,23 @@ impl ConnectionHandle {

// SAFETY: we have exclusive access to the database handle
unsafe {
let status = sqlite3_exec(
self.as_ptr(),
query.as_ptr(),
// callback if we wanted result rows
None,
// callback data
ptr::null_mut(),
// out-pointer for the error message, we just use `SqliteError::new()`
ptr::null_mut(),
);

if status == SQLITE_OK {
Ok(())
} else {
Err(SqliteError::new(self.as_ptr()).into())
loop {
let status = sqlite3_exec(
self.as_ptr(),
query.as_ptr(),
// callback if we wanted result rows
None,
// callback data
ptr::null_mut(),
// out-pointer for the error message, we just use `SqliteError::new()`
ptr::null_mut(),
);

match status {
SQLITE_OK => return Ok(()),
SQLITE_LOCKED_SHAREDCACHE => unlock_notify::wait(self.as_ptr())?,
_ => return Err(SqliteError::new(self.as_ptr()).into()),
}
}
}
}
Expand Down
61 changes: 5 additions & 56 deletions sqlx-core/src/sqlite/statement/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::ptr;
use std::ptr::NonNull;
use std::slice::from_raw_parts;
use std::str::{from_utf8, from_utf8_unchecked};
use std::sync::{Condvar, Mutex};

use libsqlite3_sys::{
sqlite3, sqlite3_bind_blob64, sqlite3_bind_double, sqlite3_bind_int, sqlite3_bind_int64,
Expand All @@ -17,14 +16,16 @@ use libsqlite3_sys::{
sqlite3_column_name, sqlite3_column_origin_name, sqlite3_column_table_name,
sqlite3_column_type, sqlite3_column_value, sqlite3_db_handle, sqlite3_finalize, sqlite3_reset,
sqlite3_sql, sqlite3_step, sqlite3_stmt, sqlite3_stmt_readonly, sqlite3_table_column_metadata,
sqlite3_unlock_notify, sqlite3_value, SQLITE_DONE, SQLITE_LOCKED_SHAREDCACHE, SQLITE_MISUSE,
SQLITE_OK, SQLITE_ROW, SQLITE_TRANSIENT, SQLITE_UTF8,
sqlite3_value, SQLITE_DONE, SQLITE_LOCKED_SHAREDCACHE, SQLITE_MISUSE, SQLITE_OK, SQLITE_ROW,
SQLITE_TRANSIENT, SQLITE_UTF8,
};

use crate::error::{BoxDynError, Error};
use crate::sqlite::type_info::DataType;
use crate::sqlite::{SqliteError, SqliteTypeInfo};

use super::unlock_notify;

#[derive(Debug)]
pub(crate) struct StatementHandle(NonNull<sqlite3_stmt>);

Expand Down Expand Up @@ -314,7 +315,7 @@ impl StatementHandle {
SQLITE_LOCKED_SHAREDCACHE => {
// The shared cache is locked by another connection. Wait for unlock
// notification and try again.
wait_for_unlock_notify(self.db_handle())?;
unlock_notify::wait(self.db_handle())?;
// Need to reset the handle after the unlock
// (https://www.sqlite.org/unlock_notify.html)
sqlite3_reset(self.0.as_ptr());
Expand Down Expand Up @@ -344,55 +345,3 @@ impl Drop for StatementHandle {
}
}
}

unsafe fn wait_for_unlock_notify(conn: *mut sqlite3) -> Result<(), SqliteError> {
let notify = Notify::new();

if sqlite3_unlock_notify(
conn,
Some(unlock_notify_cb),
&notify as *const Notify as *mut Notify as *mut _,
) != SQLITE_OK
{
return Err(SqliteError::new(conn));
}

notify.wait();

Ok(())
}

unsafe extern "C" fn unlock_notify_cb(ptr: *mut *mut c_void, len: c_int) {
let ptr = ptr as *mut &Notify;
let slice = from_raw_parts(ptr, len as usize);

for notify in slice {
notify.fire();
}
}

struct Notify {
mutex: Mutex<bool>,
condvar: Condvar,
}

impl Notify {
fn new() -> Self {
Self {
mutex: Mutex::new(false),
condvar: Condvar::new(),
}
}

fn wait(&self) {
let _ = self
.condvar
.wait_while(self.mutex.lock().unwrap(), |fired| !*fired)
.unwrap();
}

fn fire(&self) {
*self.mutex.lock().unwrap() = true;
self.condvar.notify_one();
}
}
1 change: 1 addition & 0 deletions sqlx-core/src/sqlite/statement/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::borrow::Cow;
use std::sync::Arc;

mod handle;
pub(super) mod unlock_notify;
mod r#virtual;

pub(crate) use handle::StatementHandle;
Expand Down
61 changes: 61 additions & 0 deletions sqlx-core/src/sqlite/statement/unlock_notify.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use std::ffi::c_void;
use std::os::raw::c_int;
use std::slice;
use std::sync::{Condvar, Mutex};

use libsqlite3_sys::{sqlite3, sqlite3_unlock_notify, SQLITE_OK};

use crate::sqlite::SqliteError;

// Wait for unlock notification (https://www.sqlite.org/unlock_notify.html)
pub unsafe fn wait(conn: *mut sqlite3) -> Result<(), SqliteError> {
let notify = Notify::new();

if sqlite3_unlock_notify(
conn,
Some(unlock_notify_cb),
&notify as *const Notify as *mut Notify as *mut _,
) != SQLITE_OK
{
return Err(SqliteError::new(conn));
}

notify.wait();

Ok(())
}

unsafe extern "C" fn unlock_notify_cb(ptr: *mut *mut c_void, len: c_int) {
let ptr = ptr as *mut &Notify;
let slice = slice::from_raw_parts(ptr, len as usize);

for notify in slice {
notify.fire();
}
}

struct Notify {
mutex: Mutex<bool>,
condvar: Condvar,
}

impl Notify {
fn new() -> Self {
Self {
mutex: Mutex::new(false),
condvar: Condvar::new(),
}
}

fn wait(&self) {
let _ = self
.condvar
.wait_while(self.mutex.lock().unwrap(), |fired| !*fired)
.unwrap();
}

fn fire(&self) {
*self.mutex.lock().unwrap() = true;
self.condvar.notify_one();
}
}

0 comments on commit 2d65c5d

Please sign in to comment.