From ab28ed70305c61d97dfa28436afb4b0e747383a4 Mon Sep 17 00:00:00 2001 From: 0x5459 <0x5459@protonmail.com> Date: Thu, 31 Aug 2023 17:53:45 +0800 Subject: [PATCH] feat(store): store minor refactor --- damocles-manager/core/api.go | 2 + damocles-manager/core/client_gen.go | 4 + damocles-manager/go.mod | 2 +- damocles-manager/go.sum | 4 +- damocles-manager/modules/impl/mock/sealer.go | 40 ++++++--- damocles-manager/modules/sealer/sealer.go | 12 +++ damocles-manager/modules/sealer/sealer_cli.go | 10 ++- .../pkg/objstore/filestore/store.go | 27 ++++-- damocles-manager/pkg/objstore/mock_store.go | 4 +- .../src/infra/objstore/filestore.rs | 69 ++++---------- damocles-worker/src/infra/objstore/mod.rs | 6 +- damocles-worker/src/rpc/sealer/mod.rs | 3 + damocles-worker/src/run.rs | 20 +++-- damocles-worker/src/sealing/mod.rs | 3 +- .../src/sealing/sealing_thread/mod.rs | 2 +- .../sealing_thread/planner/common/sealing.rs | 16 ++-- .../sealing/sealing_thread/planner/snapup.rs | 32 ++++--- .../sealing/sealing_thread/planner/unseal.rs | 30 ++++--- .../sealing/sealing_thread/planner/wdpost.rs | 25 +++--- .../src/sealing/sealing_thread/util.rs | 5 +- .../src/builtin/processors/transfer/mod.rs | 45 ++++++---- .../src/builtin/processors/transfer/tests.rs | 90 ++++--------------- .../vc-processors/src/builtin/tasks.rs | 6 +- 23 files changed, 231 insertions(+), 226 deletions(-) diff --git a/damocles-manager/core/api.go b/damocles-manager/core/api.go index c0b528f94..ec2330f1b 100644 --- a/damocles-manager/core/api.go +++ b/damocles-manager/core/api.go @@ -83,6 +83,8 @@ type SealerAPI interface { AllocateUnsealSector(ctx context.Context, spec AllocateSectorSpec) (*SectorUnsealInfo, error) AchieveUnsealSector(ctx context.Context, sid abi.SectorID, pieceCid cid.Cid, errInfo string) (Meta, error) AcquireUnsealDest(ctx context.Context, sid abi.SectorID, pieceCid cid.Cid) ([]string, error) + + StoreUri(ctx context.Context, storeName string, resource string) (string, error) } type SealerCliAPI interface { diff --git a/damocles-manager/core/client_gen.go b/damocles-manager/core/client_gen.go index 0e3132545..9cc782899 100644 --- a/damocles-manager/core/client_gen.go +++ b/damocles-manager/core/client_gen.go @@ -38,6 +38,7 @@ type SealerAPIClient struct { AllocateUnsealSector func(ctx context.Context, spec AllocateSectorSpec) (*SectorUnsealInfo, error) AchieveUnsealSector func(ctx context.Context, sid abi.SectorID, pieceCid cid.Cid, errInfo string) (Meta, error) AcquireUnsealDest func(ctx context.Context, sid abi.SectorID, pieceCid cid.Cid) ([]string, error) + StoreUri func(ctx context.Context, storeName, resource []string) (map[string]string, error) } var UnavailableSealerAPIClient = SealerAPIClient{ @@ -108,6 +109,9 @@ var UnavailableSealerAPIClient = SealerAPIClient{ AcquireUnsealDest: func(ctx context.Context, sid abi.SectorID, pieceCid cid.Cid) ([]string, error) { panic("SealerAPI client unavailable") }, + StoreUri: func(ctx context.Context, storeName, resource []string) (map[string]string, error) { + panic("SealerAPI client unavailable") + }, } // SealerCliAPIClient is generated client for SealerCliAPI interface. diff --git a/damocles-manager/go.mod b/damocles-manager/go.mod index 5c29cb832..8131fc7bb 100644 --- a/damocles-manager/go.mod +++ b/damocles-manager/go.mod @@ -27,7 +27,7 @@ require ( github.com/golang/mock v1.6.0 github.com/hako/durafmt v0.0.0-20200710122514-c0fb7b4da026 github.com/hashicorp/go-multierror v1.1.1 - github.com/ipfs-force-community/damocles/manager-plugin v0.0.0-20230830062024-608c68ada10e + github.com/ipfs-force-community/damocles/manager-plugin v0.0.0-20230831091846-caeb4acea7d7 github.com/ipfs-force-community/venus-cluster-assets v0.1.0 github.com/ipfs/go-cid v0.3.2 github.com/ipfs/go-datastore v0.6.0 diff --git a/damocles-manager/go.sum b/damocles-manager/go.sum index 67a5e7780..5060eec10 100644 --- a/damocles-manager/go.sum +++ b/damocles-manager/go.sum @@ -589,8 +589,8 @@ github.com/icza/mighty v0.0.0-20180919140131-cfd07d671de6 h1:8UsGZ2rr2ksmEru6lTo github.com/icza/mighty v0.0.0-20180919140131-cfd07d671de6/go.mod h1:xQig96I1VNBDIWGCdTt54nHt6EeI639SmHycLYL7FkA= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= -github.com/ipfs-force-community/damocles/manager-plugin v0.0.0-20230830062024-608c68ada10e h1:SEmUD7xCpHWlnTrZdyj++RExTy0T6bqX4yS/iXBHVAg= -github.com/ipfs-force-community/damocles/manager-plugin v0.0.0-20230830062024-608c68ada10e/go.mod h1:me1u2cl7qdxBCZiVL0laDop8uBHDdUwlUNnQ7KkHF64= +github.com/ipfs-force-community/damocles/manager-plugin v0.0.0-20230831091846-caeb4acea7d7 h1:tgh0JH97wqkDaFu8TKAqi7eQucm0gyPG0f4uaj9rLWM= +github.com/ipfs-force-community/damocles/manager-plugin v0.0.0-20230831091846-caeb4acea7d7/go.mod h1:me1u2cl7qdxBCZiVL0laDop8uBHDdUwlUNnQ7KkHF64= github.com/ipfs-force-community/go-jsonrpc v0.1.7-0.20230220074347-8db78dbc20d4 h1:iu/3irYevdNpdc0B/gRi1vuS3+lRn+6Ro9G0FeBiAfE= github.com/ipfs-force-community/go-jsonrpc v0.1.7-0.20230220074347-8db78dbc20d4/go.mod h1:jBSvPTl8V1N7gSTuCR4bis8wnQnIjHbRPpROol6iQKM= github.com/ipfs-force-community/venus-cluster-assets v0.1.0 h1:K/0+OV9Jm7HjSa7O9MAtgfLDIudQYZUTymhJsp8rGXg= diff --git a/damocles-manager/modules/impl/mock/sealer.go b/damocles-manager/modules/impl/mock/sealer.go index c0c059d9a..a9d11cec7 100644 --- a/damocles-manager/modules/impl/mock/sealer.go +++ b/damocles-manager/modules/impl/mock/sealer.go @@ -18,6 +18,7 @@ import ( "github.com/ipfs-force-community/damocles/damocles-manager/core" "github.com/ipfs-force-community/damocles/damocles-manager/modules" chainAPI "github.com/ipfs-force-community/damocles/damocles-manager/pkg/chain" + "github.com/ipfs-force-community/damocles/damocles-manager/pkg/objstore" "github.com/ipfs-force-community/damocles/damocles-manager/ver" ) @@ -25,24 +26,27 @@ var _ core.SealerAPI = (*Sealer)(nil) func NewSealer(rand core.RandomnessAPI, sector core.SectorManager, deal core.DealManager, commit core.CommitmentManager, api chainAPI.API, scfg modules.SafeConfig, + persistedStoreManager objstore.Manager, ) (*Sealer, error) { return &Sealer{ - rand: rand, - sector: sector, - deal: deal, - commit: commit, - api: api, - scfg: scfg, + rand: rand, + sector: sector, + deal: deal, + commit: commit, + api: api, + scfg: scfg, + persistedStoreManager: persistedStoreManager, }, nil } type Sealer struct { - rand core.RandomnessAPI - sector core.SectorManager - deal core.DealManager - commit core.CommitmentManager - api chainAPI.API - scfg modules.SafeConfig + rand core.RandomnessAPI + sector core.SectorManager + deal core.DealManager + commit core.CommitmentManager + api chainAPI.API + scfg modules.SafeConfig + persistedStoreManager objstore.Manager } func (s *Sealer) AllocateSector(ctx context.Context, spec core.AllocateSectorSpec) (*core.AllocatedSector, error) { @@ -315,3 +319,15 @@ func (s *Sealer) AcquireUnsealDest(ctx context.Context, sid abi.SectorID, pieceC func (s *Sealer) Version(context.Context) (string, error) { return ver.VersionStr(), nil } + +func (s *Sealer) StoreUri(ctx context.Context, storeName string, resource string) (string, error) { + store, err := s.persistedStoreManager.GetInstance(ctx, storeName) + if err != nil { + return "", err + } + uri, err := store.Uri(ctx, resource) + if err != nil { + return "", fmt.Errorf("get uri(%s): %w", resource, err) + } + return uri, nil +} diff --git a/damocles-manager/modules/sealer/sealer.go b/damocles-manager/modules/sealer/sealer.go index f52c581b0..6787dc4d9 100644 --- a/damocles-manager/modules/sealer/sealer.go +++ b/damocles-manager/modules/sealer/sealer.go @@ -833,3 +833,15 @@ func (s *Sealer) AchieveUnsealSector(ctx context.Context, sid abi.SectorID, piec func (s *Sealer) AcquireUnsealDest(ctx context.Context, sid abi.SectorID, pieceCid cid.Cid) ([]string, error) { return s.unseal.AcquireDest(ctx, sid, pieceCid) } + +func (s *Sealer) StoreUri(ctx context.Context, storeName string, resource string) (string, error) { + store, err := s.sectorIdxer.StoreMgr().GetInstance(ctx, storeName) + if err != nil { + return "", err + } + uri, err := store.Uri(ctx, resource) + if err != nil { + return "", fmt.Errorf("get uri(%s): %w", resource, err) + } + return uri, nil +} diff --git a/damocles-manager/modules/sealer/sealer_cli.go b/damocles-manager/modules/sealer/sealer_cli.go index 360d1318e..c452accd7 100644 --- a/damocles-manager/modules/sealer/sealer_cli.go +++ b/damocles-manager/modules/sealer/sealer_cli.go @@ -258,13 +258,19 @@ func (s *Sealer) RemoveSector(ctx context.Context, sid abi.SectorID) error { sealed = util.SectorPath(util.SectorPathTypeSealed, state.ID) } - cachePath := cacheDir.FullPath(ctx, cache) + cachePath, err := cacheDir.Uri(ctx, cache) + if err != nil { + return fmt.Errorf("get uri(%s): %w", cache, err) + } err = os.RemoveAll(cachePath) if err != nil { return fmt.Errorf("remove cache: %w", err) } - sealedPath := sealedFile.FullPath(ctx, sealed) + sealedPath, err := sealedFile.Uri(ctx, sealed) + if err != nil { + return fmt.Errorf("get uri(%s): %w", sealed, err) + } err = os.Remove(sealedPath) if err != nil { return fmt.Errorf("remove sealed file: %w", err) diff --git a/damocles-manager/pkg/objstore/filestore/store.go b/damocles-manager/pkg/objstore/filestore/store.go index 7c2a3e734..c76f71a4b 100644 --- a/damocles-manager/pkg/objstore/filestore/store.go +++ b/damocles-manager/pkg/objstore/filestore/store.go @@ -215,7 +215,7 @@ func (s *Store) Del(ctx context.Context, p string) error { return objstore.ErrReadOnlyStore } - fpath, err := s.getAbsPath(p) + fpath, err := s.getAbsPath(ctx, p) if err != nil { return err } @@ -235,7 +235,14 @@ func (s *Store) Stat(ctx context.Context, p string) (objstore.Stat, error) { var res statOrErr - finfo, err := os.Stat(s.FullPath(ctx, p)) + p, err := s.getAbsPath(ctx, p) + if err != nil { + res.Err = fmt.Errorf("getAbsPath(%s): %w", p, err) + resCh <- res + return + } + + finfo, err := os.Stat(p) if err == nil { res.Stat.Size = finfo.Size() } else { @@ -254,14 +261,18 @@ func (s *Store) Stat(ctx context.Context, p string) (objstore.Stat, error) { } } -func (s *Store) getAbsPath(p string) (string, error) { - fpath, err := filepath.Abs(filepath.Join(s.cfg.Path, p)) +func (s *Store) getAbsPath(ctx context.Context, resourceName string) (string, error) { + uri, err := s.Uri(ctx, resourceName) + if err != nil { + return "", fmt.Errorf("get uri(%s): %w", resourceName, objstore.ErrInvalidObjectPath) + } + fpath, err := filepath.Abs(filepath.Join(s.cfg.Path, uri)) if err != nil { - return "", fmt.Errorf("obj %s: %w", p, objstore.ErrInvalidObjectPath) + return "", fmt.Errorf("obj %s: %w", uri, objstore.ErrInvalidObjectPath) } if !strings.HasPrefix(fpath, s.cfg.Path) { - return "", fmt.Errorf("obj %s: %w: outside of the dir", p, objstore.ErrInvalidObjectPath) + return "", fmt.Errorf("obj %s: %w: outside of the dir", uri, objstore.ErrInvalidObjectPath) } return fpath, nil @@ -287,6 +298,6 @@ func (s *Store) Put(ctx context.Context, p string, r io.Reader) (int64, error) { return io.Copy(file, r) } -func (s *Store) FullPath(ctx context.Context, sub string) string { - return filepath.Join(s.cfg.Path, sub) +func (s *Store) Uri(ctx context.Context, resourceName string) (string, error) { + return resourceName, nil } diff --git a/damocles-manager/pkg/objstore/mock_store.go b/damocles-manager/pkg/objstore/mock_store.go index a521498e9..0cfeebcae 100644 --- a/damocles-manager/pkg/objstore/mock_store.go +++ b/damocles-manager/pkg/objstore/mock_store.go @@ -144,6 +144,6 @@ func (ms *MockStore) Put(ctx context.Context, p string, r io.Reader) (int64, err return written, nil } -func (ms *MockStore) FullPath(ctx context.Context, p string) string { - return p +func (ms *MockStore) Uri(ctx context.Context, p string) (string, error) { + return p, nil } diff --git a/damocles-worker/src/infra/objstore/filestore.rs b/damocles-worker/src/infra/objstore/filestore.rs index 84b194968..974155663 100644 --- a/damocles-worker/src/infra/objstore/filestore.rs +++ b/damocles-worker/src/infra/objstore/filestore.rs @@ -1,21 +1,22 @@ //! ObjectStore implemented based on fs +use std::collections::HashMap; use std::fs::create_dir_all; use std::path::{Path, PathBuf, MAIN_SEPARATOR}; +use std::sync::Arc; use anyhow::{anyhow, Context, Result}; +use jsonrpc_core::futures_util::{FutureExt, TryFutureExt}; -use super::{ObjResult, ObjectStore}; -use crate::logging::trace; - -const LOG_TARGET: &str = "filestore"; +use super::{ObjResult, ObjectStore, ObjectStoreError}; +use crate::{logging::trace, rpc::sealer::SealerClient, sealing::call_rpc}; /// FileStore pub struct FileStore { - sep: String, local_path: PathBuf, instance: String, readonly: bool, + rpc: Arc, } impl FileStore { @@ -27,7 +28,7 @@ impl FileStore { } /// open the file store at given path - pub fn open>(p: P, ins: Option, readonly: bool) -> Result { + pub fn open>(p: P, ins: Option, readonly: bool, rpc: Arc) -> Result { let dir_path = p.as_ref().canonicalize().context("canonicalize dir path")?; if !dir_path.metadata().context("read dir metadata").map(|meta| meta.is_dir())? { return Err(anyhow!("base path of the file store should a dir")); @@ -39,32 +40,12 @@ impl FileStore { }; Ok(FileStore { - sep: MAIN_SEPARATOR.to_string(), local_path: dir_path, instance, readonly, + rpc, }) } - - fn path>(&self, sub: P) -> ObjResult { - let mut p = sub.as_ref(); - if p.starts_with(".") { - return Err(anyhow!("sub path starts with dot").into()); - } - - // try to strip the first any only the first sep - if let Ok(strip) = p.strip_prefix(&self.sep) { - p = strip; - } - - if p.starts_with(&self.sep) { - return Err(anyhow!("sub path starts with separator").into()); - } - - let res = self.local_path.join(p); - trace!(target: LOG_TARGET, ?res, "get full path"); - Ok(res) - } } impl ObjectStore for FileStore { @@ -72,34 +53,18 @@ impl ObjectStore for FileStore { self.instance.clone() } - fn uri(&self, rel: &Path) -> ObjResult { - self.path(rel) + fn uri(&self, resource_name: &str) -> ObjResult { + let uri = call_rpc! { + self.rpc=>store_uri( + self.instance(), + resource_name.to_string(), + ) + } + .map_err(|e| ObjectStoreError::Other(e.1))?; + Ok(self.local_path.join(uri).display().to_string()) } fn readonly(&self) -> bool { self.readonly } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_path() { - let fs = FileStore::open("/tmp", None, false).unwrap(); - assert_eq!(fs.path("/a/b").unwrap(), PathBuf::from("/tmp/a/b")); - assert_eq!(fs.path("a/b").unwrap(), PathBuf::from("/tmp/a/b")); - assert_eq!(fs.path("a/b/").unwrap(), PathBuf::from("/tmp/a/b")); - assert_eq!(fs.path("/a/b/").unwrap(), PathBuf::from("/tmp/a/b")); - } - - #[test] - fn test_store_uri() { - let fs = FileStore::open("/tmp/", Some("test_store".to_string()), false).unwrap(); - assert_eq!(fs.uri(Path::new("a/b")).unwrap(), PathBuf::from("/tmp/a/b")); - assert_eq!(fs.uri(Path::new("/a/b")).unwrap(), PathBuf::from("/tmp/a/b")); - assert_eq!(fs.uri(Path::new("a/b/")).unwrap(), PathBuf::from("/tmp/a/b")); - assert_eq!(fs.uri(Path::new("/a/b/")).unwrap(), PathBuf::from("/tmp/a/b")); - } -} diff --git a/damocles-worker/src/infra/objstore/mod.rs b/damocles-worker/src/infra/objstore/mod.rs index af8a24683..787603e4c 100644 --- a/damocles-worker/src/infra/objstore/mod.rs +++ b/damocles-worker/src/infra/objstore/mod.rs @@ -1,5 +1,6 @@ //! abstractions & implementations for object store +use std::collections::HashMap; use std::error::Error; use std::fmt; use std::io; @@ -69,7 +70,10 @@ pub trait ObjectStore: Send + Sync { /// unique identifier of the given resource. /// for fs-like stores, this should return an abs path. /// for other stores, this may return a url, or path part of a url. - fn uri(&self, resource: &Path) -> ObjResult; + /// + /// the resource value looks like this: + /// - "cache/sc-02-data-tree-r-last.dat" + fn uri(&self, resource: &str) -> ObjResult; /// if this instance is read-only fn readonly(&self) -> bool; diff --git a/damocles-worker/src/rpc/sealer/mod.rs b/damocles-worker/src/rpc/sealer/mod.rs index 086264704..7bd8f18f0 100644 --- a/damocles-worker/src/rpc/sealer/mod.rs +++ b/damocles-worker/src/rpc/sealer/mod.rs @@ -537,4 +537,7 @@ pub trait Sealer { #[rpc(name = "Venus.WdPoStFinishJob")] fn wdpost_finish(&self, job_id: String, output: Option, error_reason: String) -> Result<()>; + + #[rpc(name = "Venus.StoreUri")] + fn store_uri(&self, store_name: String, resources: String) -> Result; } diff --git a/damocles-worker/src/run.rs b/damocles-worker/src/run.rs index f6921791a..82a1b4063 100644 --- a/damocles-worker/src/run.rs +++ b/damocles-worker/src/run.rs @@ -50,9 +50,11 @@ pub fn start_daemon(cfg_path: impl AsRef) -> Result<()> { "rpc dial info" ); - let rpc_client: SealerClient = runtime - .block_on(async { http::connect(&dial_addr).await }) - .map_err(|e| anyhow!("jsonrpc connect to {}: {:?}", &dial_addr, e))?; + let rpc_client: Arc = Arc::new( + runtime + .block_on(async { http::connect(&dial_addr).await }) + .map_err(|e| anyhow!("jsonrpc connect to {}: {:?}", &dial_addr, e))?, + ); let mut attached: Vec> = Vec::new(); let mut attached_writable = 0; @@ -62,6 +64,7 @@ pub fn start_daemon(cfg_path: impl AsRef) -> Result<()> { remote_cfg.location.clone(), remote_cfg.name.clone(), remote_cfg.readonly.unwrap_or(false), + rpc_client.clone(), ) .with_context(|| format!("open remote filestore {}", remote_cfg.location))?, ); @@ -76,8 +79,13 @@ pub fn start_daemon(cfg_path: impl AsRef) -> Result<()> { if let Some(attach_cfgs) = cfg.attached.as_ref() { for (sidx, scfg) in attach_cfgs.iter().enumerate() { let attached_store = Box::new( - FileStore::open(scfg.location.clone(), scfg.name.clone(), scfg.readonly.unwrap_or(false)) - .with_context(|| format!("open attached filestore #{}", sidx))?, + FileStore::open( + scfg.location.clone(), + scfg.name.clone(), + scfg.readonly.unwrap_or(false), + rpc_client.clone(), + ) + .with_context(|| format!("open attached filestore #{}", sidx))?, ); if !attached_store.readonly() { @@ -174,7 +182,7 @@ pub fn start_daemon(cfg_path: impl AsRef) -> Result<()> { let rt = Arc::new(runtime); let global = GlobalModules { - rpc: Arc::new(rpc_client), + rpc: rpc_client, attached: Arc::new(attached_mgr), processors: Arc::new(processors), static_tree_d, diff --git a/damocles-worker/src/sealing/mod.rs b/damocles-worker/src/sealing/mod.rs index 7362b31ab..920691088 100644 --- a/damocles-worker/src/sealing/mod.rs +++ b/damocles-worker/src/sealing/mod.rs @@ -11,4 +11,5 @@ pub mod util; mod config; mod sealing_thread; -pub(crate) use sealing_thread::{build_sealing_threads, CtrlProcessor}; +pub(crate) use failure::IntoFailure; +pub(crate) use sealing_thread::{build_sealing_threads, call_rpc, CtrlProcessor}; diff --git a/damocles-worker/src/sealing/sealing_thread/mod.rs b/damocles-worker/src/sealing/sealing_thread/mod.rs index bb0bbf2ee..98ab85ffe 100644 --- a/damocles-worker/src/sealing/sealing_thread/mod.rs +++ b/damocles-worker/src/sealing/sealing_thread/mod.rs @@ -23,7 +23,7 @@ pub mod entry; #[macro_use] mod util; -use util::*; +pub use util::*; mod ctrl; diff --git a/damocles-worker/src/sealing/sealing_thread/planner/common/sealing.rs b/damocles-worker/src/sealing/sealing_thread/planner/common/sealing.rs index 8c15df97c..a7521905b 100644 --- a/damocles-worker/src/sealing/sealing_thread/planner/common/sealing.rs +++ b/damocles-worker/src/sealing/sealing_thread/planner/common/sealing.rs @@ -485,20 +485,22 @@ pub(crate) fn persist_sector_files(task: &Task, cache_dir: Entry, sealed_file: E let transfer_routes = wanted .into_iter() .map(|p| { - let rel_path = p.rel(); + let rel_path = p + .rel() + .to_str() + .with_context(|| format!("invalid path: {}", p.rel().display())) + .crit()?; Ok(TransferRoute { // local - src: TransferItem { - store_name: None, - uri: p.full().to_owned(), - }, + src: TransferItem::Local(p.full().to_owned()), // persist store - dest: TransferItem { - store_name: Some(ins_name.clone()), + dest: TransferItem::Store { + store: ins_name.clone(), uri: persist_store .uri(rel_path) .with_context(|| format!("get uri for {:?}", rel_path)) .perm()?, + resource_name: rel_path.to_string(), }, opt: None, }) diff --git a/damocles-worker/src/sealing/sealing_thread/planner/snapup.rs b/damocles-worker/src/sealing/sealing_thread/planner/snapup.rs index f82448772..d66ebced2 100644 --- a/damocles-worker/src/sealing/sealing_thread/planner/snapup.rs +++ b/damocles-worker/src/sealing/sealing_thread/planner/snapup.rs @@ -195,7 +195,11 @@ impl<'t> SnapUp<'t> { // sealed file & persisted cache files should be accessed inside persist store let sealed_file = self.task.sealed_file(sector_id); sealed_file.prepare().perm()?; - let sealed_rel = sealed_file.rel(); + let sealed_rel = sealed_file + .rel() + .to_str() + .with_context(|| format!("invalid path: {}", sealed_file.rel().display())) + .crit()?; let cache_dir = self.task.cache_dir(sector_id); @@ -203,20 +207,22 @@ impl<'t> SnapUp<'t> { .into_iter() .map(|fname| { let cached_file = cache_dir.join(fname); - let cached_rel = cached_file.rel(); + let cached_rel = cached_file + .rel() + .to_str() + .with_context(|| format!("invalid cached_file path: {}", cached_file.rel().display())) + .crit()?; Ok(TransferRoute { - src: TransferItem { - store_name: Some(access_instance.clone()), + src: TransferItem::Store { + store: access_instance.clone(), uri: access_store .uri(cached_rel) .with_context(|| format!("get uri for cache dir {:?} in {}", cached_rel, access_instance)) .perm()?, + resource_name: cached_rel.to_string(), }, - dest: TransferItem { - store_name: None, - uri: cached_file.full().clone(), - }, + dest: TransferItem::Local(cached_file.full().clone()), opt: Some(TransferOption { is_dir: false, allow_link: true, @@ -226,17 +232,15 @@ impl<'t> SnapUp<'t> { .collect::, Failure>>()?; let mut transfer_routes = vec![TransferRoute { - src: TransferItem { - store_name: Some(access_instance.clone()), + src: TransferItem::Store { + store: access_instance.clone(), uri: access_store .uri(sealed_rel) .with_context(|| format!("get uri for sealed file {:?} in {}", sealed_rel, access_instance)) .perm()?, + resource_name: sealed_rel.to_string(), }, - dest: TransferItem { - store_name: None, - uri: sealed_file.full().clone(), - }, + dest: TransferItem::Local(sealed_file.full().clone()), opt: Some(TransferOption { is_dir: false, allow_link: true, diff --git a/damocles-worker/src/sealing/sealing_thread/planner/unseal.rs b/damocles-worker/src/sealing/sealing_thread/planner/unseal.rs index 08708707c..116e3d344 100644 --- a/damocles-worker/src/sealing/sealing_thread/planner/unseal.rs +++ b/damocles-worker/src/sealing/sealing_thread/planner/unseal.rs @@ -133,10 +133,18 @@ impl<'t> Unseal<'t> { .perm()?; let sealed_temp = self.task.sealed_file(sector_id); - let sealed_rel = sealed_temp.rel(); + let sealed_rel = sealed_temp + .rel() + .to_str() + .with_context(|| format!("invalid sealed temp path: {}", sealed_temp.rel().display())) + .crit()?; let cache_temp = self.task.cache_dir(sector_id); - let cache_rel = cache_temp.rel(); + let cache_rel = cache_temp + .rel() + .to_str() + .with_context(|| format!("invalid cache temp path: {}", cache_temp.rel().display())) + .crit()?; let sealed_path = instance .uri(sealed_rel) @@ -178,8 +186,8 @@ impl<'t> Unseal<'t> { sector_id, comm_d: unseal_info.comm_d, ticket: ticket.ticket.0, - cache_dir: cache_path, - sealed_file: sealed_path, + cache_dir: PathBuf::from(cache_path), + sealed_file: PathBuf::from(sealed_path), unsealed_output: piece_file.into(), offset: UnpaddedByteIndex(unseal_info.offset), num_bytes: UnpaddedBytesAmount(unseal_info.size), @@ -286,7 +294,7 @@ impl<'t> Unseal<'t> { if p.is_empty() { return Err(anyhow!("path not found in {}", raw_url)).perm(); } - let des_path = PathBuf::from(p); + let des_path = p; match access_instance { Some(ins_name) => { @@ -309,13 +317,11 @@ impl<'t> Unseal<'t> { .perm()?; let transfer_routes = vec![TransferRoute { - src: TransferItem { - store_name: None, - uri: piece_file.full().clone(), - }, - dest: TransferItem { - store_name: Some(ins_name.clone()), - uri: access_store.uri(&des_path).perm()?, + src: TransferItem::Local(piece_file.full().clone()), + dest: TransferItem::Store { + store: ins_name.clone(), + uri: access_store.uri(des_path).perm()?, + resource_name: des_path.to_string(), }, opt: None, }]; diff --git a/damocles-worker/src/sealing/sealing_thread/planner/wdpost.rs b/damocles-worker/src/sealing/sealing_thread/planner/wdpost.rs index 709cbfa98..fba413ad5 100644 --- a/damocles-worker/src/sealing/sealing_thread/planner/wdpost.rs +++ b/damocles-worker/src/sealing/sealing_thread/planner/wdpost.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::fmt::Display; +use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; @@ -392,28 +393,30 @@ impl WdPost<'_> { } else { paths::sealed_file(sector_id) }; - let sealed_path = instances[§or.accesses.sealed_file].uri(&sealed_file).with_context(|| { - format!( - "get uri for sealed file {} in {}", - sealed_file.display(), - sector.accesses.sealed_file - ) - })?; + let sealed_file = sealed_file + .to_str() + .with_context(|| format!("invalid sealed file path: {}", sealed_file.display()))?; + let sealed_path = instances[§or.accesses.sealed_file] + .uri(sealed_file) + .with_context(|| format!("get uri for sealed file {} in {}", sealed_file, sector.accesses.sealed_file))?; let cache_dir = if sector.upgrade { paths::update_cache_dir(sector_id) } else { paths::cache_dir(sector_id) }; + let cache_dir = cache_dir + .to_str() + .with_context(|| format!("invalid cache dir path: {}", cache_dir.display()))?; let cache_path = instances[§or.accesses.cache_dir] - .uri(&cache_dir) - .with_context(|| format!("get uri for cache file {} in {}", cache_dir.display(), sector.accesses.cache_dir))?; + .uri(cache_dir) + .with_context(|| format!("get uri for cache file {} in {}", cache_dir, sector.accesses.cache_dir))?; let sector_id = sector.sector_id; let replica = PoStReplicaInfo { sector_id, comm_r: sector.comm_r, - cache_dir: cache_path, - sealed_file: sealed_path, + cache_dir: PathBuf::from(cache_path), + sealed_file: PathBuf::from(sealed_path), }; Ok(replica) }) diff --git a/damocles-worker/src/sealing/sealing_thread/util.rs b/damocles-worker/src/sealing/sealing_thread/util.rs index 3a76d9279..7050f3c70 100644 --- a/damocles-worker/src/sealing/sealing_thread/util.rs +++ b/damocles-worker/src/sealing/sealing_thread/util.rs @@ -16,20 +16,21 @@ macro_rules! call_rpc { }; ($client:expr=>$method:ident($($arg:expr,)*)) => { { + use crate::sealing::IntoFailure; + call_rpc!(raw, $client=>$method($($arg,)*)).map_err(|e| { if let jsonrpc_core_client::RpcError::JsonRpcError(ref je) = e { if je.code == jsonrpc_core::types::error::ErrorCode::ServerError(crate::rpc::APIErrCode::SectorStateNotFound as i64) { return anyhow::anyhow!("from error code: sector state not found, with msg: {}", je.message).abort() } } - anyhow::anyhow!("rpc error: {}", e).temp() }) } }; } -pub(super) use call_rpc; +pub(crate) use call_rpc; macro_rules! field_required { ($name:ident, $ex:expr) => { diff --git a/damocles-worker/vc-processors/src/builtin/processors/transfer/mod.rs b/damocles-worker/vc-processors/src/builtin/processors/transfer/mod.rs index 07e97156e..be081d866 100644 --- a/damocles-worker/vc-processors/src/builtin/processors/transfer/mod.rs +++ b/damocles-worker/vc-processors/src/builtin/processors/transfer/mod.rs @@ -1,11 +1,13 @@ use std::fs::{create_dir_all, remove_dir_all, remove_file, OpenOptions}; use std::io::copy; use std::os::unix::fs::symlink; -use std::path::Path; +use std::path::{Path, PathBuf}; use anyhow::{anyhow, Context, Result}; use tracing::info; +use crate::builtin::tasks::TransferItem; + use super::TransferRoute; #[cfg(test)] @@ -16,48 +18,57 @@ pub fn do_transfer(route: &TransferRoute) -> Result<()> { } pub fn do_transfer_inner(route: &TransferRoute, disable_link: bool) -> Result<()> { - if route.src.uri.is_relative() { + let src = match &route.src { + TransferItem::Store { uri, .. } => PathBuf::from(uri), + TransferItem::Local(p) => p.clone(), + }; + + let dest = match &route.dest { + TransferItem::Store { uri, .. } => PathBuf::from(uri), + TransferItem::Local(p) => p.clone(), + }; + + if src.is_relative() { return Err(anyhow!("src path is relative")); } - if !route.src.uri.exists() { - return Err(anyhow!("src path not exists: {}", route.src.uri.display())); + if !src.exists() { + return Err(anyhow!("src path not exists: {}", dest.display())); } - let src_is_dir = route.src.uri.is_dir(); + let src_is_dir = src.is_dir(); - if route.dest.uri.is_relative() { + if dest.is_relative() { return Err(anyhow!("dest path is relative")); } - if route.dest.uri.exists() { - let dest_is_dir = route.dest.uri.is_dir(); + if dest.exists() { + let dest_is_dir = dest.is_dir(); if src_is_dir != dest_is_dir { return Err(anyhow!("dest entry type is different with src, is_dir={}", src_is_dir)); } if dest_is_dir { - remove_dir_all(&route.dest.uri).context("remove exist dest dir")?; + remove_dir_all(&dest).context("remove exist dest dir")?; } else { - remove_file(&route.dest.uri).context("remove exist dest file")?; + remove_file(&dest).context("remove exist dest file")?; } } if !disable_link { if let Some(true) = route.opt.as_ref().map(|opt| opt.allow_link) { - link_entry(&route.src.uri, &route.dest.uri).context("link entry")?; - info!(src=?&route.src.uri, dest=?&route.dest.uri, "entry linked"); + link_entry(&src, &dest).context("link entry")?; + info!(src=?src.display(), dest=?dest.display(), "entry linked"); return Ok(()); } } if src_is_dir { - copy_dir(&route.src.uri, &route.dest.uri).with_context(|| format!("transfer dir {:?} to {:?}", &route.src.uri, &route.dest.uri))?; - info!(src=?&route.src.uri, dest=?&route.dest.uri, "dir copied"); + copy_dir(&src, &dest).with_context(|| format!("transfer dir {:?} to {:?}", &src, &dest))?; + info!(src = ?src.display(), dest = ?dest.display(), "dir copied"); } else { - let size = copy_file(&route.src.uri, &route.dest.uri) - .with_context(|| format!("transfer file {:?} to {:?}", &route.src.uri, &route.dest.uri))?; - info!(src=?&route.src.uri, dest=?&route.dest.uri, size, "file copied"); + let size = copy_file(&src, &dest).with_context(|| format!("transfer file {:?} to {:?}", &src, &dest))?; + info!(src=?&src, dest=?&dest, size, "file copied"); } Ok(()) diff --git a/damocles-worker/vc-processors/src/builtin/processors/transfer/tests.rs b/damocles-worker/vc-processors/src/builtin/processors/transfer/tests.rs index f0baa51b1..3e77d5fd3 100644 --- a/damocles-worker/vc-processors/src/builtin/processors/transfer/tests.rs +++ b/damocles-worker/vc-processors/src/builtin/processors/transfer/tests.rs @@ -76,14 +76,8 @@ fn transfer_failure_test() { let rel_dest = "dest/dir"; let res = do_transfer(&TransferRoute { - src: TransferItem { - store_name: None, - uri: rel_src.into(), - }, - dest: TransferItem { - store_name: None, - uri: rel_dest.into(), - }, + src: TransferItem::Local(rel_src.into()), + dest: TransferItem::Local(rel_dest.into()), opt: None, }); assert!( @@ -93,28 +87,16 @@ fn transfer_failure_test() { let src_path = tmp.0.join(rel_src); let res = do_transfer(&TransferRoute { - src: TransferItem { - store_name: None, - uri: src_path.clone(), - }, - dest: TransferItem { - store_name: None, - uri: rel_dest.into(), - }, + src: TransferItem::Local(src_path.clone()), + dest: TransferItem::Local(rel_dest.into()), opt: None, }); assert!(res.unwrap_err().to_string().contains("src path not exists"), "src path not exists"); tmp.create_dir(rel_src); let res = do_transfer(&TransferRoute { - src: TransferItem { - store_name: None, - uri: src_path.clone(), - }, - dest: TransferItem { - store_name: None, - uri: "b/dir".into(), - }, + src: TransferItem::Local(src_path.clone()), + dest: TransferItem::Local("b/dir".into()), opt: None, }); assert!( @@ -124,14 +106,8 @@ fn transfer_failure_test() { let dest_path = tmp.touch(rel_dest, None); let res = do_transfer(&TransferRoute { - src: TransferItem { - store_name: None, - uri: src_path.clone(), - }, - dest: TransferItem { - store_name: None, - uri: dest_path.clone(), - }, + src: TransferItem::Local(src_path.clone()), + dest: TransferItem::Local(dest_path.clone()), opt: None, }); assert!( @@ -143,14 +119,8 @@ fn transfer_failure_test() { create_dir_all(&dest_path).expect("create dest dir"); let res = do_transfer(&TransferRoute { - src: TransferItem { - store_name: None, - uri: src_path, - }, - dest: TransferItem { - store_name: None, - uri: dest_path, - }, + src: TransferItem::Local(src_path), + dest: TransferItem::Local(dest_path), opt: None, }); @@ -180,14 +150,8 @@ fn transfer_test() { // link dir let res = do_transfer(&TransferRoute { - src: TransferItem { - store_name: None, - uri: src_path.clone(), - }, - dest: TransferItem { - store_name: None, - uri: dest_path.clone(), - }, + src: TransferItem::Local(src_path.clone()), + dest: TransferItem::Local(dest_path.clone()), opt: Some(TransferOption { is_dir: true, allow_link: true, @@ -205,14 +169,8 @@ fn transfer_test() { remove_dir_all(&dest_path).expect("clean up dest dir"); for (sfp, dfp) in file_pairs.iter() { let res = do_transfer(&TransferRoute { - src: TransferItem { - store_name: None, - uri: sfp.clone(), - }, - dest: TransferItem { - store_name: None, - uri: dfp.clone(), - }, + src: TransferItem::Local(sfp.clone()), + dest: TransferItem::Local(dfp.clone()), opt: Some(TransferOption { is_dir: false, allow_link: true, @@ -229,14 +187,8 @@ fn transfer_test() { // copy dir remove_dir_all(&dest_path).expect("clean up dest dir"); let res = do_transfer(&TransferRoute { - src: TransferItem { - store_name: None, - uri: src_path.clone(), - }, - dest: TransferItem { - store_name: None, - uri: dest_path.clone(), - }, + src: TransferItem::Local(src_path.clone()), + dest: TransferItem::Local(dest_path.clone()), opt: None, }); @@ -251,14 +203,8 @@ fn transfer_test() { remove_dir_all(&dest_path).expect("clean up dest dir"); for (sfp, dfp) in file_pairs.iter().take(3) { let res = do_transfer(&TransferRoute { - src: TransferItem { - store_name: None, - uri: sfp.clone(), - }, - dest: TransferItem { - store_name: None, - uri: dfp.clone(), - }, + src: TransferItem::Local(sfp.clone()), + dest: TransferItem::Local(dfp.clone()), opt: None, }); assert!(res.is_ok(), "transfer file {:?} to {:?}, disallow_link", sfp, dfp); diff --git a/damocles-worker/vc-processors/src/builtin/tasks.rs b/damocles-worker/vc-processors/src/builtin/tasks.rs index be8c1f7b7..b7eaa8441 100644 --- a/damocles-worker/vc-processors/src/builtin/tasks.rs +++ b/damocles-worker/vc-processors/src/builtin/tasks.rs @@ -186,9 +186,9 @@ pub struct TransferStoreInfo { } #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct TransferItem { - pub store_name: Option, - pub uri: PathBuf, +pub enum TransferItem { + Store { store: String, uri: String, resource_name: String }, + Local(PathBuf), } #[derive(Clone, Debug, Serialize, Deserialize)]