Skip to content

Commit

Permalink
Dvovk/nsync (#9324)
Browse files Browse the repository at this point in the history
  • Loading branch information
dvovk authored Jan 27, 2024
1 parent e38c424 commit 6760e0e
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 42 deletions.
48 changes: 47 additions & 1 deletion diagnostics/diagnostic.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package diagnostics
import (
"context"
"net/http"
"sync"

"github.com/ledgerwatch/erigon-lib/common"
diaglib "github.com/ledgerwatch/erigon-lib/diagnostics"
Expand All @@ -17,6 +18,7 @@ type DiagnosticClient struct {
node *node.ErigonNode

syncStats diaglib.SyncStatistics
mu sync.Mutex
}

func NewDiagnosticClient(ctx *cli.Context, metricsMux *http.ServeMux, node *node.ErigonNode) *DiagnosticClient {
Expand All @@ -31,8 +33,39 @@ func (d *DiagnosticClient) Setup() {
d.runCurrentSyncStageListener()
d.runSyncStagesListListener()
d.runBlockExecutionListener()

//d.logDiagMsgs()
}

/*func (d *DiagnosticClient) logDiagMsgs() {
ticker := time.NewTicker(20 * time.Second)
quit := make(chan struct{})
go func() {
for {
select {
case <-ticker.C:
d.logStr()
case <-quit:
ticker.Stop()
return
}
}
}()
}
func (d *DiagnosticClient) logStr() {
d.mu.Lock()
defer d.mu.Unlock()
log.Info("SyncStatistics", "stats", interfaceToJSONString(d.syncStats))
}
func interfaceToJSONString(i interface{}) string {
b, err := json.Marshal(i)
if err != nil {
return ""
}
return string(b)
}*/

func (d *DiagnosticClient) runSnapshotListener() {
go func() {
ctx, ch, cancel := diaglib.Context[diaglib.SnapshotDownloadStatistics](context.Background(), 1)
Expand All @@ -47,6 +80,7 @@ func (d *DiagnosticClient) runSnapshotListener() {
cancel()
return
case info := <-ch:
d.mu.Lock()
d.syncStats.SnapshotDownload.Downloaded = info.Downloaded
d.syncStats.SnapshotDownload.Total = info.Total
d.syncStats.SnapshotDownload.TotalTime = info.TotalTime
Expand All @@ -59,6 +93,7 @@ func (d *DiagnosticClient) runSnapshotListener() {
d.syncStats.SnapshotDownload.Sys = info.Sys
d.syncStats.SnapshotDownload.DownloadFinished = info.DownloadFinished
d.syncStats.SnapshotDownload.TorrentMetadataReady = info.TorrentMetadataReady
d.mu.Unlock()

if info.DownloadFinished {
return
Expand All @@ -79,19 +114,20 @@ func (d *DiagnosticClient) runSegmentDownloadingListener() {
defer cancel()

rootCtx, _ := common.RootContext()

diaglib.StartProviders(ctx, diaglib.TypeOf(diaglib.SegmentDownloadStatistics{}), log.Root())
for {
select {
case <-rootCtx.Done():
cancel()
return
case info := <-ch:
d.mu.Lock()
if d.syncStats.SnapshotDownload.SegmentsDownloading == nil {
d.syncStats.SnapshotDownload.SegmentsDownloading = map[string]diaglib.SegmentDownloadStatistics{}
}

d.syncStats.SnapshotDownload.SegmentsDownloading[info.Name] = info
d.mu.Unlock()
}
}
}()
Expand Down Expand Up @@ -131,6 +167,7 @@ func (d *DiagnosticClient) runSegmentIndexingFinishedListener() {
cancel()
return
case info := <-ch:
d.mu.Lock()
found := false
for i := range d.syncStats.SnapshotIndexing.Segments {
if d.syncStats.SnapshotIndexing.Segments[i].SegmentName == info.SegmentName {
Expand All @@ -147,12 +184,15 @@ func (d *DiagnosticClient) runSegmentIndexingFinishedListener() {
Sys: 0,
})
}
d.mu.Unlock()
}
}
}()
}

func (d *DiagnosticClient) addOrUpdateSegmentIndexingState(upd diaglib.SnapshotIndexingStatistics) {
d.mu.Lock()
defer d.mu.Unlock()
if d.syncStats.SnapshotIndexing.Segments == nil {
d.syncStats.SnapshotIndexing.Segments = []diaglib.SnapshotSegmentIndexingStatistics{}
}
Expand Down Expand Up @@ -191,7 +231,9 @@ func (d *DiagnosticClient) runSyncStagesListListener() {
cancel()
return
case info := <-ch:
d.mu.Lock()
d.syncStats.SyncStages.StagesList = info.Stages
d.mu.Unlock()
return
}
}
Expand All @@ -212,10 +254,12 @@ func (d *DiagnosticClient) runCurrentSyncStageListener() {
cancel()
return
case info := <-ch:
d.mu.Lock()
d.syncStats.SyncStages.CurrentStage = info.Stage
if int(d.syncStats.SyncStages.CurrentStage) >= len(d.syncStats.SyncStages.StagesList) {
return
}
d.mu.Unlock()
}
}
}()
Expand All @@ -235,7 +279,9 @@ func (d *DiagnosticClient) runBlockExecutionListener() {
cancel()
return
case info := <-ch:
d.mu.Lock()
d.syncStats.BlockExecution = info
d.mu.Unlock()

if int(d.syncStats.SyncStages.CurrentStage) >= len(d.syncStats.SyncStages.StagesList) {
return
Expand Down
17 changes: 10 additions & 7 deletions erigon-lib/diagnostics/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,16 @@ type SnapshotDownloadStatistics struct {
}

type SegmentDownloadStatistics struct {
Name string `json:"name"`
TotalBytes uint64 `json:"totalBytes"`
DownloadedBytes uint64 `json:"downloadedBytes"`
WebseedsCount int `json:"webseedsCount"`
PeersCount int `json:"peersCount"`
WebseedsRate uint64 `json:"webseedsRate"`
PeersRate uint64 `json:"peersRate"`
Name string `json:"name"`
TotalBytes uint64 `json:"totalBytes"`
DownloadedBytes uint64 `json:"downloadedBytes"`
Webseeds []SegmentPeer `json:"webseeds"`
Peers []SegmentPeer `json:"peers"`
}

type SegmentPeer struct {
Url string `json:"url"`
DownloadRate uint64 `json:"downloadRate"`
}

type SnapshotIndexingStatistics struct {
Expand Down
62 changes: 28 additions & 34 deletions erigon-lib/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,27 +359,22 @@ func (d *Downloader) ReCalcStats(interval time.Duration) {
zeroProgress = append(zeroProgress, torrentName)
}

webseedRates, websRates := getWebseedsRatesForlogs(weebseedPeersOfThisFile, torrentName)
rates, peersRates := getPeersRatesForlogs(peersOfThisFile, torrentName)
webseedRates, webseeds := getWebseedsRatesForlogs(weebseedPeersOfThisFile, torrentName)
rates, peers := getPeersRatesForlogs(peersOfThisFile, torrentName)
// more detailed statistic: download rate of each peer (for each file)
if !t.Complete.Bool() && progress != 0 {
d.logger.Log(d.verbosity, "[snapshots] progress", "file", torrentName, "progress", fmt.Sprintf("%.2f%%", progress), "peers", len(peersOfThisFile), "webseeds", len(weebseedPeersOfThisFile))
d.logger.Log(d.verbosity, "[snapshots] webseed peers", webseedRates...)
d.logger.Log(d.verbosity, "[snapshots] bittorrent peers", rates...)
}

isDiagEnabled := diagnostics.TypeOf(diagnostics.SegmentDownloadStatistics{}).Enabled()
if isDiagEnabled {
diagnostics.Send(diagnostics.SegmentDownloadStatistics{
Name: torrentName,
TotalBytes: uint64(tLen),
DownloadedBytes: uint64(bytesCompleted),
WebseedsCount: len(weebseedPeersOfThisFile),
PeersCount: len(peersOfThisFile),
WebseedsRate: websRates,
PeersRate: peersRates,
})
}
diagnostics.Send(diagnostics.SegmentDownloadStatistics{
Name: torrentName,
TotalBytes: uint64(tLen),
DownloadedBytes: uint64(bytesCompleted),
Webseeds: webseeds,
Peers: peers,
})

default:
noMetadata = append(noMetadata, t.Name())
Expand Down Expand Up @@ -420,48 +415,47 @@ func (d *Downloader) ReCalcStats(interval time.Duration) {
d.stats = stats
}

func getWebseedsRatesForlogs(weebseedPeersOfThisFile []*torrent.Peer, fName string) ([]interface{}, uint64) {
totalRate := uint64(0)
averageRate := uint64(0)
func getWebseedsRatesForlogs(weebseedPeersOfThisFile []*torrent.Peer, fName string) ([]interface{}, []diagnostics.SegmentPeer) {
seeds := make([]diagnostics.SegmentPeer, 0, len(weebseedPeersOfThisFile))
webseedRates := make([]interface{}, 0, len(weebseedPeersOfThisFile)*2)
webseedRates = append(webseedRates, "file", fName)
for _, peer := range weebseedPeersOfThisFile {
urlS := strings.Trim(strings.TrimPrefix(peer.String(), "webseed peer for "), "\"")
if urlObj, err := url.Parse(urlS); err == nil {
if shortUrl, err := url.JoinPath(urlObj.Host, urlObj.Path); err == nil {
rate := uint64(peer.DownloadRate())
totalRate += rate

seed := diagnostics.SegmentPeer{
Url: urlObj.Host,
DownloadRate: rate,
}
seeds = append(seeds, seed)
webseedRates = append(webseedRates, shortUrl, fmt.Sprintf("%s/s", common.ByteCount(rate)))
}
}
}

lenght := uint64(len(weebseedPeersOfThisFile))
if lenght > 0 {
averageRate = totalRate / lenght
}

return webseedRates, averageRate
return webseedRates, seeds
}

func getPeersRatesForlogs(peersOfThisFile []*torrent.PeerConn, fName string) ([]interface{}, uint64) {
totalRate := uint64(0)
averageRate := uint64(0)
func getPeersRatesForlogs(peersOfThisFile []*torrent.PeerConn, fName string) ([]interface{}, []diagnostics.SegmentPeer) {
peers := make([]diagnostics.SegmentPeer, 0, len(peersOfThisFile))
rates := make([]interface{}, 0, len(peersOfThisFile)*2)
rates = append(rates, "file", fName)

for _, peer := range peersOfThisFile {
dr := uint64(peer.DownloadRate())
rates = append(rates, peer.PeerClientName.Load(), fmt.Sprintf("%s/s", common.ByteCount(dr)))
totalRate += dr
}
url := fmt.Sprintf("%v", peer.PeerClientName.Load())

lenght := uint64(len(peersOfThisFile))
if lenght > 0 {
averageRate = totalRate / uint64(len(peersOfThisFile))
segPeer := diagnostics.SegmentPeer{
Url: url,
DownloadRate: dr,
}
peers = append(peers, segPeer)
rates = append(rates, peer.PeerClientName.Load(), fmt.Sprintf("%s/s", common.ByteCount(dr)))
}

return rates, averageRate
return rates, peers
}

func (d *Downloader) VerifyData(ctx context.Context, whiteList []string, failFast bool) error {
Expand Down

0 comments on commit 6760e0e

Please sign in to comment.