Skip to content

Commit

Permalink
Merge pull request sourcefrog#274 from sourcefrog/transport-ref
Browse files Browse the repository at this point in the history
Cleaner Transport object

Previously, clients needed to deal with an Arc<dyn Transport>, which was unfortunate because it's widely used. Now they can just use a &Transport, which is cheaply cloneable. An object pointer to the particular implementation is held internally.
  • Loading branch information
sourcefrog authored Oct 20, 2024
2 parents 1952d6a + 62e56b0 commit 2d631fb
Show file tree
Hide file tree
Showing 18 changed files with 350 additions and 287 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
matrix:
os: [ubuntu-latest, windows-latest, macOS-latest]
features: ["", "s3"]
version: [stable, nightly, "1.78"]
version: [stable, nightly, "1.79"]

steps:
- uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ name = "conserve"
readme = "README.md"
repository = "https://github.com/sourcefrog/conserve/"
version = "24.8.0"
rust-version = "1.78"
rust-version = "1.79"

[features]
s3 = [
Expand Down
33 changes: 15 additions & 18 deletions src/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use tracing::{debug, warn};

use crate::jsonio::{read_json, write_json};
use crate::monitor::Monitor;
use crate::transport::local::LocalTransport;
use crate::transport::Transport;
use crate::*;

const HEADER_FILENAME: &str = "CONSERVE";
Expand All @@ -39,7 +39,7 @@ pub struct Archive {
pub(crate) block_dir: Arc<BlockDir>,

/// Transport to the root directory of the archive.
transport: Arc<dyn Transport>,
transport: Transport,
}

#[derive(Debug, Serialize, Deserialize)]
Expand All @@ -56,24 +56,21 @@ pub struct DeleteOptions {
impl Archive {
/// Make a new archive in a local directory.
pub fn create_path(path: &Path) -> Result<Archive> {
Archive::create(Arc::new(LocalTransport::new(path)))
Archive::create(Transport::local(path))
}

/// Make a new archive in a new directory accessed by a Transport.
pub fn create(transport: Arc<dyn Transport>) -> Result<Archive> {
pub fn create(transport: Transport) -> Result<Archive> {
transport.create_dir("")?;
let names = transport.list_dir("")?;
if !names.files.is_empty() || !names.dirs.is_empty() {
return Err(Error::NewArchiveDirectoryNotEmpty);
}
let block_dir = Arc::new(BlockDir::create(transport.sub_transport(BLOCK_DIR))?);
write_json(
&transport,
HEADER_FILENAME,
&ArchiveHeader {
conserve_archive_version: String::from(ARCHIVE_VERSION),
},
)?;
let block_dir = Arc::new(BlockDir::create(transport.chdir(BLOCK_DIR))?);
let header = ArchiveHeader {
conserve_archive_version: String::from(ARCHIVE_VERSION),
};
write_json(&transport, HEADER_FILENAME, &header)?;
Ok(Archive {
block_dir,
transport,
Expand All @@ -84,18 +81,18 @@ impl Archive {
///
/// Checks that the header is correct.
pub fn open_path(path: &Path) -> Result<Archive> {
Archive::open(Arc::new(LocalTransport::new(path)))
Archive::open(Transport::local(path))
}

pub fn open(transport: Arc<dyn Transport>) -> Result<Archive> {
pub fn open(transport: Transport) -> Result<Archive> {
let header: ArchiveHeader =
read_json(&transport, HEADER_FILENAME)?.ok_or(Error::NotAnArchive)?;
if header.conserve_archive_version != ARCHIVE_VERSION {
return Err(Error::UnsupportedArchiveVersion {
version: header.conserve_archive_version,
});
}
let block_dir = Arc::new(BlockDir::open(transport.sub_transport(BLOCK_DIR)));
let block_dir = Arc::new(BlockDir::open(transport.chdir(BLOCK_DIR)));
debug!(?header, "Opened archive");
Ok(Archive {
block_dir,
Expand All @@ -108,7 +105,7 @@ impl Archive {
}

pub fn band_exists(&self, band_id: BandId) -> Result<bool> {
self.transport
self.transport()
.is_file(&format!("{}/{}", band_id, crate::BAND_HEAD_FILENAME))
.map_err(Error::from)
}
Expand Down Expand Up @@ -138,8 +135,8 @@ impl Archive {
Ok(band_ids)
}

pub(crate) fn transport(&self) -> &dyn Transport {
self.transport.as_ref()
pub(crate) fn transport(&self) -> &Transport {
&self.transport
}

pub fn resolve_band_id(&self, band_selection: BandSelectionPolicy) -> Result<BandId> {
Expand Down
11 changes: 6 additions & 5 deletions src/band.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
use std::borrow::Cow;
use std::sync::Arc;

use crate::transport::Transport;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
Expand Down Expand Up @@ -75,7 +76,7 @@ pub struct Band {
band_id: BandId,

/// Transport pointing to the archive directory.
transport: Arc<dyn Transport>,
transport: Transport,

/// Deserialized band head info.
head: Head,
Expand Down Expand Up @@ -143,7 +144,7 @@ impl Band {
let band_id = archive
.last_band_id()?
.map_or_else(BandId::zero, |b| b.next_sibling());
let transport = archive.transport().sub_transport(&band_id.to_string());
let transport = archive.transport().chdir(&band_id.to_string());
transport.create_dir("")?;
transport.create_dir(INDEX_DIR)?;
let band_format_version = if format_flags.is_empty() {
Expand Down Expand Up @@ -179,7 +180,7 @@ impl Band {

/// Open the band with the given id.
pub fn open(archive: &Archive, band_id: BandId) -> Result<Band> {
let transport = archive.transport().sub_transport(&band_id.to_string());
let transport = archive.transport().chdir(&band_id.to_string());
let head: Head =
read_json(&transport, BAND_HEAD_FILENAME)?.ok_or(Error::BandHeadMissing { band_id })?;
if let Some(version) = &head.band_format_version {
Expand Down Expand Up @@ -250,12 +251,12 @@ impl Band {
}

pub fn index_builder(&self) -> IndexWriter {
IndexWriter::new(self.transport.sub_transport(INDEX_DIR))
IndexWriter::new(self.transport.chdir(INDEX_DIR))
}

/// Get read-only access to the index of this band.
pub fn index(&self) -> IndexRead {
IndexRead::open(self.transport.sub_transport(INDEX_DIR))
IndexRead::open(self.transport.chdir(INDEX_DIR))
}

/// Return info about the state of this band.
Expand Down
23 changes: 12 additions & 11 deletions src/bin/conserve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use time::UtcOffset;
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn, Level};

use crate::transport::Transport;
use conserve::termui::{enable_tracing, TermUiMonitor, TraceTimeStyle};
use conserve::*;

Expand Down Expand Up @@ -327,7 +328,7 @@ impl Command {
..Default::default()
};
let stats = backup(
&Archive::open(open_transport(archive)?)?,
&Archive::open(Transport::new(archive)?)?,
source,
&options,
monitor,
Expand All @@ -338,7 +339,7 @@ impl Command {
}
Command::Debug(Debug::Blocks { archive }) => {
let mut bw = BufWriter::new(stdout);
for hash in Archive::open(open_transport(archive)?)?
for hash in Archive::open(Transport::new(archive)?)?
.block_dir()
.blocks(monitor)?
.collect::<Vec<BlockHash>>()
Expand All @@ -353,15 +354,15 @@ impl Command {
}
Command::Debug(Debug::Referenced { archive }) => {
let mut bw = BufWriter::new(stdout);
let archive = Archive::open(open_transport(archive)?)?;
let archive = Archive::open(Transport::new(archive)?)?;
for hash in archive.referenced_blocks(&archive.list_band_ids()?, monitor)? {
writeln!(bw, "{hash}")?;
}
}
Command::Debug(Debug::Unreferenced { archive }) => {
print!(
"{}",
Archive::open(open_transport(archive)?)?
Archive::open(Transport::new(archive)?)?
.unreferenced_blocks(monitor)?
.map(|hash| format!("{}\n", hash))
.collect::<Vec<String>>()
Expand All @@ -375,7 +376,7 @@ impl Command {
break_lock,
no_stats,
} => {
let stats = Archive::open(open_transport(archive)?)?.delete_bands(
let stats = Archive::open(Transport::new(archive)?)?.delete_bands(
backup,
&DeleteOptions {
dry_run: *dry_run,
Expand Down Expand Up @@ -418,7 +419,7 @@ impl Command {
break_lock,
no_stats,
} => {
let archive = Archive::open(open_transport(archive)?)?;
let archive = Archive::open(Transport::new(archive)?)?;
let stats = archive.delete_bands(
&[],
&DeleteOptions {
Expand All @@ -432,7 +433,7 @@ impl Command {
}
}
Command::Init { archive } => {
Archive::create(open_transport(archive)?)?;
Archive::create(Transport::new(archive)?)?;
debug!("Created new archive in {archive:?}");
}
Command::Ls {
Expand Down Expand Up @@ -481,7 +482,7 @@ impl Command {
no_stats,
} => {
let band_selection = band_selection_policy_from_opt(backup);
let archive = Archive::open(open_transport(archive)?)?;
let archive = Archive::open(Transport::new(archive)?)?;
let _ = no_stats; // accepted but ignored; we never currently print stats
let options = RestoreOptions {
exclude: Exclude::from_patterns_and_files(exclude, exclude_from)?,
Expand Down Expand Up @@ -524,7 +525,7 @@ impl Command {
let options = ValidateOptions {
skip_block_hashes: *quick,
};
Archive::open(open_transport(archive)?)?.validate(&options, monitor.clone())?;
Archive::open(Transport::new(archive)?)?.validate(&options, monitor.clone())?;
if monitor.error_count() != 0 {
warn!("Archive has some problems.");
} else {
Expand All @@ -543,7 +544,7 @@ impl Command {
} else {
Some(*LOCAL_OFFSET.read().unwrap())
};
let archive = Archive::open(open_transport(archive)?)?;
let archive = Archive::open(Transport::new(archive)?)?;
let options = ShowVersionsOptions {
newest_first: *newest,
tree_size: *sizes,
Expand All @@ -559,7 +560,7 @@ impl Command {
}

fn stored_tree_from_opt(archive_location: &str, backup: &Option<BandId>) -> Result<StoredTree> {
let archive = Archive::open(open_transport(archive_location)?)?;
let archive = Archive::open(Transport::new(archive_location)?)?;
let policy = band_selection_policy_from_opt(backup);
archive.open_stored_tree(policy)
}
Expand Down
21 changes: 10 additions & 11 deletions src/blockdir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use tracing::{instrument, trace};
use crate::compress::snappy::{Compressor, Decompressor};
use crate::counters::Counter;
use crate::monitor::Monitor;
use crate::transport::ListDir;
use crate::transport::{ListDir, Transport};
use crate::*;

// const BLOCKDIR_FILE_NAME_LEN: usize = crate::BLAKE_HASH_SIZE_BYTES * 2;
Expand Down Expand Up @@ -65,7 +65,7 @@ pub struct Address {
/// A readable, writable directory within a band holding data blocks.
#[derive(Debug)]
pub struct BlockDir {
transport: Arc<dyn Transport>,
transport: Transport,
pub stats: BlockDirStats,
// TODO: There are fancier caches and they might help, but this one works, and Stretto did not work for me.
cache: RwLock<LruCache<BlockHash, Bytes>>,
Expand All @@ -85,7 +85,7 @@ pub fn block_relpath(hash: &BlockHash) -> String {
}

impl BlockDir {
pub fn open(transport: Arc<dyn Transport>) -> BlockDir {
pub fn open(transport: Transport) -> BlockDir {
/// Cache this many blocks in memory.
// TODO: Change to a cache that tracks the size of stored blocks?
// As a safe conservative value, 100 blocks of 20MB each would be 2GB.
Expand All @@ -102,7 +102,7 @@ impl BlockDir {
}
}

pub fn create(transport: Arc<dyn Transport>) -> Result<BlockDir> {
pub fn create(transport: Transport) -> Result<BlockDir> {
transport.create_dir("")?;
Ok(BlockDir::open(transport))
}
Expand Down Expand Up @@ -343,7 +343,6 @@ mod test {
use tempfile::TempDir;

use crate::monitor::test::TestMonitor;
use crate::transport::open_local_transport;

use super::*;

Expand All @@ -353,7 +352,7 @@ mod test {
// file with 0 bytes. It's not valid compressed data. We just treat
// the block as not present at all.
let tempdir = TempDir::new().unwrap();
let blockdir = BlockDir::open(open_local_transport(tempdir.path()).unwrap());
let blockdir = BlockDir::open(Transport::local(tempdir.path()));
let mut stats = BackupStats::default();
let monitor = TestMonitor::arc();
let hash = blockdir
Expand All @@ -367,7 +366,7 @@ mod test {
assert_eq!(monitor.get_counter(Counter::BlockExistenceCacheHit), 1); // Since we just wrote it, we know it's there.

// Open again to get a fresh cache
let blockdir = BlockDir::open(open_local_transport(tempdir.path()).unwrap());
let blockdir = BlockDir::open(Transport::local(tempdir.path()));
let monitor = TestMonitor::arc();
OpenOptions::new()
.write(true)
Expand All @@ -383,7 +382,7 @@ mod test {
#[test]
fn temp_files_are_not_returned_as_blocks() {
let tempdir = TempDir::new().unwrap();
let blockdir = BlockDir::open(open_local_transport(tempdir.path()).unwrap());
let blockdir = BlockDir::open(Transport::local(tempdir.path()));
let monitor = TestMonitor::arc();
let subdir = tempdir.path().join(subdir_relpath("123"));
create_dir(&subdir).unwrap();
Expand All @@ -402,7 +401,7 @@ mod test {
#[test]
fn cache_hit() {
let tempdir = TempDir::new().unwrap();
let blockdir = BlockDir::open(open_local_transport(tempdir.path()).unwrap());
let blockdir = BlockDir::open(Transport::local(tempdir.path()));
let mut stats = BackupStats::default();
let content = Bytes::from("stuff");
let hash = blockdir
Expand Down Expand Up @@ -432,7 +431,7 @@ mod test {
#[test]
fn existence_cache_hit() {
let tempdir = TempDir::new().unwrap();
let blockdir = BlockDir::open(open_local_transport(tempdir.path()).unwrap());
let blockdir = BlockDir::open(Transport::local(tempdir.path()));
let mut stats = BackupStats::default();
let content = Bytes::from("stuff");
let monitor = TestMonitor::arc();
Expand All @@ -442,7 +441,7 @@ mod test {

// reopen
let monitor = TestMonitor::arc();
let blockdir = BlockDir::open(open_local_transport(tempdir.path()).unwrap());
let blockdir = BlockDir::open(Transport::local(tempdir.path()));
assert!(blockdir.contains(&hash, monitor.clone()).unwrap());
assert_eq!(blockdir.stats.cache_hit.load(Relaxed), 0);
assert_eq!(monitor.get_counter(Counter::BlockExistenceCacheHit), 0);
Expand Down
Loading

0 comments on commit 2d631fb

Please sign in to comment.