Skip to content

Commit 5ddaf9a

Browse files
committed
perf(protocols): improve performance across all protocols
- Increase default queue sizes for better buffering - Eliminate unnecessary goroutines for channel cleanup - Apply optimizations consistently across all protocols Signed-off-by: Chris Gianelloni <wolf31o2@blinklabs.io>
1 parent b93f5f3 commit 5ddaf9a

32 files changed

+1533
-153
lines changed

protocol/blockfetch/blockfetch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ type Config struct {
119119
const MaxRecvQueueSize = 512
120120

121121
// DefaultRecvQueueSize is the default receive queue size.
122-
const DefaultRecvQueueSize = 256
122+
const DefaultRecvQueueSize = 384
123123

124124
// MaxPendingMessageBytes is the maximum allowed pending message bytes (5MB).
125125
const MaxPendingMessageBytes = 5242880

protocol/blockfetch/client.go

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"errors"
1919
"fmt"
2020
"sync"
21+
"sync/atomic"
2122

2223
"github.com/blinklabs-io/gouroboros/cbor"
2324
"github.com/blinklabs-io/gouroboros/ledger"
@@ -36,6 +37,7 @@ type Client struct {
3637
blockUseCallback bool // Whether to use callback for blocks
3738
onceStart sync.Once // Ensures Start is only called once
3839
onceStop sync.Once // Ensures Stop is only called once
40+
started atomic.Bool // Whether the protocol has been started
3941
}
4042

4143
// NewClient creates a new Block Fetch protocol client with the given options and configuration.
@@ -93,13 +95,8 @@ func (c *Client) Start() {
9395
"protocol", ProtocolName,
9496
"connection_id", c.callbackContext.ConnectionId.String(),
9597
)
98+
c.started.Store(true)
9699
c.Protocol.Start()
97-
// Start goroutine to cleanup resources on protocol shutdown
98-
go func() {
99-
<-c.DoneChan()
100-
close(c.blockChan)
101-
close(c.startBatchResultChan)
102-
}()
103100
})
104101
}
105102

@@ -114,7 +111,22 @@ func (c *Client) Stop() error {
114111
"connection_id", c.callbackContext.ConnectionId.String(),
115112
)
116113
msg := NewMsgClientDone()
117-
err = c.SendMessage(msg)
114+
if sendErr := c.SendMessage(msg); sendErr != nil {
115+
err = sendErr
116+
}
117+
_ = c.Protocol.Stop() // Always stop to signal muxerDoneChan
118+
// Defer closing channels until protocol fully shuts down (only if started)
119+
if c.started.Load() {
120+
go func() {
121+
<-c.DoneChan()
122+
close(c.blockChan)
123+
close(c.startBatchResultChan)
124+
}()
125+
} else {
126+
// If protocol was never started, close channels immediately
127+
close(c.blockChan)
128+
close(c.startBatchResultChan)
129+
}
118130
})
119131
return err
120132
}
@@ -222,13 +234,11 @@ func (c *Client) handleStartBatch() error {
222234
"role", "client",
223235
"connection_id", c.callbackContext.ConnectionId.String(),
224236
)
225-
// Check for shutdown
226237
select {
227238
case <-c.DoneChan():
228239
return protocol.ErrProtocolShuttingDown
229-
default:
240+
case c.startBatchResultChan <- nil:
230241
}
231-
c.startBatchResultChan <- nil
232242
return nil
233243
}
234244

@@ -241,14 +251,12 @@ func (c *Client) handleNoBlocks() error {
241251
"role", "client",
242252
"connection_id", c.callbackContext.ConnectionId.String(),
243253
)
244-
// Check for shutdown
254+
err := errors.New("block(s) not found")
245255
select {
246256
case <-c.DoneChan():
247257
return protocol.ErrProtocolShuttingDown
248-
default:
258+
case c.startBatchResultChan <- err:
249259
}
250-
err := errors.New("block(s) not found")
251-
c.startBatchResultChan <- err
252260
return nil
253261
}
254262

@@ -298,7 +306,11 @@ func (c *Client) handleBlock(msgGeneric protocol.Message) error {
298306
return errors.New("received block-fetch Block message but no callback function is defined")
299307
}
300308
} else {
301-
c.blockChan <- block
309+
select {
310+
case <-c.DoneChan():
311+
return protocol.ErrProtocolShuttingDown
312+
case c.blockChan <- block:
313+
}
302314
}
303315
return nil
304316
}

protocol/blockfetch/client_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,3 +207,24 @@ func TestGetBlockNoBlocks(t *testing.T) {
207207
},
208208
)
209209
}
210+
211+
func TestClientShutdown(t *testing.T) {
212+
runTest(
213+
t,
214+
[]ouroboros_mock.ConversationEntry{
215+
ouroboros_mock.ConversationEntryHandshakeRequestGeneric,
216+
ouroboros_mock.ConversationEntryHandshakeNtNResponse,
217+
},
218+
func(t *testing.T, oConn *ouroboros.Connection) {
219+
if oConn.BlockFetch() == nil {
220+
t.Fatalf("BlockFetch client is nil")
221+
}
222+
// Start the client
223+
oConn.BlockFetch().Client.Start()
224+
// Stop the client
225+
if err := oConn.BlockFetch().Client.Stop(); err != nil {
226+
t.Fatalf("unexpected error when stopping client: %s", err)
227+
}
228+
},
229+
)
230+
}

protocol/blockfetch/server.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,9 @@ func (s *Server) handleClientDone() error {
176176
"connection_id", s.callbackContext.ConnectionId.String(),
177177
)
178178
// Restart protocol
179-
s.Stop()
179+
if err := s.Stop(); err != nil {
180+
return err
181+
}
180182
s.initProtocol()
181183
s.Start()
182184
return nil

protocol/chainsync/chainsync.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,8 @@ type Config struct {
223223
const (
224224
MaxPipelineLimit = 100 // Max pipelined requests
225225
MaxRecvQueueSize = 100 // Max receive queue size (messages)
226-
DefaultPipelineLimit = 50 // Default pipeline limit
227-
DefaultRecvQueueSize = 50 // Default queue size
226+
DefaultPipelineLimit = 75 // Default pipeline limit
227+
DefaultRecvQueueSize = 75 // Default queue size
228228
MaxPendingMessageBytes = 102400 // Max pending message bytes (100KB)
229229
)
230230

protocol/chainsync/client.go

Lines changed: 50 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,8 @@ type Client struct {
3434
readyForNextBlockChan chan bool
3535
onceStart sync.Once
3636
onceStop sync.Once
37+
started atomic.Bool
3738
syncPipelinedRequestNext int
38-
39-
// waitingForCurrentTipChan will process all the requests for the current tip until the channel
40-
// is empty.
4139
//
4240
// want* only processes one request per message reply received from the server. If the message
4341
// request fails, it is the responsibility of the caller to clear the channel.
@@ -126,12 +124,8 @@ func (c *Client) Start() {
126124
"protocol", ProtocolName,
127125
"connection_id", c.callbackContext.ConnectionId.String(),
128126
)
127+
c.started.Store(true)
129128
c.Protocol.Start()
130-
// Start goroutine to cleanup resources on protocol shutdown
131-
go func() {
132-
<-c.DoneChan()
133-
close(c.readyForNextBlockChan)
134-
}()
135129
})
136130
}
137131

@@ -151,6 +145,25 @@ func (c *Client) Stop() error {
151145
if err = c.SendMessage(msg); err != nil {
152146
return
153147
}
148+
if stopErr := c.Protocol.Stop(); stopErr != nil {
149+
c.Protocol.Logger().
150+
Error("error stopping protocol",
151+
"component", "network",
152+
"protocol", ProtocolName,
153+
"connection_id", c.callbackContext.ConnectionId.String(),
154+
"error", stopErr,
155+
)
156+
}
157+
// Defer closing channel until protocol fully shuts down (only if started)
158+
if c.started.Load() {
159+
go func() {
160+
<-c.DoneChan()
161+
close(c.readyForNextBlockChan)
162+
}()
163+
} else {
164+
// If protocol was never started, close channel immediately
165+
close(c.readyForNextBlockChan)
166+
}
154167
})
155168
return err
156169
}
@@ -334,7 +347,15 @@ func (c *Client) GetAvailableBlockRange(
334347
)
335348
}
336349
start = firstBlock.point
337-
case <-c.readyForNextBlockChan:
350+
case ready, ok := <-c.readyForNextBlockChan:
351+
if !ok {
352+
// Channel closed, protocol shutting down
353+
return start, end, protocol.ErrProtocolShuttingDown
354+
}
355+
// Only proceed if ready is true
356+
if !ready {
357+
return start, end, ErrSyncCancelled
358+
}
338359
// Request the next block
339360
msg := NewMsgRequestNext()
340361
if err := c.SendMessage(msg); err != nil {
@@ -721,14 +742,22 @@ func (c *Client) handleRollForward(msgGeneric protocol.Message) error {
721742
if callbackErr != nil {
722743
if errors.Is(callbackErr, ErrStopSyncProcess) {
723744
// Signal that we're cancelling the sync
724-
c.readyForNextBlockChan <- false
745+
select {
746+
case <-c.DoneChan():
747+
return protocol.ErrProtocolShuttingDown
748+
case c.readyForNextBlockChan <- false:
749+
}
725750
return nil
726751
} else {
727752
return callbackErr
728753
}
729754
}
730755
// Signal that we're ready for the next block
731-
c.readyForNextBlockChan <- true
756+
select {
757+
case <-c.DoneChan():
758+
return protocol.ErrProtocolShuttingDown
759+
case c.readyForNextBlockChan <- true:
760+
}
732761
return nil
733762
}
734763

@@ -752,15 +781,23 @@ func (c *Client) handleRollBackward(msgGeneric protocol.Message) error {
752781
if callbackErr := c.config.RollBackwardFunc(c.callbackContext, msgRollBackward.Point, msgRollBackward.Tip); callbackErr != nil {
753782
if errors.Is(callbackErr, ErrStopSyncProcess) {
754783
// Signal that we're cancelling the sync
755-
c.readyForNextBlockChan <- false
784+
select {
785+
case <-c.DoneChan():
786+
return protocol.ErrProtocolShuttingDown
787+
case c.readyForNextBlockChan <- false:
788+
}
756789
return nil
757790
} else {
758791
return callbackErr
759792
}
760793
}
761794
}
762795
// Signal that we're ready for the next block
763-
c.readyForNextBlockChan <- true
796+
select {
797+
case <-c.DoneChan():
798+
return protocol.ErrProtocolShuttingDown
799+
case c.readyForNextBlockChan <- true:
800+
}
764801
return nil
765802
}
766803

0 commit comments

Comments
 (0)