Skip to content

Commit 1c271c7

Browse files
committed
perf(protocols): improve performance across all protocols
- Increase default queue sizes for better buffering - Eliminate unnecessary goroutines for channel cleanup - Apply optimizations consistently across all protocols Signed-off-by: Chris Gianelloni <wolf31o2@blinklabs.io>
1 parent b93f5f3 commit 1c271c7

File tree

27 files changed

+1405
-138
lines changed

27 files changed

+1405
-138
lines changed

protocol/blockfetch/blockfetch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ type Config struct {
119119
const MaxRecvQueueSize = 512
120120

121121
// DefaultRecvQueueSize is the default receive queue size.
122-
const DefaultRecvQueueSize = 256
122+
const DefaultRecvQueueSize = 384
123123

124124
// MaxPendingMessageBytes is the maximum allowed pending message bytes (5MB).
125125
const MaxPendingMessageBytes = 5242880

protocol/blockfetch/client.go

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"errors"
1919
"fmt"
2020
"sync"
21+
"sync/atomic"
2122

2223
"github.com/blinklabs-io/gouroboros/cbor"
2324
"github.com/blinklabs-io/gouroboros/ledger"
@@ -36,6 +37,7 @@ type Client struct {
3637
blockUseCallback bool // Whether to use callback for blocks
3738
onceStart sync.Once // Ensures Start is only called once
3839
onceStop sync.Once // Ensures Stop is only called once
40+
started atomic.Bool // Whether the protocol has been started
3941
}
4042

4143
// NewClient creates a new Block Fetch protocol client with the given options and configuration.
@@ -93,13 +95,8 @@ func (c *Client) Start() {
9395
"protocol", ProtocolName,
9496
"connection_id", c.callbackContext.ConnectionId.String(),
9597
)
98+
c.started.Store(true)
9699
c.Protocol.Start()
97-
// Start goroutine to cleanup resources on protocol shutdown
98-
go func() {
99-
<-c.DoneChan()
100-
close(c.blockChan)
101-
close(c.startBatchResultChan)
102-
}()
103100
})
104101
}
105102

@@ -115,6 +112,21 @@ func (c *Client) Stop() error {
115112
)
116113
msg := NewMsgClientDone()
117114
err = c.SendMessage(msg)
115+
if err == nil {
116+
c.Protocol.Stop()
117+
}
118+
// Defer closing channels until protocol fully shuts down (only if started)
119+
if c.started.Load() {
120+
go func() {
121+
<-c.DoneChan()
122+
close(c.blockChan)
123+
close(c.startBatchResultChan)
124+
}()
125+
} else {
126+
// If protocol was never started, close channels immediately
127+
close(c.blockChan)
128+
close(c.startBatchResultChan)
129+
}
118130
})
119131
return err
120132
}
@@ -222,13 +234,11 @@ func (c *Client) handleStartBatch() error {
222234
"role", "client",
223235
"connection_id", c.callbackContext.ConnectionId.String(),
224236
)
225-
// Check for shutdown
226237
select {
227238
case <-c.DoneChan():
228239
return protocol.ErrProtocolShuttingDown
229-
default:
240+
case c.startBatchResultChan <- nil:
230241
}
231-
c.startBatchResultChan <- nil
232242
return nil
233243
}
234244

@@ -241,14 +251,12 @@ func (c *Client) handleNoBlocks() error {
241251
"role", "client",
242252
"connection_id", c.callbackContext.ConnectionId.String(),
243253
)
244-
// Check for shutdown
254+
err := errors.New("block(s) not found")
245255
select {
246256
case <-c.DoneChan():
247257
return protocol.ErrProtocolShuttingDown
248-
default:
258+
case c.startBatchResultChan <- err:
249259
}
250-
err := errors.New("block(s) not found")
251-
c.startBatchResultChan <- err
252260
return nil
253261
}
254262

@@ -298,7 +306,11 @@ func (c *Client) handleBlock(msgGeneric protocol.Message) error {
298306
return errors.New("received block-fetch Block message but no callback function is defined")
299307
}
300308
} else {
301-
c.blockChan <- block
309+
select {
310+
case <-c.DoneChan():
311+
return protocol.ErrProtocolShuttingDown
312+
case c.blockChan <- block:
313+
}
302314
}
303315
return nil
304316
}

protocol/blockfetch/client_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,3 +207,24 @@ func TestGetBlockNoBlocks(t *testing.T) {
207207
},
208208
)
209209
}
210+
211+
func TestClientShutdown(t *testing.T) {
212+
runTest(
213+
t,
214+
[]ouroboros_mock.ConversationEntry{
215+
ouroboros_mock.ConversationEntryHandshakeRequestGeneric,
216+
ouroboros_mock.ConversationEntryHandshakeNtNResponse,
217+
},
218+
func(t *testing.T, oConn *ouroboros.Connection) {
219+
if oConn.BlockFetch() == nil {
220+
t.Fatalf("BlockFetch client is nil")
221+
}
222+
// Start the client
223+
oConn.BlockFetch().Client.Start()
224+
// Stop the client
225+
if err := oConn.BlockFetch().Client.Stop(); err != nil {
226+
t.Fatalf("unexpected error when stopping client: %s", err)
227+
}
228+
},
229+
)
230+
}

protocol/chainsync/chainsync.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,8 @@ type Config struct {
223223
const (
224224
MaxPipelineLimit = 100 // Max pipelined requests
225225
MaxRecvQueueSize = 100 // Max receive queue size (messages)
226-
DefaultPipelineLimit = 50 // Default pipeline limit
227-
DefaultRecvQueueSize = 50 // Default queue size
226+
DefaultPipelineLimit = 75 // Default pipeline limit
227+
DefaultRecvQueueSize = 75 // Default queue size
228228
MaxPendingMessageBytes = 102400 // Max pending message bytes (100KB)
229229
)
230230

protocol/chainsync/client.go

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,8 @@ type Client struct {
3434
readyForNextBlockChan chan bool
3535
onceStart sync.Once
3636
onceStop sync.Once
37+
started atomic.Bool
3738
syncPipelinedRequestNext int
38-
39-
// waitingForCurrentTipChan will process all the requests for the current tip until the channel
40-
// is empty.
4139
//
4240
// want* only processes one request per message reply received from the server. If the message
4341
// request fails, it is the responsibility of the caller to clear the channel.
@@ -126,12 +124,8 @@ func (c *Client) Start() {
126124
"protocol", ProtocolName,
127125
"connection_id", c.callbackContext.ConnectionId.String(),
128126
)
127+
c.started.Store(true)
129128
c.Protocol.Start()
130-
// Start goroutine to cleanup resources on protocol shutdown
131-
go func() {
132-
<-c.DoneChan()
133-
close(c.readyForNextBlockChan)
134-
}()
135129
})
136130
}
137131

@@ -151,6 +145,17 @@ func (c *Client) Stop() error {
151145
if err = c.SendMessage(msg); err != nil {
152146
return
153147
}
148+
c.Protocol.Stop()
149+
// Defer closing channel until protocol fully shuts down (only if started)
150+
if c.started.Load() {
151+
go func() {
152+
<-c.DoneChan()
153+
close(c.readyForNextBlockChan)
154+
}()
155+
} else {
156+
// If protocol was never started, close channel immediately
157+
close(c.readyForNextBlockChan)
158+
}
154159
})
155160
return err
156161
}
@@ -334,7 +339,15 @@ func (c *Client) GetAvailableBlockRange(
334339
)
335340
}
336341
start = firstBlock.point
337-
case <-c.readyForNextBlockChan:
342+
case ready, ok := <-c.readyForNextBlockChan:
343+
if !ok {
344+
// Channel closed, protocol shutting down
345+
return start, end, protocol.ErrProtocolShuttingDown
346+
}
347+
// Only proceed if ready is true
348+
if !ready {
349+
return start, end, ErrSyncCancelled
350+
}
338351
// Request the next block
339352
msg := NewMsgRequestNext()
340353
if err := c.SendMessage(msg); err != nil {
@@ -721,14 +734,22 @@ func (c *Client) handleRollForward(msgGeneric protocol.Message) error {
721734
if callbackErr != nil {
722735
if errors.Is(callbackErr, ErrStopSyncProcess) {
723736
// Signal that we're cancelling the sync
724-
c.readyForNextBlockChan <- false
737+
select {
738+
case <-c.DoneChan():
739+
return protocol.ErrProtocolShuttingDown
740+
case c.readyForNextBlockChan <- false:
741+
}
725742
return nil
726743
} else {
727744
return callbackErr
728745
}
729746
}
730747
// Signal that we're ready for the next block
731-
c.readyForNextBlockChan <- true
748+
select {
749+
case <-c.DoneChan():
750+
return protocol.ErrProtocolShuttingDown
751+
case c.readyForNextBlockChan <- true:
752+
}
732753
return nil
733754
}
734755

@@ -752,15 +773,23 @@ func (c *Client) handleRollBackward(msgGeneric protocol.Message) error {
752773
if callbackErr := c.config.RollBackwardFunc(c.callbackContext, msgRollBackward.Point, msgRollBackward.Tip); callbackErr != nil {
753774
if errors.Is(callbackErr, ErrStopSyncProcess) {
754775
// Signal that we're cancelling the sync
755-
c.readyForNextBlockChan <- false
776+
select {
777+
case <-c.DoneChan():
778+
return protocol.ErrProtocolShuttingDown
779+
case c.readyForNextBlockChan <- false:
780+
}
756781
return nil
757782
} else {
758783
return callbackErr
759784
}
760785
}
761786
}
762787
// Signal that we're ready for the next block
763-
c.readyForNextBlockChan <- true
788+
select {
789+
case <-c.DoneChan():
790+
return protocol.ErrProtocolShuttingDown
791+
case c.readyForNextBlockChan <- true:
792+
}
764793
return nil
765794
}
766795

0 commit comments

Comments
 (0)