Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: index cache in SQLite3 #13584

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
fix(index): change sqlite schema to (name, ver, blob)
  • Loading branch information
weihanglo committed Apr 2, 2024
commit aadab5fb97aa3c50007bd9cd209e8d583b708d51
81 changes: 63 additions & 18 deletions src/cargo/sources/registry/index/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ use crate::CargoResult;
use crate::GlobalContext;

use super::split;
use super::Summaries;
use super::MaybeIndexSummary;
use super::INDEX_V_MAX;

/// The current version of [`SummariesCache`].
Expand Down Expand Up @@ -231,18 +233,27 @@ impl<'a> SummariesCache<'a> {
/// An abstraction of the actual cache store.
trait CacheStore {
/// Gets the cache associated with the key.
fn get(&self, key: &str) -> Option<Vec<u8>>;
fn get(&self, key: &str) -> Option<MaybeSummaries>;

/// Associates the value with the key.
fn put(&self, key: &str, value: &[u8]);

/// Associates the value with the key + version tuple.
fn put_summary(&self, key: (&str, &Version), value: &[u8]);

/// Invalidates the cache associated with the key.
fn invalidate(&self, key: &str);
}

pub enum MaybeSummaries {
Unparsed(Vec<u8>),
Parsed(Summaries),
}

/// Manages the on-disk index caches.
pub struct CacheManager<'gctx> {
store: Box<dyn CacheStore + 'gctx>,
is_sqlite: bool,
}

impl<'gctx> CacheManager<'gctx> {
Expand All @@ -258,11 +269,15 @@ impl<'gctx> CacheManager<'gctx> {
} else {
Box::new(LocalFileSystem::new(cache_root, gctx))
};
CacheManager { store }
CacheManager { store, is_sqlite: use_sqlite }
}

pub fn is_sqlite(&self) -> bool {
self.is_sqlite
}

/// Gets the cache associated with the key.
pub fn get(&self, key: &str) -> Option<Vec<u8>> {
pub fn get(&self, key: &str) -> Option<MaybeSummaries> {
self.store.get(key)
}

Expand All @@ -271,6 +286,11 @@ impl<'gctx> CacheManager<'gctx> {
self.store.put(key, value)
}

/// Associates the value with the key + version tuple.
pub fn put_summary(&self, key: (&str, &Version), value: &[u8]) {
self.store.put_summary(key, value)
}

/// Invalidates the cache associated with the key.
pub fn invalidate(&self, key: &str) {
self.store.invalidate(key)
Expand Down Expand Up @@ -301,10 +321,10 @@ impl LocalFileSystem<'_> {
}

impl CacheStore for LocalFileSystem<'_> {
fn get(&self, key: &str) -> Option<Vec<u8>> {
fn get(&self, key: &str) -> Option<MaybeSummaries> {
let cache_path = &self.cache_path(key);
match fs::read(cache_path) {
Ok(contents) => Some(contents),
Ok(contents) => Some(MaybeSummaries::Unparsed(contents)),
Err(e) => {
tracing::debug!(?cache_path, "cache missing: {e}");
None
Expand All @@ -324,6 +344,10 @@ impl CacheStore for LocalFileSystem<'_> {
}
}

fn put_summary(&self, _key: (&str, &Version), _value: &[u8]) {
panic!("unsupported");
}

fn invalidate(&self, key: &str) {
let cache_path = &self.cache_path(key);
if let Err(e) = fs::remove_file(cache_path) {
Expand All @@ -341,7 +365,7 @@ struct LocalDatabase<'gctx> {
/// Connection to the SQLite database.
conn: OnceCell<Option<RefCell<Connection>>>,
/// [`GlobalContext`] reference for convenience.
deferred_writes: RefCell<BTreeMap<String, Vec<u8>>>,
deferred_writes: RefCell<BTreeMap<String, Vec<(String, Vec<u8>)>>>,
gctx: &'gctx GlobalContext,
}

Expand All @@ -351,7 +375,7 @@ impl LocalDatabase<'_> {
LocalDatabase {
cache_root,
conn: OnceCell::new(),
deferred_writes: RefCell::new(BTreeMap::new()),
deferred_writes: Default::default(),
gctx,
}
}
Expand Down Expand Up @@ -386,9 +410,11 @@ impl LocalDatabase<'_> {
let mut conn = conn.borrow_mut();
let tx = conn.transaction()?;
let mut stmt =
tx.prepare_cached("INSERT OR REPLACE INTO summaries (name, value) VALUES (?, ?)")?;
for (key, value) in self.deferred_writes.borrow().iter() {
stmt.execute(params!(key, value))?;
tx.prepare_cached("INSERT OR REPLACE INTO summaries (name, version, value) VALUES (?, ?, ?)")?;
for (name, summaries) in self.deferred_writes.borrow().iter() {
for (version, value) in summaries {
stmt.execute(params!(name, version, value))?;
}
}
drop(stmt);
tx.commit()?;
Expand All @@ -406,19 +432,36 @@ impl Drop for LocalDatabase<'_> {
}

impl CacheStore for LocalDatabase<'_> {
fn get(&self, key: &str) -> Option<Vec<u8>> {
fn get(&self, key: &str) -> Option<MaybeSummaries> {
self.conn()?
.borrow()
.prepare_cached("SELECT value FROM summaries WHERE name = ? LIMIT 1")
.and_then(|mut stmt| stmt.query_row([key], |row| row.get(0)))
.map_err(|e| tracing::debug!(key, "cache missing: {e}"))
.prepare_cached("SELECT version, value FROM summaries WHERE name = ?")
.and_then(|mut stmt| {
let rows = stmt.query_map([key], |row| Ok((row.get(0)?, row.get(1)?)))?;
let mut summaries = Summaries::default();
for row in rows {
let (version, raw_data): (String, Vec<u8>) = row?;
let version = Version::parse(&version).expect("semver");
summaries.versions.insert(version, MaybeIndexSummary::UnparsedData(raw_data));
}
Ok(MaybeSummaries::Parsed(summaries))
})
.map_err(|e| {
tracing::debug!(key, "cache missing: {e}");
})
.ok()
}

fn put(&self, key: &str, value: &[u8]) {
fn put(&self, _key: &str, _value: &[u8]) {
panic!("unsupported");
}

fn put_summary(&self, (name, version): (&str, &Version), value: &[u8]) {
self.deferred_writes
.borrow_mut()
.insert(key.into(), value.into());
.entry(name.into())
.or_insert(Default::default())
.push((version.to_string(), value.to_vec()));
}

fn invalidate(&self, key: &str) {
Expand All @@ -440,8 +483,10 @@ impl CacheStore for LocalDatabase<'_> {
fn migrations() -> Vec<Migration> {
vec![basic_migration(
"CREATE TABLE IF NOT EXISTS summaries (
name TEXT PRIMARY KEY NOT NULL,
value BLOB NOT NULL
name TEXT NOT NULL,
version TEXT NOT NULL,
value BLOB NOT NULL,
PRIMARY KEY (name, version)
)",
)]
}
46 changes: 32 additions & 14 deletions src/cargo/sources/registry/index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use std::task::{ready, Poll};
use tracing::{debug, info};

mod cache;
use self::cache::CacheManager;
use self::cache::{CacheManager, MaybeSummaries};
use self::cache::SummariesCache;

/// The maximum schema version of the `v` field in the index this version of
Expand Down Expand Up @@ -115,7 +115,8 @@ struct Summaries {
enum MaybeIndexSummary {
/// A summary which has not been parsed, The `start` and `end` are pointers
/// into [`Summaries::raw_data`] which this is an entry of.
Unparsed { start: usize, end: usize },
Unparsed(std::ops::Range<usize>),
UnparsedData(Vec<u8>),

/// An actually parsed summary.
Parsed(IndexSummary),
Expand Down Expand Up @@ -551,14 +552,20 @@ impl Summaries {

let mut cached_summaries = None;
let mut index_version = None;
if let Some(contents) = cache_manager.get(name) {
match Summaries::parse_cache(contents) {
Ok((s, v)) => {
cached_summaries = Some(s);
index_version = Some(v);
if let Some(maybe_summaries) = cache_manager.get(name) {
match maybe_summaries {
MaybeSummaries::Unparsed(contents) => match Summaries::parse_cache(contents) {
Ok((s, v)) => {
cached_summaries = Some(s);
index_version = Some(v);
}
Err(e) => {
tracing::debug!("failed to parse {name:?} cache: {e}");
}
}
Err(e) => {
tracing::debug!("failed to parse {name:?} cache: {e}");
MaybeSummaries::Parsed(summaries) => {
cached_summaries = Some(summaries);
index_version = Some("2".into());
}
}
}
Expand Down Expand Up @@ -611,9 +618,18 @@ impl Summaries {
}
};
let version = summary.package_id().version().clone();
cache.versions.push((version.clone(), line));
if cache_manager.is_sqlite() {
cache_manager.put_summary((&name, &version), line);
} else {
cache.versions.push((version.clone(), line));
}
ret.versions.insert(version, summary.into());
}

if cache_manager.is_sqlite() {
return Poll::Ready(Ok(Some(ret)));
}

if let Some(index_version) = index_version {
tracing::trace!("caching index_version {}", index_version);
let cache_bytes = cache.serialize(index_version.as_str());
Expand Down Expand Up @@ -649,7 +665,7 @@ impl Summaries {
for (version, summary) in cache.versions {
let (start, end) = subslice_bounds(&contents, summary);
ret.versions
.insert(version, MaybeIndexSummary::Unparsed { start, end });
.insert(version, MaybeIndexSummary::Unparsed(start..end));
}
ret.raw_data = contents;
return Ok((ret, index_version));
Expand Down Expand Up @@ -680,14 +696,16 @@ impl MaybeIndexSummary {
source_id: SourceId,
bindeps: bool,
) -> CargoResult<&IndexSummary> {
let (start, end) = match self {
MaybeIndexSummary::Unparsed { start, end } => (*start, *end),
let data = match self {
MaybeIndexSummary::Unparsed(range) => &raw_data[range.clone()],
MaybeIndexSummary::UnparsedData(data) => data,
MaybeIndexSummary::Parsed(summary) => return Ok(summary),
};
let summary = IndexSummary::parse(&raw_data[start..end], source_id, bindeps)?;
let summary = IndexSummary::parse(data, source_id, bindeps)?;
*self = MaybeIndexSummary::Parsed(summary);
match self {
MaybeIndexSummary::Unparsed { .. } => unreachable!(),
MaybeIndexSummary::UnparsedData { .. } => unreachable!(),
MaybeIndexSummary::Parsed(summary) => Ok(summary),
}
}
Expand Down
Loading