Skip to content

Commit f458336

Browse files
committed
use SetGoroutineLabels
1 parent fee4854 commit f458336

File tree

10 files changed

+53
-42
lines changed

10 files changed

+53
-42
lines changed

agreement/demux.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/algorand/go-algorand/logging"
2626
"github.com/algorand/go-algorand/logging/logspec"
2727
"github.com/algorand/go-algorand/protocol"
28+
"github.com/algorand/go-algorand/util"
2829
)
2930

3031
const (
@@ -113,6 +114,7 @@ func (d *demux) tokenizeMessages(ctx context.Context, net Network, tag protocol.
113114
defer func() {
114115
close(decoded)
115116
}()
117+
util.SetGoroutineLabels("func", "demux.tokenizeMessages", "tag", string(tag))
116118
for {
117119
select {
118120
case raw, ok := <-networkMessages:

agreement/service.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@ package agreement
1919
//go:generate dbgen -i agree.sql -p agreement -n agree -o agreeInstall.go -h ../scripts/LICENSE_HEADER
2020
import (
2121
"context"
22-
"runtime/pprof"
2322
"time"
2423

2524
"github.com/algorand/go-algorand/config"
2625
"github.com/algorand/go-algorand/logging"
2726
"github.com/algorand/go-algorand/protocol"
27+
"github.com/algorand/go-algorand/util"
2828
"github.com/algorand/go-algorand/util/db"
2929
"github.com/algorand/go-algorand/util/execpool"
3030
"github.com/algorand/go-algorand/util/timers"
@@ -144,12 +144,8 @@ func (s *Service) Start() {
144144
input := make(chan externalEvent)
145145
output := make(chan []action)
146146
ready := make(chan externalDemuxSignals)
147-
pprof.Do(context.Background(), pprof.Labels("worker", "agreement.demux"), func(_ context.Context) {
148-
go s.demuxLoop(ctx, input, output, ready)
149-
})
150-
pprof.Do(context.Background(), pprof.Labels("worker", "agreement.main"), func(_ context.Context) {
151-
go s.mainLoop(input, output, ready)
152-
})
147+
go s.demuxLoop(ctx, input, output, ready)
148+
go s.mainLoop(input, output, ready)
153149
}
154150

155151
// Shutdown the execution of the protocol.
@@ -164,6 +160,7 @@ func (s *Service) Shutdown() {
164160

165161
// demuxLoop repeatedly executes pending actions and then requests the next event from the Service.demux.
166162
func (s *Service) demuxLoop(ctx context.Context, input chan<- externalEvent, output <-chan []action, ready <-chan externalDemuxSignals) {
163+
util.SetGoroutineLabels("func", "agreement.demuxLoop")
167164
for a := range output {
168165
s.do(ctx, a)
169166
extSignals := <-ready
@@ -188,6 +185,7 @@ func (s *Service) demuxLoop(ctx context.Context, input chan<- externalEvent, out
188185
// 3. Drive the state machine with this input to obtain a slice of pending actions.
189186
// 4. If necessary, persist state to disk.
190187
func (s *Service) mainLoop(input <-chan externalEvent, output chan<- []action, ready chan<- externalDemuxSignals) {
188+
util.SetGoroutineLabels("func", "agreement.mainLoop")
191189
// setup
192190
var clock timers.Clock
193191
var router rootRouter

data/transactions/payset.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package transactions
1919
import (
2020
"github.com/algorand/go-algorand/crypto"
2121
"github.com/algorand/go-algorand/protocol"
22+
"github.com/algorand/msgp/msgp"
2223
)
2324

2425
type (
@@ -27,6 +28,12 @@ type (
2728
Payset []SignedTxnInBlock
2829
)
2930

31+
type (
32+
// A Payset represents a common, unforgeable, consistent, ordered set of SignedTxn objects.
33+
//msgp:allocbound Payset 100000
34+
RawPayset msgp.Raw
35+
)
36+
3037
// CommitFlat returns a commitment to the Payset, as a flat array.
3138
func (payset Payset) CommitFlat() crypto.Digest {
3239
return payset.commit(false)

data/txHandler.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"context"
2222
"fmt"
2323
"io"
24-
"runtime/pprof"
2524
"sync"
2625

2726
"github.com/algorand/go-algorand/config"
@@ -33,6 +32,7 @@ import (
3332
"github.com/algorand/go-algorand/logging"
3433
"github.com/algorand/go-algorand/network"
3534
"github.com/algorand/go-algorand/protocol"
35+
"github.com/algorand/go-algorand/util"
3636
"github.com/algorand/go-algorand/util/execpool"
3737
"github.com/algorand/go-algorand/util/metrics"
3838
)
@@ -103,9 +103,7 @@ func (handler *TxHandler) Start() {
103103
{Tag: protocol.TxnTag, MessageHandler: network.HandlerFunc(handler.processIncomingTxn)},
104104
})
105105
handler.backlogWg.Add(1)
106-
pprof.Do(context.Background(), pprof.Labels("worker", "TxHandler.backlogWorker"), func(_ context.Context) {
107-
go handler.backlogWorker()
108-
})
106+
go handler.backlogWorker()
109107
}
110108

111109
// Stop suspends the processing of incoming messages at the transaction handler
@@ -126,6 +124,7 @@ func reencode(stxns []transactions.SignedTxn) []byte {
126124
// and dispatches them further.
127125
func (handler *TxHandler) backlogWorker() {
128126
defer handler.backlogWg.Done()
127+
util.SetGoroutineLabels("func", "TxHandler.backlogWorker")
129128
for {
130129
// prioritize the postVerificationQueue
131130
select {

ledger/notifier.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@ package ledger
1919
import (
2020
"context"
2121
"database/sql"
22-
"runtime/pprof"
2322
"sync"
2423

2524
"github.com/algorand/go-deadlock"
2625

2726
"github.com/algorand/go-algorand/data/basics"
2827
"github.com/algorand/go-algorand/data/bookkeeping"
2928
"github.com/algorand/go-algorand/ledger/ledgercore"
29+
"github.com/algorand/go-algorand/util"
3030
)
3131

3232
// BlockListener represents an object that needs to get notified on new blocks.
@@ -51,6 +51,7 @@ type blockNotifier struct {
5151

5252
func (bn *blockNotifier) worker() {
5353
defer bn.closing.Done()
54+
util.SetGoroutineLabels("func", "blockNotifier.worker")
5455
bn.mu.Lock()
5556

5657
for {
@@ -93,9 +94,7 @@ func (bn *blockNotifier) loadFromDisk(l ledgerForTracker, _ basics.Round) error
9394
bn.running = true
9495
bn.pendingBlocks = nil
9596
bn.closing.Add(1)
96-
pprof.Do(context.Background(), pprof.Labels("worker", "blockNotifier"), func(_ context.Context) {
97-
go bn.worker()
98-
})
97+
go bn.worker()
9998
return nil
10099
}
101100

network/wsNetwork.go

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"path"
3131
"regexp"
3232
"runtime"
33-
"runtime/pprof"
3433
"strconv"
3534
"strings"
3635
"sync"
@@ -812,9 +811,7 @@ func (wn *WebsocketNetwork) Start() {
812811
}
813812
if wn.listener != nil {
814813
wn.wg.Add(1)
815-
pprof.Do(context.Background(), pprof.Labels("worker", "httpdThread"), func(ctx context.Context) {
816-
go wn.httpdThread()
817-
})
814+
go wn.httpdThread()
818815
}
819816
wn.wg.Add(1)
820817
go wn.meshThread()
@@ -824,13 +821,11 @@ func (wn *WebsocketNetwork) Start() {
824821
wn.peersConnectivityCheckTicker.Stop()
825822
}
826823
wn.peersConnectivityCheckTicker = time.NewTicker(connectionActivityMonitorInterval)
827-
pprof.Do(context.Background(), pprof.Labels("worker", "messageHandlerThread"), func(_ context.Context) {
828-
for i := 0; i < incomingThreads; i++ {
829-
wn.wg.Add(1)
830-
// We pass the peersConnectivityCheckTicker.C here so that we don't need to syncronize the access to the ticker's data structure.
831-
go wn.messageHandlerThread(wn.peersConnectivityCheckTicker.C)
832-
}
833-
})
824+
for i := 0; i < incomingThreads; i++ {
825+
wn.wg.Add(1)
826+
// We pass the peersConnectivityCheckTicker.C here so that we don't need to syncronize the access to the ticker's data structure.
827+
go wn.messageHandlerThread(wn.peersConnectivityCheckTicker.C)
828+
}
834829
wn.wg.Add(1)
835830
go wn.broadcastThread()
836831
if wn.prioScheme != nil {
@@ -845,6 +840,7 @@ func (wn *WebsocketNetwork) Start() {
845840

846841
func (wn *WebsocketNetwork) httpdThread() {
847842
defer wn.wg.Done()
843+
util.SetGoroutineLabels("func", "network.httpdThread")
848844
var err error
849845
if wn.config.TLSCertFile != "" && wn.config.TLSKeyFile != "" {
850846
err = wn.server.ServeTLS(wn.listener, wn.config.TLSCertFile, wn.config.TLSKeyFile)
@@ -1190,6 +1186,7 @@ func (wn *WebsocketNetwork) maybeSendMessagesOfInterest(peer *wsPeer, messagesOf
11901186

11911187
func (wn *WebsocketNetwork) messageHandlerThread(peersConnectivityCheckCh <-chan time.Time) {
11921188
defer wn.wg.Done()
1189+
util.SetGoroutineLabels("func", "network.messageHandlerThread")
11931190

11941191
for {
11951192
select {

node/node.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"fmt"
2424
"os"
2525
"path/filepath"
26-
"runtime/pprof"
2726
"strings"
2827
"sync"
2928
"time"
@@ -50,6 +49,7 @@ import (
5049
"github.com/algorand/go-algorand/protocol"
5150
"github.com/algorand/go-algorand/rpcs"
5251
"github.com/algorand/go-algorand/stateproof"
52+
"github.com/algorand/go-algorand/util"
5353
"github.com/algorand/go-algorand/util/db"
5454
"github.com/algorand/go-algorand/util/execpool"
5555
"github.com/algorand/go-algorand/util/metrics"
@@ -384,9 +384,7 @@ func (node *AlgorandFullNode) startMonitoringRoutines() {
384384
node.monitoringRoutinesWaitGroup.Add(2)
385385
go node.txPoolGaugeThread(node.ctx.Done())
386386
// Delete old participation keys
387-
pprof.Do(context.Background(), pprof.Labels("worker", "oldKeyDeletionThread"), func(_ context.Context) {
388-
go node.oldKeyDeletionThread(node.ctx.Done())
389-
})
387+
go node.oldKeyDeletionThread(node.ctx.Done())
390388
// TODO re-enable with configuration flag post V1
391389
//go logging.UsageLogThread(node.ctx, node.log, 100*time.Millisecond, nil)
392390
}
@@ -990,6 +988,7 @@ var txPoolGuage = metrics.MakeGauge(metrics.MetricName{Name: "algod_tx_pool_coun
990988

991989
func (node *AlgorandFullNode) txPoolGaugeThread(done <-chan struct{}) {
992990
defer node.monitoringRoutinesWaitGroup.Done()
991+
util.SetGoroutineLabels("func", "node.txPoolGaugeThread")
993992
ticker := time.NewTicker(10 * time.Second)
994993
defer ticker.Stop()
995994
for true {
@@ -1029,6 +1028,8 @@ func (node *AlgorandFullNode) OnNewBlock(block bookkeeping.Block, delta ledgerco
10291028
// don't have to delete key for each block we received.
10301029
func (node *AlgorandFullNode) oldKeyDeletionThread(done <-chan struct{}) {
10311030
defer node.monitoringRoutinesWaitGroup.Done()
1031+
util.SetGoroutineLabels("func", "node.oldKeyDeletionThread")
1032+
10321033
for {
10331034
select {
10341035
case <-done:

util/execpool/backlog.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@ package execpool
1818

1919
import (
2020
"context"
21-
"runtime/pprof"
2221
"sync"
22+
23+
"github.com/algorand/go-algorand/util"
2324
)
2425

2526
// A backlog for an execution pool. The typical usage of this is to
@@ -68,9 +69,7 @@ func MakeBacklog(execPool ExecutionPool, backlogSize int, priority Priority, own
6869
bl.buffer = make(chan backlogItemTask, backlogSize)
6970

7071
bl.wg.Add(1)
71-
pprof.Do(context.Background(), pprof.Labels(profLabels...), func(_ context.Context) {
72-
go bl.worker()
73-
})
72+
go bl.worker(profLabels)
7473
return bl
7574
}
7675

@@ -126,10 +125,11 @@ func (b *backlog) Shutdown() {
126125
}
127126
}
128127

129-
func (b *backlog) worker() {
128+
func (b *backlog) worker(profLabels []string) {
130129
var t backlogItemTask
131130
var ok bool
132131
defer b.wg.Done()
132+
util.SetGoroutineLabels(profLabels...)
133133

134134
for {
135135

util/execpool/pool.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ package execpool
1919
import (
2020
"context"
2121
"runtime"
22-
"runtime/pprof"
2322
"sync"
23+
24+
"github.com/algorand/go-algorand/util"
2425
)
2526

2627
// The list of all valid priority values. When adding new ones, add them before numPrios.
@@ -82,11 +83,9 @@ func MakePool(owner interface{}, profLabels ...string) ExecutionPool {
8283
}
8384

8485
p.wg.Add(p.numCPUs)
85-
pprof.Do(context.Background(), pprof.Labels(profLabels...), func(_ context.Context) {
86-
for i := 0; i < p.numCPUs; i++ {
87-
go p.worker()
88-
}
89-
})
86+
for i := 0; i < p.numCPUs; i++ {
87+
go p.worker(profLabels)
88+
}
9089
return p
9190
}
9291

@@ -138,12 +137,14 @@ func (p *pool) Shutdown() {
138137

139138
// worker function blocks until a new task is pending on any of the channels and execute the above task.
140139
// the implementation below would give higher priority for channels that are on higher priority slot.
141-
func (p *pool) worker() {
140+
func (p *pool) worker(profLabels []string) {
142141
var t enqueuedTask
143142
var ok bool
144143
lowPrio := p.inputs[LowPriority]
145144
highPrio := p.inputs[HighPriority]
146145
defer p.wg.Done()
146+
util.SetGoroutineLabels(profLabels...)
147+
147148
for {
148149

149150
select {

util/util.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
package util
2121

2222
import (
23+
"context"
2324
"fmt"
25+
"runtime/pprof"
2426
"syscall"
2527
)
2628

@@ -62,3 +64,8 @@ func GetCurrentProcessTimes() (utime int64, stime int64, err error) {
6264
}
6365
return
6466
}
67+
68+
// SetGoroutineLabels sets profiler labels for identifying goroutines using the pprof package.
69+
func SetGoroutineLabels(args ...string) {
70+
pprof.SetGoroutineLabels(pprof.WithLabels(context.Background(), pprof.Labels(args...)))
71+
}

0 commit comments

Comments
 (0)