Skip to content

Commit e999895

Browse files
committed
perf(protocols): improve performance across all protocols
- Increase default queue sizes for better buffering - Eliminate unnecessary goroutines for channel cleanup - Apply optimizations consistently across all protocols Signed-off-by: Chris Gianelloni <wolf31o2@blinklabs.io>
1 parent b93f5f3 commit e999895

File tree

24 files changed

+1060
-137
lines changed

24 files changed

+1060
-137
lines changed

protocol/blockfetch/blockfetch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ type Config struct {
119119
const MaxRecvQueueSize = 512
120120

121121
// DefaultRecvQueueSize is the default receive queue size.
122-
const DefaultRecvQueueSize = 256
122+
const DefaultRecvQueueSize = 384
123123

124124
// MaxPendingMessageBytes is the maximum allowed pending message bytes (5MB).
125125
const MaxPendingMessageBytes = 5242880

protocol/blockfetch/client.go

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"errors"
1919
"fmt"
2020
"sync"
21+
"sync/atomic"
2122

2223
"github.com/blinklabs-io/gouroboros/cbor"
2324
"github.com/blinklabs-io/gouroboros/ledger"
@@ -36,6 +37,7 @@ type Client struct {
3637
blockUseCallback bool // Whether to use callback for blocks
3738
onceStart sync.Once // Ensures Start is only called once
3839
onceStop sync.Once // Ensures Stop is only called once
40+
started atomic.Bool // Whether the protocol has been started
3941
}
4042

4143
// NewClient creates a new Block Fetch protocol client with the given options and configuration.
@@ -93,13 +95,8 @@ func (c *Client) Start() {
9395
"protocol", ProtocolName,
9496
"connection_id", c.callbackContext.ConnectionId.String(),
9597
)
98+
c.started.Store(true)
9699
c.Protocol.Start()
97-
// Start goroutine to cleanup resources on protocol shutdown
98-
go func() {
99-
<-c.DoneChan()
100-
close(c.blockChan)
101-
close(c.startBatchResultChan)
102-
}()
103100
})
104101
}
105102

@@ -115,6 +112,18 @@ func (c *Client) Stop() error {
115112
)
116113
msg := NewMsgClientDone()
117114
err = c.SendMessage(msg)
115+
// Defer closing channels until protocol fully shuts down (only if started)
116+
if c.started.Load() {
117+
go func() {
118+
<-c.DoneChan()
119+
close(c.blockChan)
120+
close(c.startBatchResultChan)
121+
}()
122+
} else {
123+
// If protocol was never started, close channels immediately
124+
close(c.blockChan)
125+
close(c.startBatchResultChan)
126+
}
118127
})
119128
return err
120129
}
@@ -222,13 +231,11 @@ func (c *Client) handleStartBatch() error {
222231
"role", "client",
223232
"connection_id", c.callbackContext.ConnectionId.String(),
224233
)
225-
// Check for shutdown
226234
select {
227235
case <-c.DoneChan():
228236
return protocol.ErrProtocolShuttingDown
229-
default:
237+
case c.startBatchResultChan <- nil:
230238
}
231-
c.startBatchResultChan <- nil
232239
return nil
233240
}
234241

@@ -241,14 +248,12 @@ func (c *Client) handleNoBlocks() error {
241248
"role", "client",
242249
"connection_id", c.callbackContext.ConnectionId.String(),
243250
)
244-
// Check for shutdown
251+
err := errors.New("block(s) not found")
245252
select {
246253
case <-c.DoneChan():
247254
return protocol.ErrProtocolShuttingDown
248-
default:
255+
case c.startBatchResultChan <- err:
249256
}
250-
err := errors.New("block(s) not found")
251-
c.startBatchResultChan <- err
252257
return nil
253258
}
254259

@@ -298,7 +303,11 @@ func (c *Client) handleBlock(msgGeneric protocol.Message) error {
298303
return errors.New("received block-fetch Block message but no callback function is defined")
299304
}
300305
} else {
301-
c.blockChan <- block
306+
select {
307+
case <-c.DoneChan():
308+
return protocol.ErrProtocolShuttingDown
309+
case c.blockChan <- block:
310+
}
302311
}
303312
return nil
304313
}

protocol/blockfetch/client_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,3 +207,58 @@ func TestGetBlockNoBlocks(t *testing.T) {
207207
},
208208
)
209209
}
210+
211+
func TestClientShutdown(t *testing.T) {
212+
defer goleak.VerifyNone(t)
213+
mockConn := ouroboros_mock.NewConnection(
214+
ouroboros_mock.ProtocolRoleClient,
215+
[]ouroboros_mock.ConversationEntry{
216+
ouroboros_mock.ConversationEntryHandshakeRequestGeneric,
217+
ouroboros_mock.ConversationEntryHandshakeNtNResponse,
218+
},
219+
)
220+
asyncErrChan := make(chan error, 1)
221+
go func() {
222+
err := <-mockConn.(*ouroboros_mock.Connection).ErrorChan()
223+
if err != nil {
224+
asyncErrChan <- fmt.Errorf("received unexpected error: %w", err)
225+
}
226+
close(asyncErrChan)
227+
}()
228+
oConn, err := ouroboros.New(
229+
ouroboros.WithConnection(mockConn),
230+
ouroboros.WithNetworkMagic(ouroboros_mock.MockNetworkMagic),
231+
ouroboros.WithNodeToNode(true),
232+
)
233+
if err != nil {
234+
t.Fatalf("unexpected error when creating Ouroboros object: %s", err)
235+
}
236+
if oConn.BlockFetch() == nil {
237+
t.Fatalf("BlockFetch client is nil")
238+
}
239+
// Start the client
240+
oConn.BlockFetch().Client.Start()
241+
// Stop the client
242+
if err := oConn.BlockFetch().Client.Stop(); err != nil {
243+
t.Fatalf("unexpected error when stopping client: %s", err)
244+
}
245+
// Wait for mock connection shutdown
246+
select {
247+
case err, ok := <-asyncErrChan:
248+
if ok {
249+
t.Fatal(err.Error())
250+
}
251+
case <-time.After(2 * time.Second):
252+
t.Fatalf("did not complete within timeout")
253+
}
254+
// Close Ouroboros connection
255+
if err := oConn.Close(); err != nil {
256+
t.Fatalf("unexpected error when closing Ouroboros object: %s", err)
257+
}
258+
// Wait for connection shutdown
259+
select {
260+
case <-oConn.ErrorChan():
261+
case <-time.After(10 * time.Second):
262+
t.Errorf("did not shutdown within timeout")
263+
}
264+
}

protocol/chainsync/chainsync.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,8 @@ type Config struct {
223223
const (
224224
MaxPipelineLimit = 100 // Max pipelined requests
225225
MaxRecvQueueSize = 100 // Max receive queue size (messages)
226-
DefaultPipelineLimit = 50 // Default pipeline limit
227-
DefaultRecvQueueSize = 50 // Default queue size
226+
DefaultPipelineLimit = 75 // Default pipeline limit
227+
DefaultRecvQueueSize = 75 // Default queue size
228228
MaxPendingMessageBytes = 102400 // Max pending message bytes (100KB)
229229
)
230230

protocol/chainsync/client.go

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,8 @@ type Client struct {
3434
readyForNextBlockChan chan bool
3535
onceStart sync.Once
3636
onceStop sync.Once
37+
started atomic.Bool
3738
syncPipelinedRequestNext int
38-
39-
// waitingForCurrentTipChan will process all the requests for the current tip until the channel
40-
// is empty.
4139
//
4240
// want* only processes one request per message reply received from the server. If the message
4341
// request fails, it is the responsibility of the caller to clear the channel.
@@ -126,12 +124,8 @@ func (c *Client) Start() {
126124
"protocol", ProtocolName,
127125
"connection_id", c.callbackContext.ConnectionId.String(),
128126
)
127+
c.started.Store(true)
129128
c.Protocol.Start()
130-
// Start goroutine to cleanup resources on protocol shutdown
131-
go func() {
132-
<-c.DoneChan()
133-
close(c.readyForNextBlockChan)
134-
}()
135129
})
136130
}
137131

@@ -151,6 +145,16 @@ func (c *Client) Stop() error {
151145
if err = c.SendMessage(msg); err != nil {
152146
return
153147
}
148+
// Defer closing channel until protocol fully shuts down (only if started)
149+
if c.started.Load() {
150+
go func() {
151+
<-c.DoneChan()
152+
close(c.readyForNextBlockChan)
153+
}()
154+
} else {
155+
// If protocol was never started, close channel immediately
156+
close(c.readyForNextBlockChan)
157+
}
154158
})
155159
return err
156160
}
@@ -334,7 +338,15 @@ func (c *Client) GetAvailableBlockRange(
334338
)
335339
}
336340
start = firstBlock.point
337-
case <-c.readyForNextBlockChan:
341+
case ready, ok := <-c.readyForNextBlockChan:
342+
if !ok {
343+
// Channel closed, protocol shutting down
344+
return start, end, protocol.ErrProtocolShuttingDown
345+
}
346+
// Only proceed if ready is true
347+
if !ready {
348+
return start, end, ErrSyncCancelled
349+
}
338350
// Request the next block
339351
msg := NewMsgRequestNext()
340352
if err := c.SendMessage(msg); err != nil {
@@ -721,14 +733,22 @@ func (c *Client) handleRollForward(msgGeneric protocol.Message) error {
721733
if callbackErr != nil {
722734
if errors.Is(callbackErr, ErrStopSyncProcess) {
723735
// Signal that we're cancelling the sync
724-
c.readyForNextBlockChan <- false
736+
select {
737+
case <-c.DoneChan():
738+
return protocol.ErrProtocolShuttingDown
739+
case c.readyForNextBlockChan <- false:
740+
}
725741
return nil
726742
} else {
727743
return callbackErr
728744
}
729745
}
730746
// Signal that we're ready for the next block
731-
c.readyForNextBlockChan <- true
747+
select {
748+
case <-c.DoneChan():
749+
return protocol.ErrProtocolShuttingDown
750+
case c.readyForNextBlockChan <- true:
751+
}
732752
return nil
733753
}
734754

@@ -752,15 +772,23 @@ func (c *Client) handleRollBackward(msgGeneric protocol.Message) error {
752772
if callbackErr := c.config.RollBackwardFunc(c.callbackContext, msgRollBackward.Point, msgRollBackward.Tip); callbackErr != nil {
753773
if errors.Is(callbackErr, ErrStopSyncProcess) {
754774
// Signal that we're cancelling the sync
755-
c.readyForNextBlockChan <- false
775+
select {
776+
case <-c.DoneChan():
777+
return protocol.ErrProtocolShuttingDown
778+
case c.readyForNextBlockChan <- false:
779+
}
756780
return nil
757781
} else {
758782
return callbackErr
759783
}
760784
}
761785
}
762786
// Signal that we're ready for the next block
763-
c.readyForNextBlockChan <- true
787+
select {
788+
case <-c.DoneChan():
789+
return protocol.ErrProtocolShuttingDown
790+
case c.readyForNextBlockChan <- true:
791+
}
764792
return nil
765793
}
766794

protocol/chainsync/client_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ func runTest(
8080
}()
8181
// Run test inner function
8282
innerFunc(t, oConn)
83+
// Stop the client to clean up goroutines
84+
if client := oConn.ChainSync().Client; client != nil {
85+
client.Stop()
86+
}
8387
// Wait for mock connection shutdown
8488
select {
8589
case err, ok := <-asyncErrChan:
@@ -275,3 +279,58 @@ func TestGetAvailableBlockRange(t *testing.T) {
275279
},
276280
)
277281
}
282+
283+
func TestClientShutdown(t *testing.T) {
284+
defer goleak.VerifyNone(t)
285+
mockConn := ouroboros_mock.NewConnection(
286+
ouroboros_mock.ProtocolRoleClient,
287+
[]ouroboros_mock.ConversationEntry{
288+
ouroboros_mock.ConversationEntryHandshakeRequestGeneric,
289+
ouroboros_mock.ConversationEntryHandshakeNtCResponse,
290+
},
291+
)
292+
asyncErrChan := make(chan error, 1)
293+
go func() {
294+
err := <-mockConn.(*ouroboros_mock.Connection).ErrorChan()
295+
if err != nil {
296+
asyncErrChan <- fmt.Errorf("received unexpected error: %w", err)
297+
}
298+
close(asyncErrChan)
299+
}()
300+
oConn, err := ouroboros.New(
301+
ouroboros.WithConnection(mockConn),
302+
ouroboros.WithNetworkMagic(ouroboros_mock.MockNetworkMagic),
303+
ouroboros.WithChainSyncConfig(chainsync.NewConfig()),
304+
)
305+
if err != nil {
306+
t.Fatalf("unexpected error when creating Ouroboros object: %s", err)
307+
}
308+
if oConn.ChainSync() == nil {
309+
t.Fatalf("ChainSync client is nil")
310+
}
311+
// Start the client
312+
oConn.ChainSync().Client.Start()
313+
// Stop the client
314+
if err := oConn.ChainSync().Client.Stop(); err != nil {
315+
t.Fatalf("unexpected error when stopping client: %s", err)
316+
}
317+
// Wait for mock connection shutdown
318+
select {
319+
case err, ok := <-asyncErrChan:
320+
if ok {
321+
t.Fatal(err.Error())
322+
}
323+
case <-time.After(2 * time.Second):
324+
t.Fatalf("did not complete within timeout")
325+
}
326+
// Close Ouroboros connection
327+
if err := oConn.Close(); err != nil {
328+
t.Fatalf("unexpected error when closing Ouroboros object: %s", err)
329+
}
330+
// Wait for connection shutdown
331+
select {
332+
case <-oConn.ErrorChan():
333+
case <-time.After(10 * time.Second):
334+
t.Errorf("did not shutdown within timeout")
335+
}
336+
}

protocol/chainsync/error.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,6 @@ var ErrIntersectNotFound = errors.New("chain intersection not found")
2121
// StopChainSync is used as a special return value from a RollForward or RollBackward handler function
2222
// to signify that the sync process should be stopped
2323
var ErrStopSyncProcess = errors.New("stop sync process")
24+
25+
// ErrSyncCancelled is returned when a sync operation is cancelled
26+
var ErrSyncCancelled = errors.New("sync cancelled")

protocol/keepalive/client.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ type Client struct {
3030
timer *time.Timer
3131
timerMutex sync.Mutex
3232
onceStart sync.Once
33+
onceStop sync.Once
34+
stopErr error
3335
}
3436

3537
// NewClient creates and returns a new keep-alive protocol client with the given options and configuration.
@@ -93,6 +95,23 @@ func (c *Client) Start() {
9395
})
9496
}
9597

98+
// Stop stops the KeepAlive client protocol
99+
func (c *Client) Stop() error {
100+
c.onceStop.Do(func() {
101+
c.Protocol.Logger().
102+
Debug("stopping client protocol",
103+
"component", "network",
104+
"protocol", ProtocolName,
105+
"connection_id", c.callbackContext.ConnectionId.String(),
106+
)
107+
msg := NewMsgDone()
108+
c.stopErr = c.SendMessage(msg)
109+
// Ensure protocol shuts down completely
110+
c.Protocol.Stop()
111+
})
112+
return c.stopErr
113+
}
114+
96115
// sendKeepAlive sends a keep-alive message and schedules the next one.
97116
func (c *Client) sendKeepAlive() {
98117
msg := NewMsgKeepAlive(c.config.Cookie)

0 commit comments

Comments
 (0)