Skip to content

sqlite: Fix #616 #627

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sqlx-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ pub enum Error {
#[error("attempted to acquire a connection on a closed pool")]
PoolClosed,

/// A background worker (e.g. [`StatementWorker`]) has crashed.
///
/// [`StatementWorker`]: crate::sqlite::StatementWorker
#[error("attempted to communicate with a crashed background worker")]
WorkerCrashed,

#[cfg(feature = "migrate")]
#[error("{0}")]
Migrate(#[source] Box<crate::migrate::MigrateError>),
Expand Down
24 changes: 10 additions & 14 deletions sqlx-core/src/sqlite/connection/describe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ pub(super) fn describe<'c: 'e, 'q: 'e, 'e>(
let mut statement = statement?;

// we start by finding the first statement that *can* return results
while let Some((statement, ..)) = statement.prepare(&mut conn.handle)? {
num_params += statement.bind_parameter_count();
while let Some((stmt, ..)) = statement.prepare(&mut conn.handle)? {
num_params += stmt.bind_parameter_count();

let mut stepped = false;

let num = statement.column_count();
let num = stmt.column_count();
if num == 0 {
// no columns in this statement; skip
continue;
Expand All @@ -44,7 +44,7 @@ pub(super) fn describe<'c: 'e, 'q: 'e, 'e>(
// to [column_decltype]

// if explain.. fails, ignore the failure and we'll have no fallback
let (fallback, fallback_nullable) = match explain(conn, statement.sql()).await {
let (fallback, fallback_nullable) = match explain(conn, stmt.sql()).await {
Ok(v) => v,
Err(err) => {
log::debug!("describe: explain introspection failed: {}", err);
Expand All @@ -54,24 +54,20 @@ pub(super) fn describe<'c: 'e, 'q: 'e, 'e>(
};

for col in 0..num {
let name = statement.column_name(col).to_owned();
let name = stmt.column_name(col).to_owned();

let type_info = if let Some(ty) = statement.column_decltype(col) {
let type_info = if let Some(ty) = stmt.column_decltype(col) {
ty
} else {
// if that fails, we back up and attempt to step the statement
// once *if* its read-only and then use [column_type] as a
// fallback to [column_decltype]
if !stepped && statement.read_only() {
if !stepped && stmt.read_only() {
stepped = true;

conn.worker.execute(statement);
conn.worker.wake();

let _ = conn.worker.step(statement).await;
let _ = conn.worker.step(*stmt).await;
}

let mut ty = statement.column_type_info(col);
let mut ty = stmt.column_type_info(col);

if ty.0 == DataType::Null {
if let Some(fallback) = fallback.get(col).cloned() {
Expand All @@ -82,7 +78,7 @@ pub(super) fn describe<'c: 'e, 'q: 'e, 'e>(
ty
};

nullable.push(statement.column_nullable(col)?.or_else(|| {
nullable.push(stmt.column_nullable(col)?.or_else(|| {
// if we do not *know* if this is nullable, check the EXPLAIN fallback
fallback_nullable.get(col).copied().and_then(identity)
}));
Expand Down
19 changes: 6 additions & 13 deletions sqlx-core/src/sqlite/connection/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection {
handle: ref mut conn,
ref mut statements,
ref mut statement,
ref worker,
ref mut worker,
..
} = self;

Expand All @@ -91,25 +91,18 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection {
// keep track of how many arguments we have bound
let mut num_arguments = 0;

while let Some((handle, columns, column_names, last_row_values)) = stmt.prepare(conn)? {
while let Some((stmt, columns, column_names, last_row_values)) = stmt.prepare(conn)? {
// bind values to the statement
num_arguments += bind(handle, &arguments, num_arguments)?;

// tell the worker about the new statement
worker.execute(handle);

// wake up the worker if needed
// the worker parks its thread on async-std when not in use
worker.wake();
num_arguments += bind(stmt, &arguments, num_arguments)?;

loop {
// save the rows from the _current_ position on the statement
// and send them to the still-live row object
SqliteRow::inflate_if_needed(handle, &*columns, last_row_values.take());
SqliteRow::inflate_if_needed(stmt, &*columns, last_row_values.take());

// invoke [sqlite3_step] on the dedicated worker thread
// this will move us forward one row or finish the statement
let s = worker.step(handle).await?;
let s = worker.step(*stmt).await?;

match s {
Either::Left(changes) => {
Expand All @@ -129,7 +122,7 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection {

Either::Right(()) => {
let (row, weak_values_ref) = SqliteRow::current(
*handle,
*stmt,
columns,
column_names
);
Expand Down
3 changes: 0 additions & 3 deletions sqlx-core/src/sqlite/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,5 @@ impl Drop for SqliteConnection {
// we must explicitly drop the statements as the drop-order in a struct is undefined
self.statements.clear();
self.statement.take();

// we then explicitly close the worker
self.worker.close();
}
}
150 changes: 33 additions & 117 deletions sqlx-core/src/sqlite/statement/worker.rs
Original file line number Diff line number Diff line change
@@ -1,151 +1,67 @@
use crate::error::Error;
use crate::sqlite::statement::StatementHandle;
use crossbeam_channel::{bounded, unbounded, Sender};
use either::Either;
use libsqlite3_sys::sqlite3_stmt;
use libsqlite3_sys::{sqlite3_step, SQLITE_DONE, SQLITE_ROW};
use sqlx_rt::yield_now;
use std::ptr::null_mut;
use std::sync::atomic::{spin_loop_hint, AtomicI32, AtomicPtr, Ordering};
use std::sync::Arc;
use std::thread::{self, park, spawn, JoinHandle};

const STATE_CLOSE: i32 = -1;

const STATE_READY: i32 = 0;

const STATE_INITIAL: i32 = 1;
use std::thread;

// Each SQLite connection has a dedicated thread.

// TODO: Tweak this so that we can use a thread pool per pool of SQLite3 connections to reduce
// OS resource usage. Low priority because a high concurrent load for SQLite3 is very
// unlikely.

// TODO: Reduce atomic complexity. There must be a simpler way to do this that doesn't
// compromise performance.

pub(crate) struct StatementWorker {
statement: Arc<AtomicPtr<sqlite3_stmt>>,
status: Arc<AtomicI32>,
handle: Option<JoinHandle<()>>,
tx: Sender<StatementWorkerCommand>,
}

enum StatementWorkerCommand {
Step {
statement: StatementHandle,
tx: Sender<Result<Either<u64, ()>, Error>>,
},
}

impl StatementWorker {
pub(crate) fn new() -> Self {
let statement = Arc::new(AtomicPtr::new(null_mut::<sqlite3_stmt>()));
let status = Arc::new(AtomicI32::new(STATE_INITIAL));

let handle = spawn({
let statement = Arc::clone(&statement);
let status = Arc::clone(&status);

move || {
// wait for the first command
park();
let (tx, rx) = unbounded();

'run: while status.load(Ordering::Acquire) >= 0 {
'statement: loop {
match status.load(Ordering::Acquire) {
STATE_CLOSE => {
// worker has been dropped; get out
break 'run;
}
thread::spawn(move || {
for cmd in rx {
match cmd {
StatementWorkerCommand::Step { statement, tx } => {
let status = unsafe { sqlite3_step(statement.0.as_ptr()) };

STATE_READY => {
let statement = statement.load(Ordering::Acquire);
if statement.is_null() {
// we do not have the statement handle yet
thread::yield_now();
continue;
}
let resp = match status {
SQLITE_ROW => Ok(Either::Right(())),
SQLITE_DONE => Ok(Either::Left(statement.changes())),
_ => Err(statement.last_error().into()),
};

let v = unsafe { sqlite3_step(statement) };

status.store(v, Ordering::Release);

if v == SQLITE_DONE {
// when a statement is _done_, we park the thread until
// we need it again
park();
break 'statement;
}
}

_ => {
// waits for the receiving end to be ready to receive the rows
// this should take less than 1 microsecond under most conditions
spin_loop_hint();
}
}
let _ = tx.send(resp);
}
}
}
});

Self {
handle: Some(handle),
statement,
status,
}
Self { tx }
}

pub(crate) fn wake(&self) {
if let Some(handle) = &self.handle {
handle.thread().unpark();
}
}
pub(crate) async fn step(
&mut self,
statement: StatementHandle,
) -> Result<Either<u64, ()>, Error> {
let (tx, rx) = bounded(1);

pub(crate) fn execute(&self, statement: &StatementHandle) {
// readies the worker to execute the statement
// for async-std, this unparks our dedicated thread

self.statement
.store(statement.0.as_ptr(), Ordering::Release);
}

pub(crate) async fn step(&self, statement: &StatementHandle) -> Result<Either<u64, ()>, Error> {
// storing <0> as a terminal in status releases the worker
// to proceed to the next [sqlite3_step] invocation
self.status.store(STATE_READY, Ordering::Release);

// we then use a spin loop to wait for this to finish
// 99% of the time this should be < 1 μs
let status = loop {
let status = self
.status
.compare_and_swap(STATE_READY, STATE_READY, Ordering::AcqRel);

if status != STATE_READY {
break status;
}
self.tx
.send(StatementWorkerCommand::Step { statement, tx })
.map_err(|_| Error::WorkerCrashed)?;

while rx.is_empty() {
yield_now().await;
};

match status {
// a row was found
SQLITE_ROW => Ok(Either::Right(())),

// reached the end of the query results,
// emit the # of changes
SQLITE_DONE => Ok(Either::Left(statement.changes())),

_ => Err(statement.last_error().into()),
}
}

pub(crate) fn close(&mut self) {
self.status.store(STATE_CLOSE, Ordering::Release);

if let Some(handle) = self.handle.take() {
handle.thread().unpark();
handle.join().unwrap();
}
}
}

impl Drop for StatementWorker {
fn drop(&mut self) {
self.close();
rx.recv().map_err(|_| Error::WorkerCrashed)?
}
}