Skip to content

Commit

Permalink
Snapshots download and seed (#3117)
Browse files Browse the repository at this point in the history
* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* Squashed 'interfaces/' content from commit e5b1945d0

git-subtree-dir: interfaces
git-subtree-split: e5b1945d02da7a7f00e2289034ee90a6edd60184

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save

* save
  • Loading branch information
AskAlexSharov authored Dec 14, 2021
1 parent d739810 commit ecb10e8
Show file tree
Hide file tree
Showing 55 changed files with 1,643 additions and 2,184 deletions.
6 changes: 6 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,9 @@
[submodule "libmdbx"]
path = libmdbx
url = https://github.com/erthink/libmdbx
[submodule "turbo/snapshotsync/snapshothashes/erigon-snapshots"]
path = turbo/snapshotsync/snapshothashes/erigon-snapshots
url = https://github.com/ledgerwatch/erigon-snapshot.git
[submodule "cmd/downloader/trackers/trackerslist"]
path = cmd/downloader/trackers/trackerslist
url = https://github.com/ngosang/trackerslist.git
291 changes: 291 additions & 0 deletions cmd/downloader/downloader/downloader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
package downloader

import (
"context"
"fmt"
"path/filepath"
"sync"
"time"

lg "github.com/anacrolix/log"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
"github.com/dustin/go-humanize"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshothashes"
"github.com/ledgerwatch/log/v3"
)

type Client struct {
Cli *torrent.Client
pieceCompletionStore storage.PieceCompletion
}

func New(snapshotsDir string, seeding bool, peerID string) (*Client, error) {
torrentConfig := DefaultTorrentConfig()
torrentConfig.Seed = seeding
torrentConfig.DataDir = snapshotsDir
torrentConfig.UpnpID = torrentConfig.UpnpID + "leecher"
torrentConfig.PeerID = peerID

progressStore, err := storage.NewBoltPieceCompletion(snapshotsDir)
if err != nil {
panic(err)
}
torrentConfig.DefaultStorage = storage.NewMMapWithCompletion(snapshotsDir, progressStore)

torrentClient, err := torrent.NewClient(torrentConfig)
if err != nil {
log.Error("Fail to start torrnet client", "err", err)
return nil, fmt.Errorf("fail to start: %w", err)
}

log.Info(fmt.Sprintf("Seeding: %t, my peerID: %x", seeding, torrentClient.PeerID()))

return &Client{
Cli: torrentClient,
pieceCompletionStore: progressStore,
}, nil
}

func DefaultTorrentConfig() *torrent.ClientConfig {
torrentConfig := torrent.NewDefaultClientConfig()
torrentConfig.ListenPort = 0
// debug
torrentConfig.Debug = false
torrentConfig.Logger = NewAdapterLogger()
torrentConfig.Logger = torrentConfig.Logger.FilterLevel(lg.Debug)

// enable dht
torrentConfig.NoDHT = true
torrentConfig.DisableTrackers = false
//torrentConfig.DisableWebtorrent = true
//torrentConfig.DisableWebseeds = true

// Increase default timeouts, because we often run on commodity networks
torrentConfig.MinDialTimeout = 6 * time.Second // default: 3sec
torrentConfig.NominalDialTimeout = 20 * time.Second // default: 20sec
torrentConfig.HandshakesTimeout = 8 * time.Second // default: 4sec

//torrentConfig.MinPeerExtensions.SetBit(peer_protocol.ExtensionBitFast, true)
return torrentConfig
}

func (cli *Client) SavePeerID(db kv.Putter) error {
return db.Put(kv.BittorrentInfo, []byte(kv.BittorrentPeerID), cli.PeerID())
}

func (cli *Client) Close() {
cli.pieceCompletionStore.Close()
cli.Cli.Close()
}

func (cli *Client) PeerID() []byte {
peerID := cli.Cli.PeerID()
return peerID[:]
}

func MainLoop(ctx context.Context, torrentClient *torrent.Client) {
interval := time.Second * 5
logEvery := time.NewTicker(interval)
defer logEvery.Stop()
for {
var prevBytesReadUsefulData, aggByteRate int64
select {
case <-ctx.Done():
return
case <-logEvery.C:
torrents := torrentClient.Torrents()
allComplete := true
gotInfo := 0
for _, t := range torrents {
select {
case <-t.GotInfo(): // all good
gotInfo++
default:
t.AllowDataUpload()
t.AllowDataDownload()
}
allComplete = allComplete && t.Complete.Bool()
}
if gotInfo < len(torrents) {
log.Info(fmt.Sprintf("[torrent] Waiting for torrents metadata: %d/%d", gotInfo, len(torrents)))
continue
}
if allComplete {
peers := map[torrent.PeerID]struct{}{}
for _, t := range torrentClient.Torrents() {
for _, peer := range t.PeerConns() {
peers[peer.PeerID] = struct{}{}
}
}
log.Info("[torrent] Seeding", "peers", len(peers), "torrents", len(torrents))
continue
}

var aggBytesCompleted, aggLen int64
//var aggCompletedPieces, aggNumPieces, aggPartialPieces int
peers := map[torrent.PeerID]*torrent.PeerConn{}

for _, t := range torrents {
stats := t.Stats()
/*
var completedPieces, partialPieces int
psrs := t.PieceStateRuns()
for _, r := range psrs {
if r.Complete {
completedPieces += r.Length
}
if r.Partial {
partialPieces += r.Length
}
}
aggCompletedPieces += completedPieces
aggPartialPieces += partialPieces
aggNumPieces = t.NumPieces()
*/

byteRate := int64(time.Second)
bytesReadUsefulData := stats.BytesReadUsefulData.Int64()
byteRate *= stats.BytesReadUsefulData.Int64() - prevBytesReadUsefulData
byteRate /= int64(interval)
aggByteRate += byteRate

prevBytesReadUsefulData = bytesReadUsefulData
aggBytesCompleted += t.BytesCompleted()
aggLen += t.Length()

for _, peer := range t.PeerConns() {
peers[peer.PeerID] = peer
}

}

line := fmt.Sprintf(
"[torrent] Downloading: %d%%, %v/s, peers: %d",
int(100*(float64(aggBytesCompleted)/float64(aggLen))),
//humanize.Bytes(uint64(aggBytesCompleted)),
// humanize.Bytes(uint64(aggLen)),
humanize.Bytes(uint64(aggByteRate)),
len(peers),
)
log.Info(line)
}
}
}

func (cli *Client) StopSeeding(hash metainfo.Hash) error {
t, ok := cli.Cli.Torrent(hash)
if !ok {
return nil
}
ch := t.Closed()
t.Drop()
<-ch
return nil
}

// AddTorrentFiles - adding .torrent files to torrentClient (and checking their hashes), if .torrent file
// added first time - pieces verification process will start (disk IO heavy) - progress
// kept in `piece completion storage` (surviving reboot). Once it done - no disk IO needed again.
// Don't need call torrent.VerifyData manually
func AddTorrentFiles(ctx context.Context, snapshotsDir string, torrentClient *torrent.Client, preverifiedHashes snapshothashes.Preverified) error {
if err := ForEachTorrentFile(snapshotsDir, func(torrentFilePath string) error {
mi, err := metainfo.LoadFromFile(torrentFilePath)
if err != nil {
return err
}
mi.AnnounceList = Trackers

// skip non-preverified files
_, torrentFileName := filepath.Split(torrentFilePath)
segmentFileName := segmentFileNameFromTorrentFileName(torrentFileName)
hashString, ok := preverifiedHashes[segmentFileName]
if !ok {
return nil
}
expect := metainfo.NewHashFromHex(hashString)
if mi.HashInfoBytes() != expect {
return fmt.Errorf("file %s has unexpected hash %x, expected %x", torrentFileName, mi.HashInfoBytes(), expect)
}

if _, err = torrentClient.AddTorrent(mi); err != nil {
return err
}
return nil
}); err != nil {
return err
}

waitForChecksumVerify(ctx, torrentClient)
return nil
}

// ResolveAbsentTorrents - add hard-coded hashes (if client doesn't have) as magnet links and download everything
func ResolveAbsentTorrents(ctx context.Context, torrentClient *torrent.Client, preverifiedHashes []metainfo.Hash, snapshotDir string) error {
mi := &metainfo.MetaInfo{AnnounceList: Trackers}
wg := &sync.WaitGroup{}
for _, infoHash := range preverifiedHashes {
if _, ok := torrentClient.Torrent(infoHash); ok {
continue
}
magnet := mi.Magnet(&infoHash, nil)
t, err := torrentClient.AddMagnet(magnet.String())
if err != nil {
return err
}
t.AllowDataDownload()
t.AllowDataUpload()

wg.Add(1)
go func(t *torrent.Torrent, infoHash metainfo.Hash) {
defer wg.Done()

select {
case <-ctx.Done():
t.Drop()
return
case <-t.GotInfo():
mi := t.Metainfo()
_ = CreateTorrentFileIfNotExists(snapshotDir, t.Info(), &mi)
}
}(t, infoHash)
}

wg.Wait()

return nil
}

func waitForChecksumVerify(ctx context.Context, torrentClient *torrent.Client) {
//TODO: tr.VerifyData() - find when to call it
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
interval := time.Second * 5
logEvery := time.NewTicker(interval)
defer logEvery.Stop()

for {
select {
case <-ctx.Done():
return
case <-logEvery.C:
var aggBytesCompleted, aggLen int64
for _, t := range torrentClient.Torrents() {
aggBytesCompleted += t.BytesCompleted()
aggLen += t.Length()
}

line := fmt.Sprintf(
"[torrent] verifying snapshots: %s/%s",
humanize.Bytes(uint64(aggBytesCompleted)),
humanize.Bytes(uint64(aggLen)),
)
log.Info(line)
}
}
}()
torrentClient.WaitAll() // wait for checksum verify
}
74 changes: 74 additions & 0 deletions cmd/downloader/downloader/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package downloader

import (
stdlog "log"
"strings"

utp "github.com/anacrolix/go-libutp"
lg "github.com/anacrolix/log"
"github.com/ledgerwatch/log/v3"
)

func init() {
lg.Default = NewAdapterLogger()
utp.Logger = stdlog.New(NullWriter(1), "", stdlog.LstdFlags)
}

func NewAdapterLogger() lg.Logger {
return lg.Logger{
LoggerImpl: lg.LoggerImpl(adapterLogger{}),
}
}

type adapterLogger struct{}

func (b adapterLogger) Log(msg lg.Msg) {
lvl, ok := msg.GetLevel()
if !ok {
lvl = lg.Info
}

switch lvl {
case lg.Debug:
log.Debug(msg.String())
case lg.Info:
str := msg.String()
if strings.Contains(str, "EOF") { // suppress useless errors
break
}

log.Info(str)
case lg.Warning:
str := msg.String()
if strings.Contains(str, "could not find offer for id") { // suppress useless errors
break
}

log.Warn(str)
case lg.Error:
str := msg.String()
if strings.Contains(str, "EOF") { // suppress useless errors
break
}

log.Error(str)
case lg.Critical:
str := msg.String()
if strings.Contains(str, "EOF") { // suppress useless errors
break
}
if strings.Contains(str, "don't want conns") { // suppress useless errors
break
}

log.Error(str)
default:
log.Warn("unknown log type", "msg", msg.String())
}
}

// NullWriter implements the io.Write interface but doesn't do anything.
type NullWriter int

// Write implements the io.Write interface but is a noop.
func (NullWriter) Write([]byte) (int, error) { return 0, nil }
Loading

0 comments on commit ecb10e8

Please sign in to comment.