Skip to content

Commit

Permalink
feat(store): store minor refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
0x5459 committed Aug 31, 2023
1 parent caeb4ac commit ab28ed7
Show file tree
Hide file tree
Showing 23 changed files with 231 additions and 226 deletions.
2 changes: 2 additions & 0 deletions damocles-manager/core/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ type SealerAPI interface {
AllocateUnsealSector(ctx context.Context, spec AllocateSectorSpec) (*SectorUnsealInfo, error)
AchieveUnsealSector(ctx context.Context, sid abi.SectorID, pieceCid cid.Cid, errInfo string) (Meta, error)
AcquireUnsealDest(ctx context.Context, sid abi.SectorID, pieceCid cid.Cid) ([]string, error)

StoreUri(ctx context.Context, storeName string, resource string) (string, error)
}

type SealerCliAPI interface {
Expand Down
4 changes: 4 additions & 0 deletions damocles-manager/core/client_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type SealerAPIClient struct {
AllocateUnsealSector func(ctx context.Context, spec AllocateSectorSpec) (*SectorUnsealInfo, error)
AchieveUnsealSector func(ctx context.Context, sid abi.SectorID, pieceCid cid.Cid, errInfo string) (Meta, error)
AcquireUnsealDest func(ctx context.Context, sid abi.SectorID, pieceCid cid.Cid) ([]string, error)
StoreUri func(ctx context.Context, storeName, resource []string) (map[string]string, error)
}

var UnavailableSealerAPIClient = SealerAPIClient{
Expand Down Expand Up @@ -108,6 +109,9 @@ var UnavailableSealerAPIClient = SealerAPIClient{
AcquireUnsealDest: func(ctx context.Context, sid abi.SectorID, pieceCid cid.Cid) ([]string, error) {
panic("SealerAPI client unavailable")
},
StoreUri: func(ctx context.Context, storeName, resource []string) (map[string]string, error) {
panic("SealerAPI client unavailable")
},
}

// SealerCliAPIClient is generated client for SealerCliAPI interface.
Expand Down
2 changes: 1 addition & 1 deletion damocles-manager/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ require (
github.com/golang/mock v1.6.0
github.com/hako/durafmt v0.0.0-20200710122514-c0fb7b4da026
github.com/hashicorp/go-multierror v1.1.1
github.com/ipfs-force-community/damocles/manager-plugin v0.0.0-20230830062024-608c68ada10e
github.com/ipfs-force-community/damocles/manager-plugin v0.0.0-20230831091846-caeb4acea7d7
github.com/ipfs-force-community/venus-cluster-assets v0.1.0
github.com/ipfs/go-cid v0.3.2
github.com/ipfs/go-datastore v0.6.0
Expand Down
4 changes: 2 additions & 2 deletions damocles-manager/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -589,8 +589,8 @@ github.com/icza/mighty v0.0.0-20180919140131-cfd07d671de6 h1:8UsGZ2rr2ksmEru6lTo
github.com/icza/mighty v0.0.0-20180919140131-cfd07d671de6/go.mod h1:xQig96I1VNBDIWGCdTt54nHt6EeI639SmHycLYL7FkA=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/ipfs-force-community/damocles/manager-plugin v0.0.0-20230830062024-608c68ada10e h1:SEmUD7xCpHWlnTrZdyj++RExTy0T6bqX4yS/iXBHVAg=
github.com/ipfs-force-community/damocles/manager-plugin v0.0.0-20230830062024-608c68ada10e/go.mod h1:me1u2cl7qdxBCZiVL0laDop8uBHDdUwlUNnQ7KkHF64=
github.com/ipfs-force-community/damocles/manager-plugin v0.0.0-20230831091846-caeb4acea7d7 h1:tgh0JH97wqkDaFu8TKAqi7eQucm0gyPG0f4uaj9rLWM=
github.com/ipfs-force-community/damocles/manager-plugin v0.0.0-20230831091846-caeb4acea7d7/go.mod h1:me1u2cl7qdxBCZiVL0laDop8uBHDdUwlUNnQ7KkHF64=
github.com/ipfs-force-community/go-jsonrpc v0.1.7-0.20230220074347-8db78dbc20d4 h1:iu/3irYevdNpdc0B/gRi1vuS3+lRn+6Ro9G0FeBiAfE=
github.com/ipfs-force-community/go-jsonrpc v0.1.7-0.20230220074347-8db78dbc20d4/go.mod h1:jBSvPTl8V1N7gSTuCR4bis8wnQnIjHbRPpROol6iQKM=
github.com/ipfs-force-community/venus-cluster-assets v0.1.0 h1:K/0+OV9Jm7HjSa7O9MAtgfLDIudQYZUTymhJsp8rGXg=
Expand Down
40 changes: 28 additions & 12 deletions damocles-manager/modules/impl/mock/sealer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,35 @@ import (
"github.com/ipfs-force-community/damocles/damocles-manager/core"
"github.com/ipfs-force-community/damocles/damocles-manager/modules"
chainAPI "github.com/ipfs-force-community/damocles/damocles-manager/pkg/chain"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/objstore"
"github.com/ipfs-force-community/damocles/damocles-manager/ver"
)

var _ core.SealerAPI = (*Sealer)(nil)

func NewSealer(rand core.RandomnessAPI, sector core.SectorManager, deal core.DealManager, commit core.CommitmentManager,
api chainAPI.API, scfg modules.SafeConfig,
persistedStoreManager objstore.Manager,
) (*Sealer, error) {
return &Sealer{
rand: rand,
sector: sector,
deal: deal,
commit: commit,
api: api,
scfg: scfg,
rand: rand,
sector: sector,
deal: deal,
commit: commit,
api: api,
scfg: scfg,
persistedStoreManager: persistedStoreManager,
}, nil
}

type Sealer struct {
rand core.RandomnessAPI
sector core.SectorManager
deal core.DealManager
commit core.CommitmentManager
api chainAPI.API
scfg modules.SafeConfig
rand core.RandomnessAPI
sector core.SectorManager
deal core.DealManager
commit core.CommitmentManager
api chainAPI.API
scfg modules.SafeConfig
persistedStoreManager objstore.Manager
}

func (s *Sealer) AllocateSector(ctx context.Context, spec core.AllocateSectorSpec) (*core.AllocatedSector, error) {
Expand Down Expand Up @@ -315,3 +319,15 @@ func (s *Sealer) AcquireUnsealDest(ctx context.Context, sid abi.SectorID, pieceC
func (s *Sealer) Version(context.Context) (string, error) {
return ver.VersionStr(), nil
}

func (s *Sealer) StoreUri(ctx context.Context, storeName string, resource string) (string, error) {
store, err := s.persistedStoreManager.GetInstance(ctx, storeName)
if err != nil {
return "", err
}
uri, err := store.Uri(ctx, resource)
if err != nil {
return "", fmt.Errorf("get uri(%s): %w", resource, err)
}
return uri, nil
}
12 changes: 12 additions & 0 deletions damocles-manager/modules/sealer/sealer.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,3 +833,15 @@ func (s *Sealer) AchieveUnsealSector(ctx context.Context, sid abi.SectorID, piec
func (s *Sealer) AcquireUnsealDest(ctx context.Context, sid abi.SectorID, pieceCid cid.Cid) ([]string, error) {
return s.unseal.AcquireDest(ctx, sid, pieceCid)
}

func (s *Sealer) StoreUri(ctx context.Context, storeName string, resource string) (string, error) {
store, err := s.sectorIdxer.StoreMgr().GetInstance(ctx, storeName)
if err != nil {
return "", err
}
uri, err := store.Uri(ctx, resource)
if err != nil {
return "", fmt.Errorf("get uri(%s): %w", resource, err)
}
return uri, nil
}
10 changes: 8 additions & 2 deletions damocles-manager/modules/sealer/sealer_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,19 @@ func (s *Sealer) RemoveSector(ctx context.Context, sid abi.SectorID) error {
sealed = util.SectorPath(util.SectorPathTypeSealed, state.ID)
}

cachePath := cacheDir.FullPath(ctx, cache)
cachePath, err := cacheDir.Uri(ctx, cache)
if err != nil {
return fmt.Errorf("get uri(%s): %w", cache, err)
}
err = os.RemoveAll(cachePath)
if err != nil {
return fmt.Errorf("remove cache: %w", err)
}

sealedPath := sealedFile.FullPath(ctx, sealed)
sealedPath, err := sealedFile.Uri(ctx, sealed)
if err != nil {
return fmt.Errorf("get uri(%s): %w", sealed, err)
}
err = os.Remove(sealedPath)
if err != nil {
return fmt.Errorf("remove sealed file: %w", err)
Expand Down
27 changes: 19 additions & 8 deletions damocles-manager/pkg/objstore/filestore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (s *Store) Del(ctx context.Context, p string) error {
return objstore.ErrReadOnlyStore
}

fpath, err := s.getAbsPath(p)
fpath, err := s.getAbsPath(ctx, p)
if err != nil {
return err
}
Expand All @@ -235,7 +235,14 @@ func (s *Store) Stat(ctx context.Context, p string) (objstore.Stat, error) {

var res statOrErr

finfo, err := os.Stat(s.FullPath(ctx, p))
p, err := s.getAbsPath(ctx, p)
if err != nil {
res.Err = fmt.Errorf("getAbsPath(%s): %w", p, err)
resCh <- res
return
}

finfo, err := os.Stat(p)
if err == nil {
res.Stat.Size = finfo.Size()
} else {
Expand All @@ -254,14 +261,18 @@ func (s *Store) Stat(ctx context.Context, p string) (objstore.Stat, error) {
}
}

func (s *Store) getAbsPath(p string) (string, error) {
fpath, err := filepath.Abs(filepath.Join(s.cfg.Path, p))
func (s *Store) getAbsPath(ctx context.Context, resourceName string) (string, error) {
uri, err := s.Uri(ctx, resourceName)
if err != nil {
return "", fmt.Errorf("get uri(%s): %w", resourceName, objstore.ErrInvalidObjectPath)
}
fpath, err := filepath.Abs(filepath.Join(s.cfg.Path, uri))
if err != nil {
return "", fmt.Errorf("obj %s: %w", p, objstore.ErrInvalidObjectPath)
return "", fmt.Errorf("obj %s: %w", uri, objstore.ErrInvalidObjectPath)
}

if !strings.HasPrefix(fpath, s.cfg.Path) {
return "", fmt.Errorf("obj %s: %w: outside of the dir", p, objstore.ErrInvalidObjectPath)
return "", fmt.Errorf("obj %s: %w: outside of the dir", uri, objstore.ErrInvalidObjectPath)
}

return fpath, nil
Expand All @@ -287,6 +298,6 @@ func (s *Store) Put(ctx context.Context, p string, r io.Reader) (int64, error) {
return io.Copy(file, r)
}

func (s *Store) FullPath(ctx context.Context, sub string) string {
return filepath.Join(s.cfg.Path, sub)
func (s *Store) Uri(ctx context.Context, resourceName string) (string, error) {
return resourceName, nil
}
4 changes: 2 additions & 2 deletions damocles-manager/pkg/objstore/mock_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,6 @@ func (ms *MockStore) Put(ctx context.Context, p string, r io.Reader) (int64, err
return written, nil
}

func (ms *MockStore) FullPath(ctx context.Context, p string) string {
return p
func (ms *MockStore) Uri(ctx context.Context, p string) (string, error) {
return p, nil
}
69 changes: 17 additions & 52 deletions damocles-worker/src/infra/objstore/filestore.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
//! ObjectStore implemented based on fs

use std::collections::HashMap;
use std::fs::create_dir_all;
use std::path::{Path, PathBuf, MAIN_SEPARATOR};
use std::sync::Arc;

use anyhow::{anyhow, Context, Result};
use jsonrpc_core::futures_util::{FutureExt, TryFutureExt};

use super::{ObjResult, ObjectStore};
use crate::logging::trace;

const LOG_TARGET: &str = "filestore";
use super::{ObjResult, ObjectStore, ObjectStoreError};
use crate::{logging::trace, rpc::sealer::SealerClient, sealing::call_rpc};

/// FileStore
pub struct FileStore {
sep: String,
local_path: PathBuf,
instance: String,
readonly: bool,
rpc: Arc<SealerClient>,
}

impl FileStore {
Expand All @@ -27,7 +28,7 @@ impl FileStore {
}

/// open the file store at given path
pub fn open<P: AsRef<Path>>(p: P, ins: Option<String>, readonly: bool) -> Result<Self> {
pub fn open<P: AsRef<Path>>(p: P, ins: Option<String>, readonly: bool, rpc: Arc<SealerClient>) -> Result<Self> {
let dir_path = p.as_ref().canonicalize().context("canonicalize dir path")?;
if !dir_path.metadata().context("read dir metadata").map(|meta| meta.is_dir())? {
return Err(anyhow!("base path of the file store should a dir"));
Expand All @@ -39,67 +40,31 @@ impl FileStore {
};

Ok(FileStore {
sep: MAIN_SEPARATOR.to_string(),
local_path: dir_path,
instance,
readonly,
rpc,
})
}

fn path<P: AsRef<Path>>(&self, sub: P) -> ObjResult<PathBuf> {
let mut p = sub.as_ref();
if p.starts_with(".") {
return Err(anyhow!("sub path starts with dot").into());
}

// try to strip the first any only the first sep
if let Ok(strip) = p.strip_prefix(&self.sep) {
p = strip;
}

if p.starts_with(&self.sep) {
return Err(anyhow!("sub path starts with separator").into());
}

let res = self.local_path.join(p);
trace!(target: LOG_TARGET, ?res, "get full path");
Ok(res)
}
}

impl ObjectStore for FileStore {
fn instance(&self) -> String {
self.instance.clone()
}

fn uri(&self, rel: &Path) -> ObjResult<PathBuf> {
self.path(rel)
fn uri(&self, resource_name: &str) -> ObjResult<String> {
let uri = call_rpc! {
self.rpc=>store_uri(
self.instance(),
resource_name.to_string(),
)
}
.map_err(|e| ObjectStoreError::Other(e.1))?;
Ok(self.local_path.join(uri).display().to_string())
}

fn readonly(&self) -> bool {
self.readonly
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_path() {
let fs = FileStore::open("/tmp", None, false).unwrap();
assert_eq!(fs.path("/a/b").unwrap(), PathBuf::from("/tmp/a/b"));
assert_eq!(fs.path("a/b").unwrap(), PathBuf::from("/tmp/a/b"));
assert_eq!(fs.path("a/b/").unwrap(), PathBuf::from("/tmp/a/b"));
assert_eq!(fs.path("/a/b/").unwrap(), PathBuf::from("/tmp/a/b"));
}

#[test]
fn test_store_uri() {
let fs = FileStore::open("/tmp/", Some("test_store".to_string()), false).unwrap();
assert_eq!(fs.uri(Path::new("a/b")).unwrap(), PathBuf::from("/tmp/a/b"));
assert_eq!(fs.uri(Path::new("/a/b")).unwrap(), PathBuf::from("/tmp/a/b"));
assert_eq!(fs.uri(Path::new("a/b/")).unwrap(), PathBuf::from("/tmp/a/b"));
assert_eq!(fs.uri(Path::new("/a/b/")).unwrap(), PathBuf::from("/tmp/a/b"));
}
}
6 changes: 5 additions & 1 deletion damocles-worker/src/infra/objstore/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! abstractions & implementations for object store

use std::collections::HashMap;
use std::error::Error;
use std::fmt;
use std::io;
Expand Down Expand Up @@ -69,7 +70,10 @@ pub trait ObjectStore: Send + Sync {
/// unique identifier of the given resource.
/// for fs-like stores, this should return an abs path.
/// for other stores, this may return a url, or path part of a url.
fn uri(&self, resource: &Path) -> ObjResult<PathBuf>;
///
/// the resource value looks like this:
/// - "cache/sc-02-data-tree-r-last.dat"
fn uri(&self, resource: &str) -> ObjResult<String>;

/// if this instance is read-only
fn readonly(&self) -> bool;
Expand Down
3 changes: 3 additions & 0 deletions damocles-worker/src/rpc/sealer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,4 +537,7 @@ pub trait Sealer {

#[rpc(name = "Venus.WdPoStFinishJob")]
fn wdpost_finish(&self, job_id: String, output: Option<WindowPoStOutput>, error_reason: String) -> Result<()>;

#[rpc(name = "Venus.StoreUri")]
fn store_uri(&self, store_name: String, resources: String) -> Result<String>;
}
Loading

0 comments on commit ab28ed7

Please sign in to comment.