Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions crates/spfs-cli/cmd-render/src/cmd_render.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@

use clap::Parser;
use clap::builder::TypedValueParser;
use miette::{Context, Result};
use miette::{Context, IntoDiagnostic, Result};
use spfs::prelude::*;
use spfs::storage::fallback::FallbackProxy;
use spfs::storage::fs::{MaybeRenderStore, RenderStore};
use spfs::{Error, RenderResult, graph};
use spfs_cli_common as cli;
use spfs_cli_common::CommandName;
Expand Down Expand Up @@ -74,7 +75,11 @@ impl CmdRender {

let rendered = match &self.target {
Some(target) => self.render_to_dir(fallback, env_spec, target).await?,
None => self.render_to_repo(fallback, env_spec).await?,
None => {
// This path requires a repository that supports renders.
let fallback: FallbackProxy<RenderStore> = fallback.try_into().into_diagnostic()?;
self.render_to_repo(fallback, env_spec).await?
}
};

tracing::debug!("render(s) completed successfully");
Expand All @@ -85,7 +90,7 @@ impl CmdRender {

async fn render_to_dir(
&self,
repo: FallbackProxy,
repo: FallbackProxy<MaybeRenderStore>,
env_spec: spfs::tracking::EnvSpec,
target: &std::path::Path,
) -> Result<RenderResult> {
Expand Down Expand Up @@ -134,7 +139,7 @@ impl CmdRender {

async fn render_to_repo(
&self,
repo: FallbackProxy,
repo: FallbackProxy<RenderStore>,
env_spec: spfs::tracking::EnvSpec,
) -> Result<RenderResult> {
let mut stack = graph::Stack::default();
Expand Down
4 changes: 2 additions & 2 deletions crates/spfs-cli/common/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use miette::{Error, IntoDiagnostic, Result, WrapErr};
#[cfg(feature = "sentry")]
use once_cell::sync::OnceCell;
use spfs::io::Pluralize;
use spfs::storage::LocalRepository;
use spfs::storage::LocalPayloads;
use tracing_subscriber::prelude::*;

const SPFS_LOG: &str = "SPFS_LOG";
Expand Down Expand Up @@ -138,7 +138,7 @@ impl Render {
reporter: Reporter,
) -> spfs::storage::fs::Renderer<'repo, Repo, Reporter>
where
Repo: spfs::storage::Repository + LocalRepository,
Repo: spfs::storage::Repository + LocalPayloads,
Reporter: spfs::storage::fs::RenderReporter,
{
spfs::storage::fs::Renderer::new(repo)
Expand Down
3 changes: 2 additions & 1 deletion crates/spfs-cli/main/src/cmd_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::path::PathBuf;

use clap::{Args, Subcommand};
use miette::Result;
use spfs::storage::fs::NoRenderStore;

/// Create an empty filesystem repository
#[derive(Debug, Args)]
Expand Down Expand Up @@ -36,7 +37,7 @@ impl InitSubcommand {
pub async fn run(&self, _config: &spfs::Config) -> Result<i32> {
match self {
Self::Repo { path } => {
spfs::storage::fs::MaybeOpenFsRepository::create(&path).await?;
spfs::storage::fs::MaybeOpenFsRepository::<NoRenderStore>::create(&path).await?;
Ok(0)
}
}
Expand Down
6 changes: 5 additions & 1 deletion crates/spfs-cli/main/src/cmd_search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use clap::Args;
use miette::Result;
use spfs::prelude::*;
use spfs::storage::fs::NoRenderStore;
use tokio_stream::StreamExt;

/// Search for available tags by substring
Expand All @@ -29,7 +30,10 @@ impl CmdSearch {
};
repos.push(remote);
}
repos.insert(0, config.get_local_repository().await?.into());
repos.insert(
0,
config.get_local_repository::<NoRenderStore>().await?.into(),
);
for repo in repos.into_iter() {
let mut tag_streams = repo.iter_tags();
while let Some(tag) = tag_streams.next().await {
Expand Down
23 changes: 19 additions & 4 deletions crates/spfs-vfs/src/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use fuser::{
};
use spfs::OsError;
use spfs::prelude::*;
use spfs::storage::LocalRepository;
use spfs::storage::LocalPayloads;
#[cfg(feature = "fuse-backend-abi-7-31")]
use spfs::tracking::BlobRead;
use spfs::tracking::{Entry, EntryKind, EnvSpec, Manifest};
Expand Down Expand Up @@ -376,9 +376,13 @@ impl Filesystem {
#[allow(unused_mut)]
let mut flags = FOPEN_KEEP_CACHE;
for repo in self.repos.iter() {
match &**repo {
spfs::storage::RepositoryHandle::FS(fs_repo) => {
let Ok(fs_repo) = fs_repo.opened().await else {
// XXX: Using a macro here for an easy fix but it would be nicer
// if there was a way to borrow the RepositoryHandle as a
// `&MaybeOpenFsRepository<NoRenderStore>` since this code
// doesn't need to access renders.
macro_rules! read_fs {
($fs_repo:ident) => {
let Ok(fs_repo) = $fs_repo.opened().await else {
reply.error(libc::ENOENT);
return;
};
Expand All @@ -393,6 +397,17 @@ impl Filesystem {
}
Err(err) => err!(reply, err),
}
};
}
match &**repo {
spfs::storage::RepositoryHandle::FSWithMaybeRenders(fs_repo) => {
read_fs!(fs_repo);
}
spfs::storage::RepositoryHandle::FSWithRenders(fs_repo) => {
read_fs!(fs_repo);
}
spfs::storage::RepositoryHandle::FSWithoutRenders(fs_repo) => {
read_fs!(fs_repo);
}
#[cfg(feature = "fuse-backend-abi-7-31")]
repo => match repo.open_payload(*digest).await {
Expand Down
23 changes: 19 additions & 4 deletions crates/spfs-vfs/src/winfsp/mount.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use dashmap::DashMap;
use libc::c_void;
use spfs::OsError;
use spfs::prelude::*;
use spfs::storage::LocalRepository;
use spfs::storage::LocalPayloads;
use spfs::tracking::{Entry, EntryKind};
use tokio::io::AsyncReadExt;
use windows::Win32::Foundation::{ERROR_SEEK_ON_DEVICE, STATUS_NOT_A_DIRECTORY};
Expand Down Expand Up @@ -281,9 +281,13 @@ impl winfsp::filesystem::FileSystemContext for Mount {
let digest = entry.object;
self.rt.spawn(async move {
for repo in repos.into_iter() {
match &*repo {
spfs::storage::RepositoryHandle::FS(fs_repo) => {
let Ok(fs_repo) = fs_repo.opened().await else {
// XXX: Using a macro here for an easy fix but it would be nicer
// if there was a way to borrow the RepositoryHandle as a
// `&MaybeOpenFsRepository<NoRenderStore>` since this code
// doesn't need to access renders.
macro_rules! read_fs {
($fs_repo:ident) => {
let Ok(fs_repo) = $fs_repo.opened().await else {
let _ =
send.send(Err(winfsp::FspError::IO(std::io::ErrorKind::NotFound)));
return;
Expand All @@ -299,6 +303,17 @@ impl winfsp::filesystem::FileSystemContext for Mount {
}
Err(err) => err!(send, err),
}
};
}
match &*repo {
spfs::storage::RepositoryHandle::FSWithMaybeRenders(fs_repo) => {
read_fs!(fs_repo);
}
spfs::storage::RepositoryHandle::FSWithRenders(fs_repo) => {
read_fs!(fs_repo);
}
spfs::storage::RepositoryHandle::FSWithoutRenders(fs_repo) => {
read_fs!(fs_repo);
}
repo => match repo.open_payload(digest).await {
Ok((stream, _)) => {
Expand Down
9 changes: 6 additions & 3 deletions crates/spfs/benches/spfs_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::time::Duration;

use criterion::{Criterion, Throughput, criterion_group, criterion_main};
use spfs::prelude::*;
use spfs::storage::fs::NoRenderStore;

pub fn commit_benchmark(c: &mut Criterion) {
const NUM_FILES: usize = 1024;
Expand Down Expand Up @@ -44,9 +45,11 @@ pub fn commit_benchmark(c: &mut Criterion) {
.expect("create a temp directory for spfs repo");
let repo: Arc<RepositoryHandle> = Arc::new(
tokio_runtime
.block_on(spfs::storage::fs::MaybeOpenFsRepository::create(
repo_path.path().join("repo"),
))
.block_on(
spfs::storage::fs::MaybeOpenFsRepository::<NoRenderStore>::create(
repo_path.path().join("repo"),
),
)
.expect("create spfs repo")
.into(),
);
Expand Down
5 changes: 3 additions & 2 deletions crates/spfs/src/bootstrap_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use super::build_shell_initialized_command;
use crate::fixtures::*;
use crate::resolve::which;
use crate::runtime;
use crate::storage::fs::RenderStore;

#[rstest]
#[case::bash("bash", "test.sh", "echo hi; export TEST_VALUE='spfs-test-value'")]
Expand All @@ -33,7 +34,7 @@ async fn test_shell_initialization_startup_scripts(
};
let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::MaybeOpenFsRepository::create(&root)
crate::storage::fs::MaybeOpenFsRepository::<RenderStore>::create(&root)
.await
.unwrap(),
);
Expand Down Expand Up @@ -118,7 +119,7 @@ async fn test_shell_initialization_no_startup_scripts(
};
let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::MaybeOpenFsRepository::create(&root)
crate::storage::fs::MaybeOpenFsRepository::<RenderStore>::create(&root)
.await
.unwrap(),
);
Expand Down
66 changes: 52 additions & 14 deletions crates/spfs/src/clean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::future::ready;
use std::num::NonZero;
#[cfg(unix)]
use std::os::linux::fs::MetadataExt;
use std::path::Path;

use chrono::{DateTime, Duration, Local, Utc};
use colored::Colorize;
Expand All @@ -20,7 +21,7 @@ use super::prune::PruneParameters;
use crate::io::Pluralize;
use crate::prelude::*;
use crate::runtime::makedirs_with_perms;
use crate::storage::fs::FsRepositoryOps;
use crate::storage::fs::{FsRepositoryOps, MaybeOpenFsRepository};
use crate::storage::{TagNamespace, TagNamespaceBuf};
use crate::{Digest, Error, Result, encoding, graph, storage, tracking};

Expand Down Expand Up @@ -664,11 +665,19 @@ where
/// This function should only be called once the discovery of all attached
/// objects has completed successfully and with no errors. Otherwise, it may
/// remove data that is still being used
async unsafe fn remove_unvisited_renders_and_proxies(&self) -> Result<CleanResult> {
async unsafe fn remove_unvisited_renders_and_proxies_on_repo<RS>(
&self,
repo: &MaybeOpenFsRepository<RS>,
) -> Result<CleanResult>
where
RS: storage::DefaultRenderStoreCreationPolicy
+ storage::RenderStoreForUser<RenderStore = RS>
+ storage::TryRenderStore
+ Send
+ Sync
+ 'static,
{
let mut result = CleanResult::default();
let storage::RepositoryHandle::FS(repo) = self.repo else {
return Ok(result);
};
let repo = repo.opened().await?;

result += match self
Expand All @@ -689,11 +698,16 @@ where
// therefore failing the whole clean attempt before any work is
// performed. The missing proxy directory is likely a symptom of
// some other problem elsewhere.
if !sub_repo.has_renders() {
#[cfg(feature = "sentry")]
tracing::error!(target: "sentry", %username, "Skipping clean of user's renders (NoRenderStorage)");
continue;
}
//
// TODO: simulate this scenario in a test and make it not error if
// the proxy directory is missing; still want to clean any renders
// belonging to the user.
//
//if !sub_repo.has_renders() {
// #[cfg(feature = "sentry")]
// tracing::error!(target: "sentry", %username, "Skipping clean of user's renders (NoRenderStorage)");
// continue;
//}

result += self
.remove_unvisited_renders_and_proxies_for_storage(Some(username.clone()), sub_repo)
Expand All @@ -702,6 +716,28 @@ where
Ok(result)
}

/// # Safety
/// This function should only be called once the discovery of all attached
/// objects has completed successfully and with no errors. Otherwise, it may
/// remove data that is still being used
async unsafe fn remove_unvisited_renders_and_proxies(&self) -> Result<CleanResult> {
match self.repo {
storage::RepositoryHandle::FSWithMaybeRenders(repo) => unsafe {
// Convert this repo into one that will not create renders
// on demand. We only want to clean existing renders.
let repo = repo.clone().without_render_creation();

self.remove_unvisited_renders_and_proxies_on_repo(&repo)
.await
},
storage::RepositoryHandle::FSWithRenders(repo) => unsafe {
self.remove_unvisited_renders_and_proxies_on_repo(repo)
.await
},
_ => Ok(CleanResult::default()),
}
}

async fn remove_unvisited_renders_and_proxies_for_storage(
&self,
username: Option<String>,
Expand Down Expand Up @@ -744,7 +780,7 @@ where
drop(stream);

if let Some(proxy_path) = repo.proxy_path() {
result += self.clean_proxies(username, proxy_path.to_owned()).await?;
result += self.clean_proxies(username, &proxy_path).await?;
}
Ok(result)
}
Expand All @@ -754,7 +790,7 @@ where
async fn clean_proxies(
&self,
username: Option<String>,
proxy_path: std::path::PathBuf,
proxy_path: &Path,
) -> Result<CleanResult> {
let mut result = CleanResult::default();
let removed = result.removed_proxies.entry(username).or_default();
Expand Down Expand Up @@ -811,8 +847,10 @@ where
let future = async move {
if !self.dry_run {
tracing::trace!(?path, "removing proxy render");
storage::fs::OpenFsRepository::remove_dir_atomically(&path, &workdir)
.await?;
storage::fs::OpenFsRepository::<storage::fs::RenderStore>::remove_dir_atomically(
&path, &workdir,
)
.await?;
}
Ok(digest)
};
Expand Down
Loading
Loading