Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions agreement/demux.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/logging/logspec"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/util"
)

const (
Expand Down Expand Up @@ -113,6 +114,7 @@ func (d *demux) tokenizeMessages(ctx context.Context, net Network, tag protocol.
defer func() {
close(decoded)
}()
util.SetGoroutineLabels("tokenizeTag", string(tag))
for {
select {
case raw, ok := <-networkMessages:
Expand Down
4 changes: 2 additions & 2 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,11 @@ func (n *P2PNetwork) Start() {
for i := 0; i < incomingThreads; i++ {
n.wg.Add(1)
// We pass the peersConnectivityCheckTicker.C here so that we don't need to syncronize the access to the ticker's data structure.
go n.handler.messageHandlerThread(&n.wg, n.wsPeersConnectivityCheckTicker.C, n)
go n.handler.messageHandlerThread(&n.wg, n.wsPeersConnectivityCheckTicker.C, n, "network", "P2PNetwork")
}

n.wg.Add(1)
go n.broadcaster.broadcastThread(&n.wg, n)
go n.broadcaster.broadcastThread(&n.wg, n, "network", "P2PNetwork")
n.service.DialPeersUntilTargetCount(n.config.GossipFanout)

n.wg.Add(1)
Expand Down
10 changes: 6 additions & 4 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,10 +747,10 @@ func (wn *WebsocketNetwork) Start() {
for i := 0; i < incomingThreads; i++ {
wn.wg.Add(1)
// We pass the peersConnectivityCheckTicker.C here so that we don't need to syncronize the access to the ticker's data structure.
go wn.handler.messageHandlerThread(&wn.wg, wn.peersConnectivityCheckTicker.C, wn)
go wn.handler.messageHandlerThread(&wn.wg, wn.peersConnectivityCheckTicker.C, wn, "network", "WebsocketNetwork")
}
wn.wg.Add(1)
go wn.broadcaster.broadcastThread(&wn.wg, wn)
go wn.broadcaster.broadcastThread(&wn.wg, wn, "network", "WebsocketNetwork")
if wn.prioScheme != nil {
wn.wg.Add(1)
go wn.prioWeightRefresh()
Expand Down Expand Up @@ -1129,8 +1129,9 @@ func (wn *WebsocketNetwork) maybeSendMessagesOfInterest(peer *wsPeer, messagesOf
}
}

func (wn *msgHandler) messageHandlerThread(wg *sync.WaitGroup, peersConnectivityCheckCh <-chan time.Time, net networkPeerManager) {
func (wn *msgHandler) messageHandlerThread(wg *sync.WaitGroup, peersConnectivityCheckCh <-chan time.Time, net networkPeerManager, profLabels ...string) {
defer wg.Done()
util.SetGoroutineLabels(append(profLabels, "func", "msgHandler.messageHandlerThread")...)

for {
select {
Expand Down Expand Up @@ -1231,8 +1232,9 @@ func (wn *msgHandler) sendFilterMessage(msg IncomingMessage, net networkPeerMana
}
}

func (wn *msgBroadcaster) broadcastThread(wg *sync.WaitGroup, net networkPeerManager) {
func (wn *msgBroadcaster) broadcastThread(wg *sync.WaitGroup, net networkPeerManager, profLabels ...string) {
defer wg.Done()
util.SetGoroutineLabels(append(profLabels, "func", "msgHandler.broadcastThread")...)

slowWritingPeerCheckTicker := time.NewTicker(wn.slowWritingPeerMonitorInterval)
defer slowWritingPeerCheckTicker.Stop()
Expand Down
7 changes: 4 additions & 3 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,9 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd
}
node.net = p2pNode

node.cryptoPool = execpool.MakePool(node)
node.lowPriorityCryptoVerificationPool = execpool.MakeBacklog(node.cryptoPool, 2*node.cryptoPool.GetParallelism(), execpool.LowPriority, node)
node.highPriorityCryptoVerificationPool = execpool.MakeBacklog(node.cryptoPool, 2*node.cryptoPool.GetParallelism(), execpool.HighPriority, node)
node.cryptoPool = execpool.MakePool(node, "worker", "cryptoPool")
node.lowPriorityCryptoVerificationPool = execpool.MakeBacklog(node.cryptoPool, 2*node.cryptoPool.GetParallelism(), execpool.LowPriority, node, "worker", "lowPriorityCryptoVerificationPool")
node.highPriorityCryptoVerificationPool = execpool.MakeBacklog(node.cryptoPool, 2*node.cryptoPool.GetParallelism(), execpool.HighPriority, node, "worker", "highPriorityCryptoVerificationPool")
ledgerPaths := ledger.DirsAndPrefix{
DBFilePrefix: config.LedgerFilenamePrefix,
ResolvedGenesisDirs: node.genesisDirs,
Expand Down Expand Up @@ -1056,6 +1056,7 @@ func (node *AlgorandFullNode) OnNewBlock(block bookkeeping.Block, delta ledgerco
// don't have to delete key for each block we received.
func (node *AlgorandFullNode) oldKeyDeletionThread(done <-chan struct{}) {
defer node.monitoringRoutinesWaitGroup.Done()

for {
select {
case <-done:
Expand Down
11 changes: 7 additions & 4 deletions util/execpool/backlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package execpool
import (
"context"
"sync"

"github.com/algorand/go-algorand/util"
)

// A backlog for an execution pool. The typical usage of this is to
Expand Down Expand Up @@ -47,7 +49,7 @@ type BacklogPool interface {
}

// MakeBacklog creates a backlog
func MakeBacklog(execPool ExecutionPool, backlogSize int, priority Priority, owner interface{}) BacklogPool {
func MakeBacklog(execPool ExecutionPool, backlogSize int, priority Priority, owner interface{}, profLabels ...string) BacklogPool {
if backlogSize < 0 {
return nil
}
Expand All @@ -59,7 +61,7 @@ func MakeBacklog(execPool ExecutionPool, backlogSize int, priority Priority, own
bl.ctx, bl.ctxCancel = context.WithCancel(context.Background())
if bl.pool == nil {
// create one internally.
bl.pool = MakePool(bl)
bl.pool = MakePool(bl, append(profLabels, "execpool", "internal")...)
}
if backlogSize == 0 {
// use the number of cpus in the system.
Expand All @@ -68,7 +70,7 @@ func MakeBacklog(execPool ExecutionPool, backlogSize int, priority Priority, own
bl.buffer = make(chan backlogItemTask, backlogSize)

bl.wg.Add(1)
go bl.worker()
go bl.worker(profLabels)
return bl
}

Expand Down Expand Up @@ -129,10 +131,11 @@ func (b *backlog) Shutdown() {
}
}

func (b *backlog) worker() {
func (b *backlog) worker(profLabels []string) {
var t backlogItemTask
var ok bool
defer b.wg.Done()
util.SetGoroutineLabels(profLabels...)

for {

Expand Down
11 changes: 7 additions & 4 deletions util/execpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"runtime"
"sync"

"github.com/algorand/go-algorand/util"
)

// The list of all valid priority values. When adding new ones, add them before numPrios.
Expand Down Expand Up @@ -68,7 +70,7 @@ type enqueuedTask struct {
}

// MakePool creates a pool.
func MakePool(owner interface{}) ExecutionPool {
func MakePool(owner interface{}, profLabels ...string) ExecutionPool {
p := &pool{
inputs: make([]chan enqueuedTask, numPrios),
numCPUs: runtime.NumCPU(),
Expand All @@ -82,9 +84,8 @@ func MakePool(owner interface{}) ExecutionPool {

p.wg.Add(p.numCPUs)
for i := 0; i < p.numCPUs; i++ {
go p.worker()
go p.worker(profLabels)
}

return p
}

Expand Down Expand Up @@ -136,12 +137,14 @@ func (p *pool) Shutdown() {

// worker function blocks until a new task is pending on any of the channels and execute the above task.
// the implementation below would give higher priority for channels that are on higher priority slot.
func (p *pool) worker() {
func (p *pool) worker(profLabels []string) {
var t enqueuedTask
var ok bool
lowPrio := p.inputs[LowPriority]
highPrio := p.inputs[HighPriority]
defer p.wg.Done()
util.SetGoroutineLabels(profLabels...)

for {

select {
Expand Down
7 changes: 7 additions & 0 deletions util/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package util

import (
"context"
"io"
"os"
"os/exec"
"runtime/pprof"
"sync"
"time"
)
Expand Down Expand Up @@ -73,3 +75,8 @@ func ExecAndCaptureOutput(command string, args ...string) (string, string, error

return string(outputStdout), string(outputStderr), err
}

// SetGoroutineLabels sets profiler labels for identifying goroutines using the pprof package.
func SetGoroutineLabels(args ...string) {
pprof.SetGoroutineLabels(pprof.WithLabels(context.Background(), pprof.Labels(args...)))
}