Skip to content

Commit

Permalink
downloader: webseed better error messages (#8611)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored Oct 30, 2023
1 parent 8f67a8c commit b311da9
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 42 deletions.
2 changes: 0 additions & 2 deletions cmd/downloader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"net"
"os"
"path/filepath"
"runtime"
"strings"
"time"

Expand Down Expand Up @@ -178,7 +177,6 @@ func Downloader(ctx context.Context, logger log.Logger) error {
return err
}

cfg.ClientConfig.PieceHashersPerTorrent = runtime.NumCPU() * 4
cfg.ClientConfig.DisableIPv6 = disableIPV6
cfg.ClientConfig.DisableIPv4 = disableIPV4

Expand Down
3 changes: 0 additions & 3 deletions erigon-lib/chain/snapcfg/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ func maxBlockNum(preverified Preverified) uint64 {
}
onlyName := fileName[:len(fileName)-len(ext)]
parts := strings.Split(onlyName, "-")
if parts[0] != "v1" {
panic("not implemented")
}
if parts[3] != "headers" {
continue
}
Expand Down
3 changes: 2 additions & 1 deletion erigon-lib/downloader/downloadercfg/downloadercfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
lg "github.com/anacrolix/log"
"github.com/anacrolix/torrent"
"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon-lib/common/cmp"
"github.com/ledgerwatch/erigon-lib/common/datadir"
"github.com/ledgerwatch/erigon-lib/common/dir"
"github.com/ledgerwatch/log/v3"
Expand Down Expand Up @@ -59,7 +60,7 @@ type Cfg struct {

func Default() *torrent.ClientConfig {
torrentConfig := torrent.NewDefaultClientConfig()
torrentConfig.PieceHashersPerTorrent = runtime.NumCPU()
torrentConfig.PieceHashersPerTorrent = cmp.Max(1, runtime.NumCPU()-1)

torrentConfig.MinDialTimeout = 6 * time.Second //default: 3s
torrentConfig.HandshakesTimeout = 8 * time.Second //default: 4s
Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/downloader/snaptype/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func FilesWithExt(dir, expectExt string) ([]FileInfo, error) {

func IsCorrectFileName(name string) bool {
parts := strings.Split(name, "-")
return len(parts) == 4 && parts[3] != "v1"
return len(parts) == 4
}

func IsCorrectHistoryFileName(name string) bool {
Expand Down
41 changes: 19 additions & 22 deletions erigon-lib/downloader/webseed.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (d *WebSeeds) downloadWebseedTomlFromProviders(ctx context.Context, s3Provi
}
response, err := d.callHttpProvider(ctx, webSeedProviderURL)
if err != nil { // don't fail on error
d.logger.Debug("[snapshots] downloadWebseedTomlFromProviders", "err", err, "url", webSeedProviderURL.EscapedPath())
d.logger.Debug("[snapshots.webseed] get from HTTP provider", "err", err, "url", webSeedProviderURL.EscapedPath())
continue
}
list = append(list, response)
Expand All @@ -69,7 +69,7 @@ func (d *WebSeeds) downloadWebseedTomlFromProviders(ctx context.Context, s3Provi
}
response, err := d.callS3Provider(ctx, webSeedProviderURL)
if err != nil { // don't fail on error
d.logger.Debug("[snapshots] downloadWebseedTomlFromProviders", "err", err, "url", "s3")
d.logger.Debug("[snapshots.webseed] get from S3 provider", "err", err)
continue
}
list = append(list, response)
Expand All @@ -78,13 +78,9 @@ func (d *WebSeeds) downloadWebseedTomlFromProviders(ctx context.Context, s3Provi
for _, webSeedFile := range diskProviders {
response, err := d.readWebSeedsFile(webSeedFile)
if err != nil { // don't fail on error
_, fileName := filepath.Split(webSeedFile)
d.logger.Debug("[snapshots] downloadWebseedTomlFromProviders", "err", err, "file", fileName)
d.logger.Debug("[snapshots.webseed] get from File provider", "err", err)
continue
}
if len(diskProviders) > 0 {
d.logger.Log(d.verbosity, "[snapshots] see webseed.toml file", "files", webSeedFile)
}
list = append(list, response)
}

Expand Down Expand Up @@ -189,13 +185,14 @@ func (d *WebSeeds) callHttpProvider(ctx context.Context, webSeedProviderUrl *url
request = request.WithContext(ctx)
resp, err := http.DefaultClient.Do(request)
if err != nil {
return nil, err
return nil, fmt.Errorf("webseed.http: host=%s, url=%s, %w", webSeedProviderUrl.Hostname(), webSeedProviderUrl.EscapedPath(), err)
}
defer resp.Body.Close()
response := snaptype.WebSeedsFromProvider{}
if err := toml.NewDecoder(resp.Body).Decode(&response); err != nil {
return nil, err
return nil, fmt.Errorf("webseed.http: host=%s, url=%s, %w", webSeedProviderUrl.Hostname(), webSeedProviderUrl.EscapedPath(), err)
}
d.logger.Debug("[snapshots.webseed] get from HTTP provider", "urls", len(response), "host", webSeedProviderUrl.Hostname(), "url", webSeedProviderUrl.EscapedPath())
return response, nil
}
func (d *WebSeeds) callS3Provider(ctx context.Context, token string) (snaptype.WebSeedsFromProvider, error) {
Expand Down Expand Up @@ -235,13 +232,14 @@ func (d *WebSeeds) callS3Provider(ctx context.Context, token string) (snaptype.W
// }
resp, err := client.GetObject(ctx, &s3.GetObjectInput{Bucket: &bucketName, Key: &fileName})
if err != nil {
return nil, err
return nil, fmt.Errorf("webseed.s3: bucket=%s, %w", bucketName, err)
}
defer resp.Body.Close()
response := snaptype.WebSeedsFromProvider{}
if err := toml.NewDecoder(resp.Body).Decode(&response); err != nil {
return nil, err
return nil, fmt.Errorf("webseed.s3: bucket=%s, %w", bucketName, err)
}
d.logger.Debug("[snapshots.webseed] get from S3 provider", "urls", len(response), "bucket", bucketName)
return response, nil
}
func (d *WebSeeds) callTorrentHttpProvider(ctx context.Context, url *url.URL) ([]byte, error) {
Expand All @@ -252,7 +250,7 @@ func (d *WebSeeds) callTorrentHttpProvider(ctx context.Context, url *url.URL) ([
request = request.WithContext(ctx)
resp, err := http.DefaultClient.Do(request)
if err != nil {
return nil, err
return nil, fmt.Errorf("webseed.downloadTorrentFile: host=%s, url=%s, %w", url.Hostname(), url.EscapedPath(), err)
}
defer resp.Body.Close()
//protect against too small and too big data
Expand All @@ -261,28 +259,27 @@ func (d *WebSeeds) callTorrentHttpProvider(ctx context.Context, url *url.URL) ([
}
res, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
return nil, fmt.Errorf("webseed.downloadTorrentFile: host=%s, url=%s, %w", url.Hostname(), url.EscapedPath(), err)
}
if err = validateTorrentBytes(res, url.Path); err != nil {
return nil, err
if err = validateTorrentBytes(res); err != nil {
return nil, fmt.Errorf("webseed.downloadTorrentFile: host=%s, url=%s, %w", url.Hostname(), url.EscapedPath(), err)
}
return res, nil
}
func validateTorrentBytes(b []byte, url string) error {
func validateTorrentBytes(b []byte) error {
var mi metainfo.MetaInfo
if err := bencode.NewDecoder(bytes.NewBuffer(b)).Decode(&mi); err != nil {
return fmt.Errorf("invalid bytes received from url %s, err=%w", url, err)
}
return nil
return bencode.NewDecoder(bytes.NewBuffer(b)).Decode(&mi)
}
func (d *WebSeeds) readWebSeedsFile(webSeedProviderPath string) (snaptype.WebSeedsFromProvider, error) {
_, fileName := filepath.Split(webSeedProviderPath)
data, err := os.ReadFile(webSeedProviderPath)
if err != nil {
return nil, err
return nil, fmt.Errorf("webseed.readWebSeedsFile: file=%s, %w", fileName, err)
}
response := snaptype.WebSeedsFromProvider{}
if err := toml.Unmarshal(data, &response); err != nil {
return nil, err
return nil, fmt.Errorf("webseed.readWebSeedsFile: file=%s, %w", fileName, err)
}
d.logger.Debug("[snapshots.webseed] get from File provider", "urls", len(response), "file", fileName)
return response, nil
}
6 changes: 0 additions & 6 deletions turbo/app/snapshots_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/ledgerwatch/erigon-lib/common/datadir"
"github.com/ledgerwatch/erigon-lib/common/dir"
"github.com/ledgerwatch/erigon-lib/compress"
"github.com/ledgerwatch/erigon-lib/downloader/snaptype"
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/kvcfg"
Expand Down Expand Up @@ -128,11 +127,6 @@ var (
Usage: "Do operation every N blocks",
Value: 1_000,
}
SnapshotSegmentSizeFlag = cli.Uint64Flag{
Name: "segment.size",
Usage: "Amount of blocks in each segment",
Value: snaptype.Erigon2SegmentSize,
}
SnapshotRebuildFlag = cli.BoolFlag{
Name: "rebuild",
Usage: "Force rebuild",
Expand Down
8 changes: 4 additions & 4 deletions turbo/snapshotsync/freezeblocks/block_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -1233,8 +1233,8 @@ func canRetire(from, to uint64) (blockFrom, blockTo uint64, can bool) {
blockFrom = (from / 1_000) * 1_000
roundedTo1K := (to / 1_000) * 1_000
var maxJump uint64 = 1_000
if blockFrom%500_000 == 0 {
maxJump = 500_000
if blockFrom%snaptype.Erigon2SegmentSize == 0 {
maxJump = snaptype.Erigon2SegmentSize
} else if blockFrom%100_000 == 0 {
maxJump = 100_000
} else if blockFrom%10_000 == 0 {
Expand All @@ -1243,8 +1243,8 @@ func canRetire(from, to uint64) (blockFrom, blockTo uint64, can bool) {
//roundedTo1K := (to / 1_000) * 1_000
jump := cmp.Min(maxJump, roundedTo1K-blockFrom)
switch { // only next segment sizes are allowed
case jump >= 500_000:
blockTo = blockFrom + 500_000
case jump >= snaptype.Erigon2SegmentSize:
blockTo = blockFrom + snaptype.Erigon2SegmentSize
case jump >= 100_000:
blockTo = blockFrom + 100_000
case jump >= 10_000:
Expand Down
3 changes: 0 additions & 3 deletions turbo/snapshotsync/snapshotsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ func BuildProtoRequest(downloadRequest []services.DownloadRequest) *proto_downlo
})
}
} else {
if r.Ranges.To-r.Ranges.From != snaptype.Erigon2SegmentSize {
continue
}
if r.Bor {
for _, t := range []snaptype.Type{snaptype.BorEvents, snaptype.BorSpans} {
req.Items = append(req.Items, &proto_downloader.DownloadItem{
Expand Down

0 comments on commit b311da9

Please sign in to comment.