Description
Hi :-)
First of all, the code:
[dependencies]
sqlx = { version = "0.4.0-beta.1", default-features = false, features = ["runtime-tokio", "sqlite"] }
tokio = { version = "0.2", features = ["full"] }
use sqlx::Connection;
#[tokio::main]
async fn main() {
let mut db = sqlx::SqliteConnection::connect(":memory:")
.await
.unwrap();
let value: Option<(i64,)> = sqlx::query_as("SELECT 1")
.fetch_optional(&mut db)
.await
.unwrap();
// Keep the application alive
loop {
tokio::time::delay_for(tokio::time::Duration::from_secs(1)).await;
}
}
This code makes the application instantaneously use 100% CPU.
Curious about it, I've conducted an investigation that led me here first:
impl<'c> Executor<'c> for &'c mut SqliteConnection {
/* ... */
// sqlx-core/src/sqlite/connection/executor.rs:157
fn fetch_optional<'e, 'q: 'e, E: 'q>(
self,
query: E,
) -> BoxFuture<'e, Result<Option<SqliteRow>, Error>>
where
'c: 'e,
E: Execute<'q, Self::Database>,
{
let mut s = self.fetch_many(query);
Box::pin(async move {
while let Some(v) = s.try_next().await? {
if let Either::Right(r) = v {
return Ok(Some(r));
}
}
Ok(None)
})
}
/* ... */
}
... and later there:
impl StatementWorker {
pub(crate) fn new() -> Self {
/* ... */
let handle = spawn({
/* ... */
move || {
/* ... */
// sqlx-core/src/sqlite/statement/worker.rs:49
'run: while status.load(Ordering::Acquire) >= 0 {
'statement: loop {
match status.load(Ordering::Acquire) {
STATE_CLOSE => {
/* ... */
}
STATE_READY => {
/* ... */
}
_ => {
// waits for the receiving end to be ready to receive the rows
// this should take less than 1 microsecond under most conditions
spin_loop_hint();
}
}
}
}
}
});
/* ... */
}
/* ... */
}
The issue is that the current implementation of fetch_optional()
retrieves only the first row from the stream and then silently discards the stream, without letting StatementWorker
know about it. The worker then starts entering spin_loop_hint()
millions times per second, waiting for response from a stream that doesn't exist anymore.
This forever-busy-loop can be reproduced even without referring to fetch_optional()
, like so:
async fn main() {
/* ... */
let mut stream = sqlx::query_as::<_, (Option<i64>,)>("SELECT 1")
.fetch_many(&mut db);
// Retrieve just the first row and then discard rest of the stream
while let Some(v) = s.try_next().await.unwrap() {
break;
}
// Keep the application alive
loop {
tokio::time::delay_for(tokio::time::Duration::from_secs(1)).await;
}
}
As for a possible solution, I'd suggest introducing a new type, called ExecutionToken
:
pub(crate) struct ExecutionToken {
status: Arc<AtomicI32>,
}
impl Drop for ExecutionToken {
fn drop(&mut self) {
self.status.store(STATE_READY, Ordering::Release);
}
}
/* ... */
impl StatementWorker {
pub(crate) fn execute(&self, statement: &StatementHandle) -> ExecutionToken {
let token = ExecutionToken {
status: Arc::clone(&self.status),
};
self.statement
.store(statement.0.as_ptr(), Ordering::Release);
token
}
}
/* ... */
// and then later just:
// let _token = worker.execute(handle);
Since I'm proficient in neither sqlx's nor SQLite's internals, I'm not sure if that's the correct approach (although it works on my machine ™); if you think that's alright, I could prepare a merge request :-)
Thank you for your great work on this library,
Cheers.