
SQLite's StatementWorker consumes 100% CPU inside its busy loop #616

Closed
@Patryk27


Hi :-)

First of all, the code:

# Cargo.toml
[dependencies]
sqlx = { version = "0.4.0-beta.1", default-features = false, features = ["runtime-tokio", "sqlite"] }
tokio = { version = "0.2", features = ["full"] }

// src/main.rs
use sqlx::Connection;

#[tokio::main]
async fn main() {
    let mut db = sqlx::SqliteConnection::connect(":memory:")
        .await
        .unwrap();

    let value: Option<(i64,)> = sqlx::query_as("SELECT 1")
        .fetch_optional(&mut db)
        .await
        .unwrap();

    // Keep the application alive
    loop {
        tokio::time::delay_for(tokio::time::Duration::from_secs(1)).await;
    }
}

Running this code makes the application jump to 100% CPU usage right away.

Curious about it, I dug into the sources, and the investigation led me here first:

impl<'c> Executor<'c> for &'c mut SqliteConnection {
    /* ... */

    // sqlx-core/src/sqlite/connection/executor.rs:157
    fn fetch_optional<'e, 'q: 'e, E: 'q>(
        self,
        query: E,
    ) -> BoxFuture<'e, Result<Option<SqliteRow>, Error>>
    where
        'c: 'e,
        E: Execute<'q, Self::Database>,
    {
        let mut s = self.fetch_many(query);

        Box::pin(async move {
            while let Some(v) = s.try_next().await? {
                if let Either::Right(r) = v {
                    return Ok(Some(r));
                }
            }

            Ok(None)
        })
    }

    /* ... */
}

... and then here:

impl StatementWorker {
    pub(crate) fn new() -> Self {
        /* ... */

        let handle = spawn({
            /* ... */

            move || {
                /* ... */

                // sqlx-core/src/sqlite/statement/worker.rs:49
                'run: while status.load(Ordering::Acquire) >= 0 {
                    'statement: loop {
                        match status.load(Ordering::Acquire) {
                            STATE_CLOSE => {
                                /* ... */
                            }

                            STATE_READY => {
                                /* ... */
                            }

                            _ => {
                                // waits for the receiving end to be ready to receive the rows
                                // this should take less than 1 microsecond under most conditions
                                spin_loop_hint();
                            }
                        }
                    }
                }
            }
        });

        /* ... */
    }

    /* ... */
}

The issue is that the current implementation of fetch_optional() retrieves only the first row from the stream and then silently drops the stream, without letting StatementWorker know about it. The worker then keeps hitting spin_loop_hint() millions of times per second, waiting for a receiving end that no longer exists.
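To make the failure mode concrete, here is a std-only model of it. It is a hedged sketch, not sqlx code: the state names and values are hypothetical, SilentStream stands in for the row stream, and the extra `stop` flag exists only so the demo terminates. The worker busy-waits like the `_ =>` arm above, and because the consumer is dropped without touching the shared status, the worker keeps spinning until it is forcibly stopped.

```rust
use std::hint;
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Hypothetical state values, loosely modelled on the worker's STATE_* constants.
const STATE_READY: i32 = 0;
const STATE_WAITING: i32 = 1;

/// Stand-in for the row stream: it shares the status but has no `Drop`
/// impl, so dropping it tells the worker nothing.
struct SilentStream {
    _status: Arc<AtomicI32>,
}

/// Returns how many times the worker spun and the status it was left in.
fn run_demo() -> (u64, i32) {
    let status = Arc::new(AtomicI32::new(STATE_WAITING));
    let stop = Arc::new(AtomicBool::new(false)); // demo-only escape hatch
    let spins = Arc::new(AtomicU64::new(0));

    let worker = {
        let (status, stop, spins) = (Arc::clone(&status), Arc::clone(&stop), Arc::clone(&spins));
        thread::spawn(move || {
            // Busy-wait for the receiving end, like the `_ =>` arm does.
            while status.load(Ordering::Acquire) != STATE_READY && !stop.load(Ordering::Relaxed) {
                spins.fetch_add(1, Ordering::Relaxed);
                hint::spin_loop();
            }
        })
    };

    // The consumer goes away without updating `status`.
    drop(SilentStream { _status: Arc::clone(&status) });

    thread::sleep(Duration::from_millis(50)); // one core stays pegged meanwhile
    stop.store(true, Ordering::Relaxed);
    worker.join().unwrap();

    (spins.load(Ordering::Relaxed), status.load(Ordering::Acquire))
}

fn main() {
    let (spins, status) = run_demo();
    println!("worker spun {} times; status is still {}", spins, status);
}
```

Without the demo-only `stop` flag the loop would never exit, which matches the behaviour observed in the real worker.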

This never-ending busy loop can also be reproduced without calling fetch_optional() at all:

use futures::TryStreamExt; // for `try_next()`; requires the `futures` crate as a dependency
use sqlx::Connection;

#[tokio::main]
async fn main() {
    /* ... */

    let mut stream = sqlx::query_as::<_, (Option<i64>,)>("SELECT 1")
        .fetch_many(&mut db);

    // Retrieve just the first item and then discard the rest of the stream
    let _first = stream.try_next().await.unwrap();
    drop(stream);

    // Keep the application alive
    loop {
        tokio::time::delay_for(tokio::time::Duration::from_secs(1)).await;
    }
}
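Breaking out of the loop early (or returning from inside it, as fetch_optional() does) simply drops the stream, and the only code that runs at that moment is the stream's Drop impl. The std-only sketch below illustrates this with a hypothetical RowStream iterator standing in for the real stream; it is not sqlx code, just the ownership behaviour in isolation.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical stand-in for the row stream: yields rows 0..total and
/// records in `dropped` whether its destructor ran.
struct RowStream {
    next: u32,
    total: u32,
    dropped: Rc<Cell<bool>>,
}

impl Iterator for RowStream {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        if self.next >= self.total {
            return None;
        }
        let row = self.next;
        self.next += 1;
        Some(row)
    }
}

impl Drop for RowStream {
    fn drop(&mut self) {
        // The only notification available when a caller abandons the stream.
        self.dropped.set(true);
    }
}

/// Same shape as `fetch_optional`: take the first row, abandon the rest.
fn first_row(mut rows: RowStream) -> Option<u32> {
    rows.next() // `rows` is dropped when this function returns, rows unread
}

fn demo() -> (Option<u32>, bool) {
    let dropped = Rc::new(Cell::new(false));
    let rows = RowStream { next: 0, total: 3, dropped: Rc::clone(&dropped) };
    let first = first_row(rows);
    (first, dropped.get())
}

fn main() {
    let (first, dropped) = demo();
    println!("first = {:?}, stream dropped = {}", first, dropped);
}
```

Since no other code runs when the consumer gives up, a Drop impl is the natural place to signal the worker.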

As for a possible solution, I'd suggest introducing a new type, called ExecutionToken:

pub(crate) struct ExecutionToken {
    status: Arc<AtomicI32>,
}

impl Drop for ExecutionToken {
    fn drop(&mut self) {
        self.status.store(STATE_READY, Ordering::Release);
    }
}

/* ... */

impl StatementWorker {
    pub(crate) fn execute(&self, statement: &StatementHandle) -> ExecutionToken {
        let token = ExecutionToken {
            status: Arc::clone(&self.status),
        };

        self.statement
            .store(statement.0.as_ptr(), Ordering::Release);

        token
    }
}

/* ... */

// and then later just:
// let _token = worker.execute(handle);

Since I'm not proficient in either sqlx's or SQLite's internals, I'm not sure whether this is the right approach (although it works on my machine™); if you think it's alright, I could prepare a pull request :-)
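For illustration, the drop-guard idea can also be exercised in isolation with std only. This is a hedged sketch, not sqlx's implementation: the state values and the free-standing execute() function are hypothetical stand-ins for StatementWorker internals. The worker spins only while an execution is marked RUNNING, and dropping the token early (as an abandoned stream would) flips the state back to READY so the worker stops waiting.

```rust
use std::hint;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical state values; sqlx's real STATE_* constants may differ.
const STATE_READY: i32 = 0;
const STATE_RUNNING: i32 = 1;

/// Guard modelled on the proposed `ExecutionToken`: dropping it (including
/// when its owner is dropped early) resets the shared status to READY.
struct ExecutionToken {
    status: Arc<AtomicI32>,
}

impl Drop for ExecutionToken {
    fn drop(&mut self) {
        self.status.store(STATE_READY, Ordering::Release);
    }
}

/// Hypothetical stand-in for `StatementWorker::execute`: marks an execution
/// as in flight and returns a token tied to it.
fn execute(status: &Arc<AtomicI32>) -> ExecutionToken {
    status.store(STATE_RUNNING, Ordering::Release);
    ExecutionToken { status: Arc::clone(status) }
}

/// Starts an execution, abandons it, and returns the state the worker saw on exit.
fn run_demo() -> i32 {
    let status = Arc::new(AtomicI32::new(STATE_READY));
    let token = execute(&status);

    let worker = {
        let status = Arc::clone(&status);
        thread::spawn(move || {
            // Spin only while an execution is in flight.
            while status.load(Ordering::Acquire) == STATE_RUNNING {
                hint::spin_loop();
            }
            status.load(Ordering::Acquire)
        })
    };

    // Simulates a caller that reads one row and then discards the stream.
    drop(token);

    worker.join().unwrap()
}

fn main() {
    let state = run_demo();
    println!("worker exited with state {}", state);
}
```

The key property is that the reset happens in Drop, so it runs on every exit path (early return, break, or panic unwinding) without each caller having to remember it.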

Thank you for your great work on this library,
Cheers.
