Skip to content

Commit 1d4e741

Browse files
committed
perf(protocols): improve performance across all protocols
- Increase default queue sizes for better buffering - Eliminate unnecessary goroutines for channel cleanup - Apply optimizations consistently across all protocols Signed-off-by: Chris Gianelloni <wolf31o2@blinklabs.io>
1 parent fd01888 commit 1d4e741

32 files changed

+1784
-174
lines changed

protocol/blockfetch/client.go

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"errors"
1919
"fmt"
2020
"sync"
21+
"sync/atomic"
2122

2223
"github.com/blinklabs-io/gouroboros/cbor"
2324
"github.com/blinklabs-io/gouroboros/ledger"
@@ -36,6 +37,7 @@ type Client struct {
3637
blockUseCallback bool // Whether to use callback for blocks
3738
onceStart sync.Once // Ensures Start is only called once
3839
onceStop sync.Once // Ensures Stop is only called once
40+
started atomic.Bool // Whether the protocol has been started
3941
}
4042

4143
// NewClient creates a new Block Fetch protocol client with the given options and configuration.
@@ -93,13 +95,8 @@ func (c *Client) Start() {
9395
"protocol", ProtocolName,
9496
"connection_id", c.callbackContext.ConnectionId.String(),
9597
)
98+
c.started.Store(true)
9699
c.Protocol.Start()
97-
// Start goroutine to cleanup resources on protocol shutdown
98-
go func() {
99-
<-c.DoneChan()
100-
close(c.blockChan)
101-
close(c.startBatchResultChan)
102-
}()
103100
})
104101
}
105102

@@ -114,7 +111,22 @@ func (c *Client) Stop() error {
114111
"connection_id", c.callbackContext.ConnectionId.String(),
115112
)
116113
msg := NewMsgClientDone()
117-
err = c.SendMessage(msg)
114+
if sendErr := c.SendMessage(msg); sendErr != nil {
115+
err = sendErr
116+
}
117+
_ = c.Protocol.Stop() // Always stop to signal muxerDoneChan
118+
// Defer closing channels until protocol fully shuts down (only if started)
119+
if c.started.Load() {
120+
go func() {
121+
<-c.DoneChan()
122+
close(c.blockChan)
123+
close(c.startBatchResultChan)
124+
}()
125+
} else {
126+
// If protocol was never started, close channels immediately
127+
close(c.blockChan)
128+
close(c.startBatchResultChan)
129+
}
118130
})
119131
return err
120132
}
@@ -222,13 +234,11 @@ func (c *Client) handleStartBatch() error {
222234
"role", "client",
223235
"connection_id", c.callbackContext.ConnectionId.String(),
224236
)
225-
// Check for shutdown
226237
select {
227238
case <-c.DoneChan():
228239
return protocol.ErrProtocolShuttingDown
229-
default:
240+
case c.startBatchResultChan <- nil:
230241
}
231-
c.startBatchResultChan <- nil
232242
return nil
233243
}
234244

@@ -241,14 +251,12 @@ func (c *Client) handleNoBlocks() error {
241251
"role", "client",
242252
"connection_id", c.callbackContext.ConnectionId.String(),
243253
)
244-
// Check for shutdown
254+
err := errors.New("block(s) not found")
245255
select {
246256
case <-c.DoneChan():
247257
return protocol.ErrProtocolShuttingDown
248-
default:
258+
case c.startBatchResultChan <- err:
249259
}
250-
err := errors.New("block(s) not found")
251-
c.startBatchResultChan <- err
252260
return nil
253261
}
254262

@@ -298,7 +306,11 @@ func (c *Client) handleBlock(msgGeneric protocol.Message) error {
298306
return errors.New("received block-fetch Block message but no callback function is defined")
299307
}
300308
} else {
301-
c.blockChan <- block
309+
select {
310+
case <-c.DoneChan():
311+
return protocol.ErrProtocolShuttingDown
312+
case c.blockChan <- block:
313+
}
302314
}
303315
return nil
304316
}

protocol/blockfetch/client_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,3 +207,24 @@ func TestGetBlockNoBlocks(t *testing.T) {
207207
},
208208
)
209209
}
210+
211+
func TestClientShutdown(t *testing.T) {
212+
runTest(
213+
t,
214+
[]ouroboros_mock.ConversationEntry{
215+
ouroboros_mock.ConversationEntryHandshakeRequestGeneric,
216+
ouroboros_mock.ConversationEntryHandshakeNtNResponse,
217+
},
218+
func(t *testing.T, oConn *ouroboros.Connection) {
219+
if oConn.BlockFetch() == nil {
220+
t.Fatalf("BlockFetch client is nil")
221+
}
222+
// Start the client
223+
oConn.BlockFetch().Client.Start()
224+
// Stop the client
225+
if err := oConn.BlockFetch().Client.Stop(); err != nil {
226+
t.Fatalf("unexpected error when stopping client: %s", err)
227+
}
228+
},
229+
)
230+
}

protocol/blockfetch/server.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,9 @@ func (s *Server) handleClientDone() error {
176176
"connection_id", s.callbackContext.ConnectionId.String(),
177177
)
178178
// Restart protocol
179-
s.Stop()
179+
if err := s.Stop(); err != nil {
180+
return err
181+
}
180182
s.initProtocol()
181183
s.Start()
182184
return nil

protocol/chainsync/client.go

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,9 @@ type Client struct {
3434
readyForNextBlockChan chan bool
3535
onceStart sync.Once
3636
onceStop sync.Once
37+
started atomic.Bool
38+
stopped atomic.Bool // prevents Start() after Stop()
3739
syncPipelinedRequestNext int
38-
39-
// waitingForCurrentTipChan will process all the requests for the current tip until the channel
40-
// is empty.
4140
//
4241
// want* only processes one request per message reply received from the server. If the message
4342
// request fails, it is the responsibility of the caller to clear the channel.
@@ -120,25 +119,25 @@ func NewClient(
120119

121120
func (c *Client) Start() {
122121
c.onceStart.Do(func() {
122+
if c.stopped.Load() {
123+
return
124+
}
123125
c.Protocol.Logger().
124126
Debug("starting client protocol",
125127
"component", "network",
126128
"protocol", ProtocolName,
127129
"connection_id", c.callbackContext.ConnectionId.String(),
128130
)
131+
c.started.Store(true)
129132
c.Protocol.Start()
130-
// Start goroutine to cleanup resources on protocol shutdown
131-
go func() {
132-
<-c.DoneChan()
133-
close(c.readyForNextBlockChan)
134-
}()
135133
})
136134
}
137135

138136
// Stop transitions the protocol to the Done state. No more protocol operations will be possible afterward
139137
func (c *Client) Stop() error {
140138
var err error
141139
c.onceStop.Do(func() {
140+
c.stopped.Store(true)
142141
c.Protocol.Logger().
143142
Debug("stopping client protocol",
144143
"component", "network",
@@ -148,8 +147,30 @@ func (c *Client) Stop() error {
148147
c.busyMutex.Lock()
149148
defer c.busyMutex.Unlock()
150149
msg := NewMsgDone()
151-
if err = c.SendMessage(msg); err != nil {
152-
return
150+
if c.started.Load() {
151+
if sendErr := c.SendMessage(msg); sendErr != nil {
152+
err = sendErr
153+
// Still proceed to stopping the protocol
154+
}
155+
}
156+
if stopErr := c.Protocol.Stop(); stopErr != nil {
157+
c.Protocol.Logger().
158+
Error("error stopping protocol",
159+
"component", "network",
160+
"protocol", ProtocolName,
161+
"connection_id", c.callbackContext.ConnectionId.String(),
162+
"error", stopErr,
163+
)
164+
}
165+
// Defer closing channel until protocol fully shuts down (only if started)
166+
if c.started.Load() {
167+
go func() {
168+
<-c.DoneChan()
169+
close(c.readyForNextBlockChan)
170+
}()
171+
} else {
172+
// If protocol was never started, close channel immediately
173+
close(c.readyForNextBlockChan)
153174
}
154175
})
155176
return err
@@ -334,7 +355,15 @@ func (c *Client) GetAvailableBlockRange(
334355
)
335356
}
336357
start = firstBlock.point
337-
case <-c.readyForNextBlockChan:
358+
case ready, ok := <-c.readyForNextBlockChan:
359+
if !ok {
360+
// Channel closed, protocol shutting down
361+
return start, end, protocol.ErrProtocolShuttingDown
362+
}
363+
// Only proceed if ready is true
364+
if !ready {
365+
return start, end, ErrSyncCancelled
366+
}
338367
// Request the next block
339368
msg := NewMsgRequestNext()
340369
if err := c.SendMessage(msg); err != nil {
@@ -721,14 +750,22 @@ func (c *Client) handleRollForward(msgGeneric protocol.Message) error {
721750
if callbackErr != nil {
722751
if errors.Is(callbackErr, ErrStopSyncProcess) {
723752
// Signal that we're cancelling the sync
724-
c.readyForNextBlockChan <- false
753+
select {
754+
case <-c.DoneChan():
755+
return protocol.ErrProtocolShuttingDown
756+
case c.readyForNextBlockChan <- false:
757+
}
725758
return nil
726759
} else {
727760
return callbackErr
728761
}
729762
}
730763
// Signal that we're ready for the next block
731-
c.readyForNextBlockChan <- true
764+
select {
765+
case <-c.DoneChan():
766+
return protocol.ErrProtocolShuttingDown
767+
case c.readyForNextBlockChan <- true:
768+
}
732769
return nil
733770
}
734771

@@ -752,15 +789,23 @@ func (c *Client) handleRollBackward(msgGeneric protocol.Message) error {
752789
if callbackErr := c.config.RollBackwardFunc(c.callbackContext, msgRollBackward.Point, msgRollBackward.Tip); callbackErr != nil {
753790
if errors.Is(callbackErr, ErrStopSyncProcess) {
754791
// Signal that we're cancelling the sync
755-
c.readyForNextBlockChan <- false
792+
select {
793+
case <-c.DoneChan():
794+
return protocol.ErrProtocolShuttingDown
795+
case c.readyForNextBlockChan <- false:
796+
}
756797
return nil
757798
} else {
758799
return callbackErr
759800
}
760801
}
761802
}
762803
// Signal that we're ready for the next block
763-
c.readyForNextBlockChan <- true
804+
select {
805+
case <-c.DoneChan():
806+
return protocol.ErrProtocolShuttingDown
807+
case c.readyForNextBlockChan <- true:
808+
}
764809
return nil
765810
}
766811

0 commit comments

Comments
 (0)