Skip to content

Commit

Permalink
remove global vars
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero committed Mar 29, 2024
1 parent 5fad36e commit 455e887
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 16 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/ipld/go-ipld-prime/storage/dsadapter v0.0.0-20230102063945-1a409dc236dd
github.com/ipni/go-indexer-core v0.8.9
github.com/ipni/go-libipni v0.5.15
github.com/libp2p/go-libp2p v0.33.1
github.com/libp2p/go-libp2p v0.33.2
github.com/libp2p/go-msgio v0.3.0
github.com/mitchellh/go-homedir v1.1.0
github.com/multiformats/go-multiaddr v0.12.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -671,8 +671,8 @@ github.com/libp2p/go-libp2p v0.7.4/go.mod h1:oXsBlTLF1q7pxr+9w6lqzS1ILpyHsaBPniV
github.com/libp2p/go-libp2p v0.8.1/go.mod h1:QRNH9pwdbEBpx5DTJYg+qxcVaDMAz3Ee/qDKwXujH5o=
github.com/libp2p/go-libp2p v0.13.0/go.mod h1:pM0beYdACRfHO1WcJlp65WXyG2A6NqYM+t2DTVAJxMo=
github.com/libp2p/go-libp2p v0.14.0/go.mod h1:dsQrWLAoIn+GkHPN/U+yypizkHiB9tnv79Os+kSgQ4Q=
github.com/libp2p/go-libp2p v0.33.1 h1:tvJl9b9M6nSLBtZSXSguq+/lRhRj2oLRkyhBmQNMFLA=
github.com/libp2p/go-libp2p v0.33.1/go.mod h1:zOUTMjG4I7TXwMndNyOBn/CNtVBLlvBlnxfi+8xzx+E=
github.com/libp2p/go-libp2p v0.33.2 h1:vCdwnFxoGOXMKmaGHlDSnL4bM3fQeW8pgIa9DECnb40=
github.com/libp2p/go-libp2p v0.33.2/go.mod h1:zTeppLuCvUIkT118pFVzA8xzP/p2dJYOMApCkFh0Yww=
github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl950SO9L6n94=
github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8=
github.com/libp2p/go-libp2p-autonat v0.1.0/go.mod h1:1tLf2yXxiE/oKGtDwPYWTSYG3PtvYlJmg7NeVtPRqH8=
Expand Down
20 changes: 9 additions & 11 deletions internal/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,6 @@ const (
metricsUpdateInterval = time.Minute
)

// Metrics
var (
totalNonRmAds atomic.Int64
totalRmAds atomic.Int64
workersActive atomic.Int32
)

type adProcessedEvent struct {
publisher peer.ID
// Head of the chain being processed.
Expand Down Expand Up @@ -149,6 +142,11 @@ type Ingester struct {
ingestRates *rate.Map

skip500EntsErr atomic.Bool

// metrics
totalNonRmAds atomic.Int64
totalRmAds atomic.Int64
workersActive atomic.Int32
}

// NewIngester creates a new Ingester that uses a dagsync Subscriber to handle
Expand Down Expand Up @@ -869,13 +867,13 @@ func (ing *Ingester) ingestWorker(ctx context.Context, syncFinishedEvents <-chan
continue
}

stats.Record(ctx, metrics.AdIngestActive.M(int64(workersActive.Add(1))))
stats.Record(ctx, metrics.AdIngestActive.M(int64(ing.workersActive.Add(1))))

for syncFin := ing.getNextSyncFin(pubID); syncFin != nil; syncFin = ing.getNextSyncFin(pubID) {
ing.processRawAdChain(ctx, *syncFin, wkrNum)
}

stats.Record(ctx, metrics.AdIngestActive.M(int64(workersActive.Add(-1))))
stats.Record(ctx, metrics.AdIngestActive.M(int64(ing.workersActive.Add(-1))))
case <-ctx.Done():
log.Info("ingest worker canceled")
return
Expand Down Expand Up @@ -1011,8 +1009,8 @@ func (ing *Ingester) processRawAdChain(ctx context.Context, syncFinished dagsync
log.Debugw("Created ad stack", "providers", len(adsGroupedByProvider), "ads", totalAds, "rmCount", rmCount)

stats.Record(ctx,
metrics.RemoveAdCount.M(totalRmAds.Add(rmCount)),
metrics.NonRemoveAdCount.M(totalNonRmAds.Add(nonRmCount)))
metrics.RemoveAdCount.M(ing.totalRmAds.Add(rmCount)),
metrics.NonRemoveAdCount.M(ing.totalNonRmAds.Add(nonRmCount)))

// 2. For each provider put the ad stack to the worker msg channel. Each ad
// stack contains ads for a single provider, from a single publisher.
Expand Down
4 changes: 2 additions & 2 deletions internal/ingest/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1468,7 +1468,7 @@ func TestAnnounceIsDeferredWhenProcessingAd(t *testing.T) {
// Asset that the head ad multihash is not indexed.
requireNotIndexed(t, te.ingester.indexer, te.pubHost.ID(), mhs[3:])

require.Equal(t, 1, int(workersActive.Load()))
require.Equal(t, 1, int(te.ingester.workersActive.Load()))

// Announce an ad CID and assert that call to announce is deferred since
// we have blocked the processing.
Expand Down Expand Up @@ -1531,7 +1531,7 @@ func TestAnnounceIsNotDeferredOnNoInProgressIngest(t *testing.T) {
}, testRetryTimeout, testRetryInterval)

require.Eventually(t, func() bool {
return workersActive.Load() == 0
return te.ingester.workersActive.Load() == 0
}, testRetryTimeout, testRetryInterval)
}

Expand Down

0 comments on commit 455e887

Please sign in to comment.