Skip to content

Commit

Permalink
Merge pull request #444 from pimeys/lru
Browse files Browse the repository at this point in the history
LRU Statement Caching
  • Loading branch information
mehcode authored Jun 25, 2020
2 parents 72c4e04 + 363dbfb commit 8600e5d
Show file tree
Hide file tree
Showing 29 changed files with 381 additions and 25 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sqlx-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,4 @@ url = { version = "2.1.1", default-features = false }
uuid = { version = "0.8.1", default-features = false, optional = true, features = [ "std" ] }
whoami = "0.8.1"
stringprep = "0.1.2"
lru-cache = "0.1.2"
3 changes: 3 additions & 0 deletions sqlx-core/src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod statement_cache;

pub(crate) use statement_cache::StatementCache;
61 changes: 61 additions & 0 deletions sqlx-core/src/common/statement_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use lru_cache::LruCache;

/// A cache for prepared statements. When full, the least recently used
/// statement gets removed.
#[derive(Debug)]
pub struct StatementCache<T> {
inner: LruCache<String, T>,
}

impl<T> StatementCache<T> {
/// Create a new cache with the given capacity.
pub fn new(capacity: usize) -> Self {
Self {
inner: LruCache::new(capacity),
}
}

/// Returns a mutable reference to the value corresponding to the given key
/// in the cache, if any.
pub fn get_mut(&mut self, k: &str) -> Option<&mut T> {
self.inner.get_mut(k)
}

/// Inserts a new statement to the cache, returning the least recently used
/// statement id if the cache is full, or if inserting with an existing key,
/// the replaced existing statement.
pub fn insert(&mut self, k: &str, v: T) -> Option<T> {
let mut lru_item = None;

if self.inner.capacity() == self.len() && !self.inner.contains_key(k) {
lru_item = self.remove_lru();
} else if self.contains_key(k) {
lru_item = self.inner.remove(k);
}

self.inner.insert(k.into(), v);

lru_item
}

/// The number of statements in the cache.
pub fn len(&self) -> usize {
self.inner.len()
}

/// Removes the least recently used item from the cache.
pub fn remove_lru(&mut self) -> Option<T> {
self.inner.remove_lru().map(|(_, v)| v)
}

/// Clear all cached statements from the cache.
#[cfg(any(feature = "sqlite"))]
pub fn clear(&mut self) {
self.inner.clear();
}

/// True if cache has a value for the given key.
pub fn contains_key(&mut self, k: &str) -> bool {
self.inner.contains_key(k)
}
}
19 changes: 18 additions & 1 deletion sqlx-core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::str::FromStr;
use futures_core::future::BoxFuture;
use futures_core::Future;

use crate::database::Database;
use crate::database::{Database, HasStatementCache};
use crate::error::{BoxDynError, Error};
use crate::transaction::Transaction;

Expand Down Expand Up @@ -64,6 +64,23 @@ pub trait Connection: Send {
})
}

/// The number of statements currently cached in the connection.
fn cached_statements_size(&self) -> usize
where
Self::Database: HasStatementCache,
{
0
}

/// Removes all statements from the cache, closing them on the server if
/// needed.
fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>>
where
Self::Database: HasStatementCache,
{
Box::pin(async move { Ok(()) })
}

#[doc(hidden)]
fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>>;

Expand Down
2 changes: 2 additions & 0 deletions sqlx-core/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,5 @@ pub trait HasArguments<'q> {
/// The concrete type used as a buffer for arguments while encoding.
type ArgumentBuffer: Default;
}

pub trait HasStatementCache {}
1 change: 1 addition & 0 deletions sqlx-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub mod transaction;
#[macro_use]
pub mod encode;

mod common;
pub mod database;
pub mod decode;
pub mod describe;
Expand Down
4 changes: 2 additions & 2 deletions sqlx-core/src/mysql/connection/establish.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use bytes::Bytes;
use hashbrown::HashMap;

use crate::common::StatementCache;
use crate::error::Error;
use crate::mysql::connection::{tls, MySqlStream, COLLATE_UTF8MB4_UNICODE_CI, MAX_PACKET_SIZE};
use crate::mysql::protocol::connect::{
Expand Down Expand Up @@ -98,7 +98,7 @@ impl MySqlConnection {

Ok(Self {
stream,
cache_statement: HashMap::new(),
cache_statement: StatementCache::new(options.statement_cache_capacity),
scratch_row_columns: Default::default(),
scratch_row_column_names: Default::default(),
})
Expand Down
12 changes: 7 additions & 5 deletions sqlx-core/src/mysql/connection/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::mysql::connection::stream::Busy;
use crate::mysql::io::MySqlBufExt;
use crate::mysql::protocol::response::Status;
use crate::mysql::protocol::statement::{
BinaryRow, Execute as StatementExecute, Prepare, PrepareOk,
BinaryRow, Execute as StatementExecute, Prepare, PrepareOk, StmtClose,
};
use crate::mysql::protocol::text::{ColumnDefinition, ColumnFlags, Query, TextRow};
use crate::mysql::protocol::Packet;
Expand All @@ -26,8 +26,8 @@ use crate::mysql::{

impl MySqlConnection {
async fn prepare(&mut self, query: &str) -> Result<u32, Error> {
if let Some(&statement) = self.cache_statement.get(query) {
return Ok(statement);
if let Some(statement) = self.cache_statement.get_mut(query) {
return Ok(*statement);
}

// https://dev.mysql.com/doc/internals/en/com-stmt-prepare.html
Expand Down Expand Up @@ -60,8 +60,10 @@ impl MySqlConnection {
self.stream.maybe_recv_eof().await?;
}

self.cache_statement
.insert(query.to_owned(), ok.statement_id);
// in case of the cache being full, close the least recently used statement
if let Some(statement) = self.cache_statement.insert(query, ok.statement_id) {
self.stream.send_packet(StmtClose { statement }).await?;
}

Ok(ok.statement_id)
}
Expand Down
18 changes: 17 additions & 1 deletion sqlx-core/src/mysql/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ use futures_core::future::BoxFuture;
use futures_util::FutureExt;
use hashbrown::HashMap;

use crate::common::StatementCache;
use crate::connection::{Connect, Connection};
use crate::error::Error;
use crate::executor::Executor;
use crate::ext::ustr::UStr;
use crate::mysql::protocol::statement::StmtClose;
use crate::mysql::protocol::text::{Ping, Quit};
use crate::mysql::row::MySqlColumn;
use crate::mysql::{MySql, MySqlConnectOptions};
Expand All @@ -34,7 +36,7 @@ pub struct MySqlConnection {
pub(crate) stream: MySqlStream,

// cache by query string to the statement id
cache_statement: HashMap<String, u32>,
cache_statement: StatementCache<u32>,

// working memory for the active row's column information
// this allows us to re-use these allocations unless the user is persisting the
Expand Down Expand Up @@ -75,6 +77,20 @@ impl Connection for MySqlConnection {
self.stream.wait_until_ready().boxed()
}

fn cached_statements_size(&self) -> usize {
self.cache_statement.len()
}

fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
while let Some(statement) = self.cache_statement.remove_lru() {
self.stream.send_packet(StmtClose { statement }).await?;
}

Ok(())
})
}

#[doc(hidden)]
fn should_flush(&self) -> bool {
!self.stream.wbuf.is_empty()
Expand Down
4 changes: 3 additions & 1 deletion sqlx-core/src/mysql/database.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::database::{Database, HasArguments, HasValueRef};
use crate::database::{Database, HasArguments, HasStatementCache, HasValueRef};
use crate::mysql::value::{MySqlValue, MySqlValueRef};
use crate::mysql::{
MySqlArguments, MySqlConnection, MySqlRow, MySqlTransactionManager, MySqlTypeInfo,
Expand Down Expand Up @@ -33,3 +33,5 @@ impl HasArguments<'_> for MySql {

type ArgumentBuffer = Vec<u8>;
}

impl HasStatementCache for MySql {}
27 changes: 27 additions & 0 deletions sqlx-core/src/mysql/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ impl FromStr for MySqlSslMode {
/// mysql://[host][/database][?properties]
/// ```
///
/// ## Properties
///
/// |Parameter|Default|Description|
/// |---------|-------|-----------|
/// | `ssl-mode` | `PREFERRED` | Determines whether or with what priority a secure SSL TCP/IP connection will be negotiated. See [`MySqlSslMode`]. |
/// | `ssl-ca` | `None` | Sets the name of a file containing a list of trusted SSL Certificate Authorities. |
/// | `statement-cache-capacity` | `100` | The maximum number of prepared statements stored in the cache. Set to `0` to disable. |
///
/// # Example
///
/// ```rust,no_run
Expand All @@ -92,6 +100,8 @@ impl FromStr for MySqlSslMode {
/// # })
/// # }
/// ```
///
/// [`MySqlSslMode`]: enum.MySqlSslMode.html
#[derive(Debug, Clone)]
pub struct MySqlConnectOptions {
pub(crate) host: String,
Expand All @@ -101,6 +111,7 @@ pub struct MySqlConnectOptions {
pub(crate) database: Option<String>,
pub(crate) ssl_mode: MySqlSslMode,
pub(crate) ssl_ca: Option<PathBuf>,
pub(crate) statement_cache_capacity: usize,
}

impl Default for MySqlConnectOptions {
Expand All @@ -120,6 +131,7 @@ impl MySqlConnectOptions {
database: None,
ssl_mode: MySqlSslMode::Preferred,
ssl_ca: None,
statement_cache_capacity: 100,
}
}

Expand Down Expand Up @@ -190,6 +202,17 @@ impl MySqlConnectOptions {
self.ssl_ca = Some(file_name.as_ref().to_owned());
self
}

/// Sets the capacity of the connection's statement cache in a number of stored
/// distinct statements. Caching is handled using LRU, meaning when the
/// amount of queries hits the defined limit, the oldest statement will get
/// dropped.
///
/// The default cache capacity is 100 statements.
pub fn statement_cache_capacity(mut self, capacity: usize) -> Self {
self.statement_cache_capacity = capacity;
self
}
}

impl FromStr for MySqlConnectOptions {
Expand Down Expand Up @@ -231,6 +254,10 @@ impl FromStr for MySqlConnectOptions {
options = options.ssl_ca(&*value);
}

"statement-cache-capacity" => {
options = options.statement_cache_capacity(value.parse()?);
}

_ => {}
}
}
Expand Down
2 changes: 2 additions & 0 deletions sqlx-core/src/mysql/protocol/statement/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ mod execute;
mod prepare;
mod prepare_ok;
mod row;
mod stmt_close;

pub(crate) use execute::Execute;
pub(crate) use prepare::Prepare;
pub(crate) use prepare_ok::PrepareOk;
pub(crate) use row::BinaryRow;
pub(crate) use stmt_close::StmtClose;
16 changes: 16 additions & 0 deletions sqlx-core/src/mysql/protocol/statement/stmt_close.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use crate::io::Encode;
use crate::mysql::protocol::Capabilities;

// https://dev.mysql.com/doc/internals/en/com-stmt-close.html

#[derive(Debug)]
pub struct StmtClose {
pub statement: u32,
}

impl Encode<'_, Capabilities> for StmtClose {
fn encode_with(&self, buf: &mut Vec<u8>, _: Capabilities) {
buf.push(0x19); // COM_STMT_CLOSE
buf.extend(&self.statement.to_le_bytes());
}
}
3 changes: 2 additions & 1 deletion sqlx-core/src/postgres/connection/establish.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use hashbrown::HashMap;

use crate::common::StatementCache;
use crate::error::Error;
use crate::io::Decode;
use crate::postgres::connection::{sasl, stream::PgStream, tls};
Expand Down Expand Up @@ -138,7 +139,7 @@ impl PgConnection {
transaction_status,
pending_ready_for_query_count: 0,
next_statement_id: 1,
cache_statement: HashMap::with_capacity(10),
cache_statement: StatementCache::new(options.statement_cache_capacity),
cache_type_oid: HashMap::new(),
cache_type_info: HashMap::new(),
scratch_row_columns: Default::default(),
Expand Down
Loading

0 comments on commit 8600e5d

Please sign in to comment.