Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

downloader: rename TorrentFiles to AtomicTorrentFS #10005

Merged
merged 2 commits into from
Apr 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/downloader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ var createTorrent = &cobra.Command{
Example: "go run ./cmd/downloader torrent_create --datadir=<your_datadir> --file=<relative_file_path>",
RunE: func(cmd *cobra.Command, args []string) error {
dirs := datadir.New(datadirCli)
createdAmount, err := downloader.BuildTorrentFilesIfNeed(cmd.Context(), dirs, downloader.NewAtomicTorrentFiles(dirs.Snap), chain, nil)
createdAmount, err := downloader.BuildTorrentFilesIfNeed(cmd.Context(), dirs, downloader.NewAtomicTorrentFS(dirs.Snap), chain, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -505,7 +505,7 @@ func doPrintTorrentHashes(ctx context.Context, logger log.Logger) error {
return err
}

tf := downloader.NewAtomicTorrentFiles(dirs.Snap)
tf := downloader.NewAtomicTorrentFS(dirs.Snap)

if forceRebuild { // remove and create .torrent files (will re-read all snapshots)
//removePieceCompletionStorage(snapDir)
Expand Down
4 changes: 2 additions & 2 deletions cmd/snapshots/torrents/torrents.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func updateTorrents(ctx context.Context, srcSession *downloader.RCloneSession, f
g, gctx := errgroup.WithContext(ctx)
g.SetLimit(16)

torrentFiles := downloader.NewAtomicTorrentFiles(srcSession.LocalFsRoot())
torrentFiles := downloader.NewAtomicTorrentFS(srcSession.LocalFsRoot())

for _, fi := range entries {
if filepath.Ext(fi.Name()) != ".torrent" {
Expand Down Expand Up @@ -407,7 +407,7 @@ func verifyTorrents(ctx context.Context, srcSession *downloader.RCloneSession, f
g, gctx := errgroup.WithContext(ctx)
g.SetLimit(16)

torrentFiles := downloader.NewAtomicTorrentFiles(srcSession.LocalFsRoot())
torrentFiles := downloader.NewAtomicTorrentFS(srcSession.LocalFsRoot())

for _, fi := range entries {
if filepath.Ext(fi.Name()) != ".torrent" {
Expand Down
20 changes: 10 additions & 10 deletions erigon-lib/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type Downloader struct {
logger log.Logger
verbosity log.Lvl

torrentFiles *TorrentFiles
torrentFS *AtomicTorrentFS
snapshotLock *snapshotLock
webDownloadInfo map[string]webDownloadInfo
downloading map[string]struct{}
Expand Down Expand Up @@ -263,14 +263,14 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger, verbosi
webseeds: NewWebSeeds(cfg.WebSeedUrls, verbosity, logger),
logger: logger,
verbosity: verbosity,
torrentFiles: &TorrentFiles{dir: cfg.Dirs.Snap},
torrentFS: &AtomicTorrentFS{dir: cfg.Dirs.Snap},
snapshotLock: lock,
webDownloadInfo: map[string]webDownloadInfo{},
webDownloadSessions: map[string]*RCloneSession{},
downloading: map[string]struct{}{},
webseedsDiscover: discover,
}
d.webseeds.SetTorrent(d.torrentFiles, lock.Downloads, cfg.DownloadTorrentFilesFromWebseed)
d.webseeds.SetTorrent(d.torrentFS, lock.Downloads, cfg.DownloadTorrentFilesFromWebseed)

requestHandler.downloader = d

Expand Down Expand Up @@ -2276,11 +2276,11 @@ func (d *Downloader) AddNewSeedableFile(ctx context.Context, name string) error
}

// if we don't have the torrent file we build it if we have the .seg file
_, err := BuildTorrentIfNeed(ctx, name, d.SnapDir(), d.torrentFiles)
_, err := BuildTorrentIfNeed(ctx, name, d.SnapDir(), d.torrentFS)
if err != nil {
return fmt.Errorf("AddNewSeedableFile: %w", err)
}
ts, err := d.torrentFiles.LoadByName(name)
ts, err := d.torrentFS.LoadByName(name)
if err != nil {
return fmt.Errorf("AddNewSeedableFile: %w", err)
}
Expand Down Expand Up @@ -2309,12 +2309,12 @@ func (d *Downloader) AddMagnetLink(ctx context.Context, infoHash metainfo.Hash,
if d.alreadyHaveThisName(name) || !IsSnapNameAllowed(name) {
return nil
}
isProhibited, err := d.torrentFiles.NewDownloadsAreProhibited(name)
isProhibited, err := d.torrentFS.NewDownloadsAreProhibited(name)
if err != nil {
return err
}

if isProhibited && !d.torrentFiles.Exists(name) {
if isProhibited && !d.torrentFS.Exists(name) {
return nil
}

Expand Down Expand Up @@ -2364,7 +2364,7 @@ func (d *Downloader) AddMagnetLink(ctx context.Context, infoHash metainfo.Hash,
}

mi := t.Metainfo()
if _, err := d.torrentFiles.CreateWithMetaInfo(t.Info(), &mi); err != nil {
if _, err := d.torrentFS.CreateWithMetaInfo(t.Info(), &mi); err != nil {
d.logger.Warn("[snapshots] create torrent file", "err", err)
return
}
Expand Down Expand Up @@ -2408,7 +2408,7 @@ func (d *Downloader) addTorrentFilesFromDisk(quiet bool) error {
eg, ctx := errgroup.WithContext(d.ctx)
eg.SetLimit(ParallelVerifyFiles)

files, err := AllTorrentSpecs(d.cfg.Dirs, d.torrentFiles)
files, err := AllTorrentSpecs(d.cfg.Dirs, d.torrentFS)
if err != nil {
return err
}
Expand Down Expand Up @@ -2480,7 +2480,7 @@ func (d *Downloader) addTorrentFilesFromDisk(quiet bool) error {
return eg.Wait()
}
func (d *Downloader) BuildTorrentFilesIfNeed(ctx context.Context, chain string, ignore snapcfg.Preverified) error {
_, err := BuildTorrentFilesIfNeed(ctx, d.cfg.Dirs, d.torrentFiles, chain, ignore)
_, err := BuildTorrentFilesIfNeed(ctx, d.cfg.Dirs, d.torrentFS, chain, ignore)
return err
}
func (d *Downloader) Stats() AggStats {
Expand Down
4 changes: 2 additions & 2 deletions erigon-lib/downloader/downloader_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type GrpcServer struct {
}

func (s *GrpcServer) ProhibitNewDownloads(ctx context.Context, req *proto_downloader.ProhibitNewDownloadsRequest) (*emptypb.Empty, error) {
return &emptypb.Empty{}, s.d.torrentFiles.ProhibitNewDownloads(req.Type)
return &emptypb.Empty{}, s.d.torrentFS.ProhibitNewDownloads(req.Type)
}

// Erigon "download once" - means restart/upgrade/downgrade will not download files (and will be fast)
Expand Down Expand Up @@ -107,7 +107,7 @@ func (s *GrpcServer) Delete(ctx context.Context, request *proto_downloader.Delet

fPath := filepath.Join(s.d.SnapDir(), name)
_ = os.Remove(fPath)
s.d.torrentFiles.Delete(name)
s.d.torrentFS.Delete(name)
}
return &emptypb.Empty{}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestNoEscape(t *testing.T) {
dirs := datadir.New(t.TempDir())
ctx := context.Background()

tf := NewAtomicTorrentFiles(dirs.Snap)
tf := NewAtomicTorrentFS(dirs.Snap)
// allow adding files only if they are inside snapshots dir
_, err := BuildTorrentIfNeed(ctx, "a.seg", dirs.Snap, tf)
require.NoError(err)
Expand Down
38 changes: 19 additions & 19 deletions erigon-lib/downloader/torrent_files.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,42 @@ import (
"golang.org/x/exp/slices"
)

// TorrentFiles - does provide thread-safe CRUD operations on .torrent files
type TorrentFiles struct {
// AtomicTorrentFS - does provide thread-safe CRUD operations on .torrent files
type AtomicTorrentFS struct {
lock sync.Mutex
dir string
}

func NewAtomicTorrentFiles(dir string) *TorrentFiles {
return &TorrentFiles{dir: dir}
func NewAtomicTorrentFS(dir string) *AtomicTorrentFS {
return &AtomicTorrentFS{dir: dir}
}

func (tf *TorrentFiles) Exists(name string) bool {
func (tf *AtomicTorrentFS) Exists(name string) bool {
tf.lock.Lock()
defer tf.lock.Unlock()
return tf.exists(name)
}

func (tf *TorrentFiles) exists(name string) bool {
func (tf *AtomicTorrentFS) exists(name string) bool {
if !strings.HasSuffix(name, ".torrent") {
name += ".torrent"
}
return dir.FileExist(filepath.Join(tf.dir, name))
}
func (tf *TorrentFiles) Delete(name string) error {
func (tf *AtomicTorrentFS) Delete(name string) error {
tf.lock.Lock()
defer tf.lock.Unlock()
return tf.delete(name)
}

func (tf *TorrentFiles) delete(name string) error {
func (tf *AtomicTorrentFS) delete(name string) error {
if !strings.HasSuffix(name, ".torrent") {
name += ".torrent"
}
return os.Remove(filepath.Join(tf.dir, name))
}

func (tf *TorrentFiles) Create(name string, res []byte) (ts *torrent.TorrentSpec, prohibited, created bool, err error) {
func (tf *AtomicTorrentFS) Create(name string, res []byte) (ts *torrent.TorrentSpec, prohibited, created bool, err error) {
tf.lock.Lock()
defer tf.lock.Unlock()
prohibited, err = tf.newDownloadsAreProhibited(name)
Expand All @@ -72,7 +72,7 @@ func (tf *TorrentFiles) Create(name string, res []byte) (ts *torrent.TorrentSpec
return ts, prohibited, false, nil
}

func (tf *TorrentFiles) create(name string, res []byte) error {
func (tf *AtomicTorrentFS) create(name string, res []byte) error {
if !strings.HasSuffix(name, ".torrent") {
name += ".torrent"
}
Expand Down Expand Up @@ -102,7 +102,7 @@ func (tf *TorrentFiles) create(name string, res []byte) error {
return nil
}

func (tf *TorrentFiles) createFromMetaInfo(fPath string, mi *metainfo.MetaInfo) error {
func (tf *AtomicTorrentFS) createFromMetaInfo(fPath string, mi *metainfo.MetaInfo) error {
file, err := os.Create(fPath + ".tmp")
if err != nil {
return err
Expand All @@ -123,7 +123,7 @@ func (tf *TorrentFiles) createFromMetaInfo(fPath string, mi *metainfo.MetaInfo)
return nil
}

func (tf *TorrentFiles) CreateWithMetaInfo(info *metainfo.Info, additionalMetaInfo *metainfo.MetaInfo) (created bool, err error) {
func (tf *AtomicTorrentFS) CreateWithMetaInfo(info *metainfo.Info, additionalMetaInfo *metainfo.MetaInfo) (created bool, err error) {
name := info.Name
if !strings.HasSuffix(name, ".torrent") {
name += ".torrent"
Expand Down Expand Up @@ -152,19 +152,19 @@ func (tf *TorrentFiles) CreateWithMetaInfo(info *metainfo.Info, additionalMetaIn
return true, nil
}

func (tf *TorrentFiles) LoadByName(name string) (*torrent.TorrentSpec, error) {
func (tf *AtomicTorrentFS) LoadByName(name string) (*torrent.TorrentSpec, error) {
tf.lock.Lock()
defer tf.lock.Unlock()
return tf.load(filepath.Join(tf.dir, name))
}

func (tf *TorrentFiles) LoadByPath(fPath string) (*torrent.TorrentSpec, error) {
func (tf *AtomicTorrentFS) LoadByPath(fPath string) (*torrent.TorrentSpec, error) {
tf.lock.Lock()
defer tf.lock.Unlock()
return tf.load(fPath)
}

func (tf *TorrentFiles) load(fPath string) (*torrent.TorrentSpec, error) {
func (tf *AtomicTorrentFS) load(fPath string) (*torrent.TorrentSpec, error) {
if !strings.HasSuffix(fPath, ".torrent") {
fPath += ".torrent"
}
Expand All @@ -181,13 +181,13 @@ const ProhibitNewDownloadsFileName = "prohibit_new_downloads.lock"
// Erigon "download once" - means restart/upgrade/downgrade will not download files (and will be fast)
// After "download once" - Erigon will produce and seed new files
// Downloader will able: seed new files (already existing on FS), download uncomplete parts of existing files (if Verify found some bad parts)
func (tf *TorrentFiles) ProhibitNewDownloads(t string) error {
func (tf *AtomicTorrentFS) ProhibitNewDownloads(t string) error {
tf.lock.Lock()
defer tf.lock.Unlock()
return tf.prohibitNewDownloads(t)
}

func (tf *TorrentFiles) prohibitNewDownloads(t string) error {
func (tf *AtomicTorrentFS) prohibitNewDownloads(t string) error {
// open or create file ProhibitNewDownloadsFileName
f, err := os.OpenFile(filepath.Join(tf.dir, ProhibitNewDownloadsFileName), os.O_CREATE|os.O_RDONLY, 0644)
if err != nil {
Expand Down Expand Up @@ -227,13 +227,13 @@ func (tf *TorrentFiles) prohibitNewDownloads(t string) error {
return f.Sync()
}

func (tf *TorrentFiles) NewDownloadsAreProhibited(name string) (bool, error) {
func (tf *AtomicTorrentFS) NewDownloadsAreProhibited(name string) (bool, error) {
tf.lock.Lock()
defer tf.lock.Unlock()
return tf.newDownloadsAreProhibited(name)
}

func (tf *TorrentFiles) newDownloadsAreProhibited(name string) (bool, error) {
func (tf *AtomicTorrentFS) newDownloadsAreProhibited(name string) (bool, error) {
f, err := os.OpenFile(filepath.Join(tf.dir, ProhibitNewDownloadsFileName), os.O_CREATE|os.O_APPEND|os.O_RDONLY, 0644)
if err != nil {
return false, err
Expand Down
6 changes: 3 additions & 3 deletions erigon-lib/downloader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func ensureCantLeaveDir(fName, root string) (string, error) {
return fName, nil
}

func BuildTorrentIfNeed(ctx context.Context, fName, root string, torrentFiles *TorrentFiles) (ok bool, err error) {
func BuildTorrentIfNeed(ctx context.Context, fName, root string, torrentFiles *AtomicTorrentFS) (ok bool, err error) {
select {
case <-ctx.Done():
return false, ctx.Err()
Expand Down Expand Up @@ -163,7 +163,7 @@ func BuildTorrentIfNeed(ctx context.Context, fName, root string, torrentFiles *T
}

// BuildTorrentFilesIfNeed - create .torrent files from .seg files (big IO) - if .seg files were added manually
func BuildTorrentFilesIfNeed(ctx context.Context, dirs datadir.Dirs, torrentFiles *TorrentFiles, chain string, ignore snapcfg.Preverified) (int, error) {
func BuildTorrentFilesIfNeed(ctx context.Context, dirs datadir.Dirs, torrentFiles *AtomicTorrentFS, chain string, ignore snapcfg.Preverified) (int, error) {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()

Expand Down Expand Up @@ -247,7 +247,7 @@ func AllTorrentPaths(dirs datadir.Dirs) ([]string, error) {
return files, nil
}

func AllTorrentSpecs(dirs datadir.Dirs, torrentFiles *TorrentFiles) (res []*torrent.TorrentSpec, err error) {
func AllTorrentSpecs(dirs datadir.Dirs, torrentFiles *AtomicTorrentFS) (res []*torrent.TorrentSpec, err error) {
files, err := AllTorrentPaths(dirs)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions erigon-lib/downloader/webseed.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type WebSeeds struct {
logger log.Logger
verbosity log.Lvl

torrentFiles *TorrentFiles
torrentFiles *AtomicTorrentFS
}

func NewWebSeeds(seeds []*url.URL, verbosity log.Lvl, logger log.Logger) *WebSeeds {
Expand Down Expand Up @@ -97,7 +97,7 @@ func (d *WebSeeds) getWebDownloadInfo(ctx context.Context, t *torrent.Torrent) (
return infos, seedHashMismatches, nil
}

func (d *WebSeeds) SetTorrent(t *TorrentFiles, whiteList snapcfg.Preverified, downloadTorrentFile bool) {
func (d *WebSeeds) SetTorrent(t *AtomicTorrentFS, whiteList snapcfg.Preverified, downloadTorrentFile bool) {
d.downloadTorrentFile = downloadTorrentFile
d.torrentsWhitelist = whiteList
d.torrentFiles = t
Expand Down
4 changes: 2 additions & 2 deletions eth/stagedsync/stage_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func StageSnapshotsCfg(db kv.RwDB,
cfg.snapshotUploader = &snapshotUploader{
cfg: &cfg,
uploadFs: uploadFs,
torrentFiles: downloader.NewAtomicTorrentFiles(cfg.dirs.Snap),
torrentFiles: downloader.NewAtomicTorrentFS(cfg.dirs.Snap),
}

cfg.blockRetire.SetWorkers(estimate.CompressSnapshot.Workers())
Expand Down Expand Up @@ -516,7 +516,7 @@ type snapshotUploader struct {
uploadScheduled atomic.Bool
uploading atomic.Bool
manifestMutex sync.Mutex
torrentFiles *downloader.TorrentFiles
torrentFiles *downloader.AtomicTorrentFS
}

func (u *snapshotUploader) init(ctx context.Context, logger log.Logger) {
Expand Down
Loading