Skip to content

Commit

Permalink
downloader: rename TorrentFiles to AtomicTorrentFS (#10005)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored Apr 20, 2024
1 parent 122f9f8 commit 86dd0e5
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 43 deletions.
4 changes: 2 additions & 2 deletions cmd/downloader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ var createTorrent = &cobra.Command{
Example: "go run ./cmd/downloader torrent_create --datadir=<your_datadir> --file=<relative_file_path>",
RunE: func(cmd *cobra.Command, args []string) error {
dirs := datadir.New(datadirCli)
createdAmount, err := downloader.BuildTorrentFilesIfNeed(cmd.Context(), dirs, downloader.NewAtomicTorrentFiles(dirs.Snap), chain, nil)
createdAmount, err := downloader.BuildTorrentFilesIfNeed(cmd.Context(), dirs, downloader.NewAtomicTorrentFS(dirs.Snap), chain, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -505,7 +505,7 @@ func doPrintTorrentHashes(ctx context.Context, logger log.Logger) error {
return err
}

tf := downloader.NewAtomicTorrentFiles(dirs.Snap)
tf := downloader.NewAtomicTorrentFS(dirs.Snap)

if forceRebuild { // remove and create .torrent files (will re-read all snapshots)
//removePieceCompletionStorage(snapDir)
Expand Down
4 changes: 2 additions & 2 deletions cmd/snapshots/torrents/torrents.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func updateTorrents(ctx context.Context, srcSession *downloader.RCloneSession, f
g, gctx := errgroup.WithContext(ctx)
g.SetLimit(16)

torrentFiles := downloader.NewAtomicTorrentFiles(srcSession.LocalFsRoot())
torrentFiles := downloader.NewAtomicTorrentFS(srcSession.LocalFsRoot())

for _, fi := range entries {
if filepath.Ext(fi.Name()) != ".torrent" {
Expand Down Expand Up @@ -407,7 +407,7 @@ func verifyTorrents(ctx context.Context, srcSession *downloader.RCloneSession, f
g, gctx := errgroup.WithContext(ctx)
g.SetLimit(16)

torrentFiles := downloader.NewAtomicTorrentFiles(srcSession.LocalFsRoot())
torrentFiles := downloader.NewAtomicTorrentFS(srcSession.LocalFsRoot())

for _, fi := range entries {
if filepath.Ext(fi.Name()) != ".torrent" {
Expand Down
20 changes: 10 additions & 10 deletions erigon-lib/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type Downloader struct {
logger log.Logger
verbosity log.Lvl

torrentFiles *TorrentFiles
torrentFS *AtomicTorrentFS
snapshotLock *snapshotLock
webDownloadInfo map[string]webDownloadInfo
downloading map[string]struct{}
Expand Down Expand Up @@ -263,14 +263,14 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger, verbosi
webseeds: NewWebSeeds(cfg.WebSeedUrls, verbosity, logger),
logger: logger,
verbosity: verbosity,
torrentFiles: &TorrentFiles{dir: cfg.Dirs.Snap},
torrentFS: &AtomicTorrentFS{dir: cfg.Dirs.Snap},
snapshotLock: lock,
webDownloadInfo: map[string]webDownloadInfo{},
webDownloadSessions: map[string]*RCloneSession{},
downloading: map[string]struct{}{},
webseedsDiscover: discover,
}
d.webseeds.SetTorrent(d.torrentFiles, lock.Downloads, cfg.DownloadTorrentFilesFromWebseed)
d.webseeds.SetTorrent(d.torrentFS, lock.Downloads, cfg.DownloadTorrentFilesFromWebseed)

requestHandler.downloader = d

Expand Down Expand Up @@ -2276,11 +2276,11 @@ func (d *Downloader) AddNewSeedableFile(ctx context.Context, name string) error
}

// if we don't have the torrent file we build it if we have the .seg file
_, err := BuildTorrentIfNeed(ctx, name, d.SnapDir(), d.torrentFiles)
_, err := BuildTorrentIfNeed(ctx, name, d.SnapDir(), d.torrentFS)
if err != nil {
return fmt.Errorf("AddNewSeedableFile: %w", err)
}
ts, err := d.torrentFiles.LoadByName(name)
ts, err := d.torrentFS.LoadByName(name)
if err != nil {
return fmt.Errorf("AddNewSeedableFile: %w", err)
}
Expand Down Expand Up @@ -2309,12 +2309,12 @@ func (d *Downloader) AddMagnetLink(ctx context.Context, infoHash metainfo.Hash,
if d.alreadyHaveThisName(name) || !IsSnapNameAllowed(name) {
return nil
}
isProhibited, err := d.torrentFiles.NewDownloadsAreProhibited(name)
isProhibited, err := d.torrentFS.NewDownloadsAreProhibited(name)
if err != nil {
return err
}

if isProhibited && !d.torrentFiles.Exists(name) {
if isProhibited && !d.torrentFS.Exists(name) {
return nil
}

Expand Down Expand Up @@ -2364,7 +2364,7 @@ func (d *Downloader) AddMagnetLink(ctx context.Context, infoHash metainfo.Hash,
}

mi := t.Metainfo()
if _, err := d.torrentFiles.CreateWithMetaInfo(t.Info(), &mi); err != nil {
if _, err := d.torrentFS.CreateWithMetaInfo(t.Info(), &mi); err != nil {
d.logger.Warn("[snapshots] create torrent file", "err", err)
return
}
Expand Down Expand Up @@ -2408,7 +2408,7 @@ func (d *Downloader) addTorrentFilesFromDisk(quiet bool) error {
eg, ctx := errgroup.WithContext(d.ctx)
eg.SetLimit(ParallelVerifyFiles)

files, err := AllTorrentSpecs(d.cfg.Dirs, d.torrentFiles)
files, err := AllTorrentSpecs(d.cfg.Dirs, d.torrentFS)
if err != nil {
return err
}
Expand Down Expand Up @@ -2480,7 +2480,7 @@ func (d *Downloader) addTorrentFilesFromDisk(quiet bool) error {
return eg.Wait()
}
func (d *Downloader) BuildTorrentFilesIfNeed(ctx context.Context, chain string, ignore snapcfg.Preverified) error {
_, err := BuildTorrentFilesIfNeed(ctx, d.cfg.Dirs, d.torrentFiles, chain, ignore)
_, err := BuildTorrentFilesIfNeed(ctx, d.cfg.Dirs, d.torrentFS, chain, ignore)
return err
}
func (d *Downloader) Stats() AggStats {
Expand Down
4 changes: 2 additions & 2 deletions erigon-lib/downloader/downloader_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type GrpcServer struct {
}

func (s *GrpcServer) ProhibitNewDownloads(ctx context.Context, req *proto_downloader.ProhibitNewDownloadsRequest) (*emptypb.Empty, error) {
return &emptypb.Empty{}, s.d.torrentFiles.ProhibitNewDownloads(req.Type)
return &emptypb.Empty{}, s.d.torrentFS.ProhibitNewDownloads(req.Type)
}

// Erigon "download once" - means restart/upgrade/downgrade will not download files (and will be fast)
Expand Down Expand Up @@ -107,7 +107,7 @@ func (s *GrpcServer) Delete(ctx context.Context, request *proto_downloader.Delet

fPath := filepath.Join(s.d.SnapDir(), name)
_ = os.Remove(fPath)
s.d.torrentFiles.Delete(name)
s.d.torrentFS.Delete(name)
}
return &emptypb.Empty{}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestNoEscape(t *testing.T) {
dirs := datadir.New(t.TempDir())
ctx := context.Background()

tf := NewAtomicTorrentFiles(dirs.Snap)
tf := NewAtomicTorrentFS(dirs.Snap)
// allow adding files only if they are inside snapshots dir
_, err := BuildTorrentIfNeed(ctx, "a.seg", dirs.Snap, tf)
require.NoError(err)
Expand Down
38 changes: 19 additions & 19 deletions erigon-lib/downloader/torrent_files.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,42 @@ import (
"golang.org/x/exp/slices"
)

// TorrentFiles - does provide thread-safe CRUD operations on .torrent files
type TorrentFiles struct {
// AtomicTorrentFS - does provide thread-safe CRUD operations on .torrent files
type AtomicTorrentFS struct {
lock sync.Mutex
dir string
}

func NewAtomicTorrentFiles(dir string) *TorrentFiles {
return &TorrentFiles{dir: dir}
func NewAtomicTorrentFS(dir string) *AtomicTorrentFS {
return &AtomicTorrentFS{dir: dir}
}

func (tf *TorrentFiles) Exists(name string) bool {
func (tf *AtomicTorrentFS) Exists(name string) bool {
tf.lock.Lock()
defer tf.lock.Unlock()
return tf.exists(name)
}

func (tf *TorrentFiles) exists(name string) bool {
func (tf *AtomicTorrentFS) exists(name string) bool {
if !strings.HasSuffix(name, ".torrent") {
name += ".torrent"
}
return dir.FileExist(filepath.Join(tf.dir, name))
}
func (tf *TorrentFiles) Delete(name string) error {
func (tf *AtomicTorrentFS) Delete(name string) error {
tf.lock.Lock()
defer tf.lock.Unlock()
return tf.delete(name)
}

func (tf *TorrentFiles) delete(name string) error {
func (tf *AtomicTorrentFS) delete(name string) error {
if !strings.HasSuffix(name, ".torrent") {
name += ".torrent"
}
return os.Remove(filepath.Join(tf.dir, name))
}

func (tf *TorrentFiles) Create(name string, res []byte) (ts *torrent.TorrentSpec, prohibited, created bool, err error) {
func (tf *AtomicTorrentFS) Create(name string, res []byte) (ts *torrent.TorrentSpec, prohibited, created bool, err error) {
tf.lock.Lock()
defer tf.lock.Unlock()
prohibited, err = tf.newDownloadsAreProhibited(name)
Expand All @@ -72,7 +72,7 @@ func (tf *TorrentFiles) Create(name string, res []byte) (ts *torrent.TorrentSpec
return ts, prohibited, false, nil
}

func (tf *TorrentFiles) create(name string, res []byte) error {
func (tf *AtomicTorrentFS) create(name string, res []byte) error {
if !strings.HasSuffix(name, ".torrent") {
name += ".torrent"
}
Expand Down Expand Up @@ -102,7 +102,7 @@ func (tf *TorrentFiles) create(name string, res []byte) error {
return nil
}

func (tf *TorrentFiles) createFromMetaInfo(fPath string, mi *metainfo.MetaInfo) error {
func (tf *AtomicTorrentFS) createFromMetaInfo(fPath string, mi *metainfo.MetaInfo) error {
file, err := os.Create(fPath + ".tmp")
if err != nil {
return err
Expand All @@ -123,7 +123,7 @@ func (tf *TorrentFiles) createFromMetaInfo(fPath string, mi *metainfo.MetaInfo)
return nil
}

func (tf *TorrentFiles) CreateWithMetaInfo(info *metainfo.Info, additionalMetaInfo *metainfo.MetaInfo) (created bool, err error) {
func (tf *AtomicTorrentFS) CreateWithMetaInfo(info *metainfo.Info, additionalMetaInfo *metainfo.MetaInfo) (created bool, err error) {
name := info.Name
if !strings.HasSuffix(name, ".torrent") {
name += ".torrent"
Expand Down Expand Up @@ -152,19 +152,19 @@ func (tf *TorrentFiles) CreateWithMetaInfo(info *metainfo.Info, additionalMetaIn
return true, nil
}

func (tf *TorrentFiles) LoadByName(name string) (*torrent.TorrentSpec, error) {
func (tf *AtomicTorrentFS) LoadByName(name string) (*torrent.TorrentSpec, error) {
tf.lock.Lock()
defer tf.lock.Unlock()
return tf.load(filepath.Join(tf.dir, name))
}

func (tf *TorrentFiles) LoadByPath(fPath string) (*torrent.TorrentSpec, error) {
func (tf *AtomicTorrentFS) LoadByPath(fPath string) (*torrent.TorrentSpec, error) {
tf.lock.Lock()
defer tf.lock.Unlock()
return tf.load(fPath)
}

func (tf *TorrentFiles) load(fPath string) (*torrent.TorrentSpec, error) {
func (tf *AtomicTorrentFS) load(fPath string) (*torrent.TorrentSpec, error) {
if !strings.HasSuffix(fPath, ".torrent") {
fPath += ".torrent"
}
Expand All @@ -181,13 +181,13 @@ const ProhibitNewDownloadsFileName = "prohibit_new_downloads.lock"
// Erigon "download once" - means restart/upgrade/downgrade will not download files (and will be fast)
// After "download once" - Erigon will produce and seed new files
// Downloader will able: seed new files (already existing on FS), download uncomplete parts of existing files (if Verify found some bad parts)
func (tf *TorrentFiles) ProhibitNewDownloads(t string) error {
func (tf *AtomicTorrentFS) ProhibitNewDownloads(t string) error {
tf.lock.Lock()
defer tf.lock.Unlock()
return tf.prohibitNewDownloads(t)
}

func (tf *TorrentFiles) prohibitNewDownloads(t string) error {
func (tf *AtomicTorrentFS) prohibitNewDownloads(t string) error {
// open or create file ProhibitNewDownloadsFileName
f, err := os.OpenFile(filepath.Join(tf.dir, ProhibitNewDownloadsFileName), os.O_CREATE|os.O_RDONLY, 0644)
if err != nil {
Expand Down Expand Up @@ -227,13 +227,13 @@ func (tf *TorrentFiles) prohibitNewDownloads(t string) error {
return f.Sync()
}

func (tf *TorrentFiles) NewDownloadsAreProhibited(name string) (bool, error) {
func (tf *AtomicTorrentFS) NewDownloadsAreProhibited(name string) (bool, error) {
tf.lock.Lock()
defer tf.lock.Unlock()
return tf.newDownloadsAreProhibited(name)
}

func (tf *TorrentFiles) newDownloadsAreProhibited(name string) (bool, error) {
func (tf *AtomicTorrentFS) newDownloadsAreProhibited(name string) (bool, error) {
f, err := os.OpenFile(filepath.Join(tf.dir, ProhibitNewDownloadsFileName), os.O_CREATE|os.O_APPEND|os.O_RDONLY, 0644)
if err != nil {
return false, err
Expand Down
6 changes: 3 additions & 3 deletions erigon-lib/downloader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func ensureCantLeaveDir(fName, root string) (string, error) {
return fName, nil
}

func BuildTorrentIfNeed(ctx context.Context, fName, root string, torrentFiles *TorrentFiles) (ok bool, err error) {
func BuildTorrentIfNeed(ctx context.Context, fName, root string, torrentFiles *AtomicTorrentFS) (ok bool, err error) {
select {
case <-ctx.Done():
return false, ctx.Err()
Expand Down Expand Up @@ -163,7 +163,7 @@ func BuildTorrentIfNeed(ctx context.Context, fName, root string, torrentFiles *T
}

// BuildTorrentFilesIfNeed - create .torrent files from .seg files (big IO) - if .seg files were added manually
func BuildTorrentFilesIfNeed(ctx context.Context, dirs datadir.Dirs, torrentFiles *TorrentFiles, chain string, ignore snapcfg.Preverified) (int, error) {
func BuildTorrentFilesIfNeed(ctx context.Context, dirs datadir.Dirs, torrentFiles *AtomicTorrentFS, chain string, ignore snapcfg.Preverified) (int, error) {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()

Expand Down Expand Up @@ -247,7 +247,7 @@ func AllTorrentPaths(dirs datadir.Dirs) ([]string, error) {
return files, nil
}

func AllTorrentSpecs(dirs datadir.Dirs, torrentFiles *TorrentFiles) (res []*torrent.TorrentSpec, err error) {
func AllTorrentSpecs(dirs datadir.Dirs, torrentFiles *AtomicTorrentFS) (res []*torrent.TorrentSpec, err error) {
files, err := AllTorrentPaths(dirs)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions erigon-lib/downloader/webseed.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type WebSeeds struct {
logger log.Logger
verbosity log.Lvl

torrentFiles *TorrentFiles
torrentFiles *AtomicTorrentFS
}

func NewWebSeeds(seeds []*url.URL, verbosity log.Lvl, logger log.Logger) *WebSeeds {
Expand Down Expand Up @@ -97,7 +97,7 @@ func (d *WebSeeds) getWebDownloadInfo(ctx context.Context, t *torrent.Torrent) (
return infos, seedHashMismatches, nil
}

func (d *WebSeeds) SetTorrent(t *TorrentFiles, whiteList snapcfg.Preverified, downloadTorrentFile bool) {
func (d *WebSeeds) SetTorrent(t *AtomicTorrentFS, whiteList snapcfg.Preverified, downloadTorrentFile bool) {
d.downloadTorrentFile = downloadTorrentFile
d.torrentsWhitelist = whiteList
d.torrentFiles = t
Expand Down
4 changes: 2 additions & 2 deletions eth/stagedsync/stage_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func StageSnapshotsCfg(db kv.RwDB,
cfg.snapshotUploader = &snapshotUploader{
cfg: &cfg,
uploadFs: uploadFs,
torrentFiles: downloader.NewAtomicTorrentFiles(cfg.dirs.Snap),
torrentFiles: downloader.NewAtomicTorrentFS(cfg.dirs.Snap),
}

cfg.blockRetire.SetWorkers(estimate.CompressSnapshot.Workers())
Expand Down Expand Up @@ -516,7 +516,7 @@ type snapshotUploader struct {
uploadScheduled atomic.Bool
uploading atomic.Bool
manifestMutex sync.Mutex
torrentFiles *downloader.TorrentFiles
torrentFiles *downloader.AtomicTorrentFS
}

func (u *snapshotUploader) init(ctx context.Context, logger log.Logger) {
Expand Down

0 comments on commit 86dd0e5

Please sign in to comment.