Skip to content

Commit 41df945

Browse files
committed
perf(protocols): improve performance across all protocols
- Increase default queue sizes for better buffering - Eliminate unnecessary goroutines for channel cleanup - Apply optimizations consistently across all protocols Signed-off-by: Chris Gianelloni <wolf31o2@blinklabs.io>
1 parent b93f5f3 commit 41df945

File tree

19 files changed

+865
-107
lines changed

19 files changed

+865
-107
lines changed

protocol/blockfetch/blockfetch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ type Config struct {
119119
const MaxRecvQueueSize = 512
120120

121121
// DefaultRecvQueueSize is the default receive queue size.
122-
const DefaultRecvQueueSize = 256
122+
const DefaultRecvQueueSize = 384
123123

124124
// MaxPendingMessageBytes is the maximum allowed pending message bytes (5MB).
125125
const MaxPendingMessageBytes = 5242880

protocol/blockfetch/client.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -94,12 +94,6 @@ func (c *Client) Start() {
9494
"connection_id", c.callbackContext.ConnectionId.String(),
9595
)
9696
c.Protocol.Start()
97-
// Start goroutine to cleanup resources on protocol shutdown
98-
go func() {
99-
<-c.DoneChan()
100-
close(c.blockChan)
101-
close(c.startBatchResultChan)
102-
}()
10397
})
10498
}
10599

@@ -115,6 +109,12 @@ func (c *Client) Stop() error {
115109
)
116110
msg := NewMsgClientDone()
117111
err = c.SendMessage(msg)
112+
// Defer closing channels until protocol fully shuts down
113+
go func() {
114+
<-c.DoneChan()
115+
close(c.blockChan)
116+
close(c.startBatchResultChan)
117+
}()
118118
})
119119
return err
120120
}
@@ -222,13 +222,11 @@ func (c *Client) handleStartBatch() error {
222222
"role", "client",
223223
"connection_id", c.callbackContext.ConnectionId.String(),
224224
)
225-
// Check for shutdown
226225
select {
227226
case <-c.DoneChan():
228227
return protocol.ErrProtocolShuttingDown
229-
default:
228+
case c.startBatchResultChan <- nil:
230229
}
231-
c.startBatchResultChan <- nil
232230
return nil
233231
}
234232

@@ -241,14 +239,12 @@ func (c *Client) handleNoBlocks() error {
241239
"role", "client",
242240
"connection_id", c.callbackContext.ConnectionId.String(),
243241
)
244-
// Check for shutdown
242+
err := errors.New("block(s) not found")
245243
select {
246244
case <-c.DoneChan():
247245
return protocol.ErrProtocolShuttingDown
248-
default:
246+
case c.startBatchResultChan <- err:
249247
}
250-
err := errors.New("block(s) not found")
251-
c.startBatchResultChan <- err
252248
return nil
253249
}
254250

@@ -298,7 +294,11 @@ func (c *Client) handleBlock(msgGeneric protocol.Message) error {
298294
return errors.New("received block-fetch Block message but no callback function is defined")
299295
}
300296
} else {
301-
c.blockChan <- block
297+
select {
298+
case <-c.DoneChan():
299+
return protocol.ErrProtocolShuttingDown
300+
case c.blockChan <- block:
301+
}
302302
}
303303
return nil
304304
}

protocol/blockfetch/client_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,3 +207,58 @@ func TestGetBlockNoBlocks(t *testing.T) {
207207
},
208208
)
209209
}
210+
211+
func TestClientShutdown(t *testing.T) {
212+
defer goleak.VerifyNone(t)
213+
mockConn := ouroboros_mock.NewConnection(
214+
ouroboros_mock.ProtocolRoleClient,
215+
[]ouroboros_mock.ConversationEntry{
216+
ouroboros_mock.ConversationEntryHandshakeRequestGeneric,
217+
ouroboros_mock.ConversationEntryHandshakeNtNResponse,
218+
},
219+
)
220+
asyncErrChan := make(chan error, 1)
221+
go func() {
222+
err := <-mockConn.(*ouroboros_mock.Connection).ErrorChan()
223+
if err != nil {
224+
asyncErrChan <- fmt.Errorf("received unexpected error: %w", err)
225+
}
226+
close(asyncErrChan)
227+
}()
228+
oConn, err := ouroboros.New(
229+
ouroboros.WithConnection(mockConn),
230+
ouroboros.WithNetworkMagic(ouroboros_mock.MockNetworkMagic),
231+
ouroboros.WithNodeToNode(true),
232+
)
233+
if err != nil {
234+
t.Fatalf("unexpected error when creating Ouroboros object: %s", err)
235+
}
236+
if oConn.BlockFetch() == nil {
237+
t.Fatalf("BlockFetch client is nil")
238+
}
239+
// Start the client
240+
oConn.BlockFetch().Client.Start()
241+
// Stop the client
242+
if err := oConn.BlockFetch().Client.Stop(); err != nil {
243+
t.Fatalf("unexpected error when stopping client: %s", err)
244+
}
245+
// Wait for mock connection shutdown
246+
select {
247+
case err, ok := <-asyncErrChan:
248+
if ok {
249+
t.Fatal(err.Error())
250+
}
251+
case <-time.After(2 * time.Second):
252+
t.Fatalf("did not complete within timeout")
253+
}
254+
// Close Ouroboros connection
255+
if err := oConn.Close(); err != nil {
256+
t.Fatalf("unexpected error when closing Ouroboros object: %s", err)
257+
}
258+
// Wait for connection shutdown
259+
select {
260+
case <-oConn.ErrorChan():
261+
case <-time.After(10 * time.Second):
262+
t.Errorf("did not shutdown within timeout")
263+
}
264+
}

protocol/chainsync/chainsync.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,8 @@ type Config struct {
223223
const (
224224
MaxPipelineLimit = 100 // Max pipelined requests
225225
MaxRecvQueueSize = 100 // Max receive queue size (messages)
226-
DefaultPipelineLimit = 50 // Default pipeline limit
227-
DefaultRecvQueueSize = 50 // Default queue size
226+
DefaultPipelineLimit = 75 // Default pipeline limit
227+
DefaultRecvQueueSize = 75 // Default queue size
228228
MaxPendingMessageBytes = 102400 // Max pending message bytes (100KB)
229229
)
230230

protocol/chainsync/client.go

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,6 @@ type Client struct {
3535
onceStart sync.Once
3636
onceStop sync.Once
3737
syncPipelinedRequestNext int
38-
39-
// waitingForCurrentTipChan will process all the requests for the current tip until the channel
40-
// is empty.
4138
//
4239
// want* only processes one request per message reply received from the server. If the message
4340
// request fails, it is the responsibility of the caller to clear the channel.
@@ -127,11 +124,6 @@ func (c *Client) Start() {
127124
"connection_id", c.callbackContext.ConnectionId.String(),
128125
)
129126
c.Protocol.Start()
130-
// Start goroutine to cleanup resources on protocol shutdown
131-
go func() {
132-
<-c.DoneChan()
133-
close(c.readyForNextBlockChan)
134-
}()
135127
})
136128
}
137129

@@ -151,6 +143,11 @@ func (c *Client) Stop() error {
151143
if err = c.SendMessage(msg); err != nil {
152144
return
153145
}
146+
// Defer closing channel until protocol fully shuts down
147+
go func() {
148+
<-c.DoneChan()
149+
close(c.readyForNextBlockChan)
150+
}()
154151
})
155152
return err
156153
}
@@ -721,14 +718,22 @@ func (c *Client) handleRollForward(msgGeneric protocol.Message) error {
721718
if callbackErr != nil {
722719
if errors.Is(callbackErr, ErrStopSyncProcess) {
723720
// Signal that we're cancelling the sync
724-
c.readyForNextBlockChan <- false
721+
select {
722+
case <-c.DoneChan():
723+
return protocol.ErrProtocolShuttingDown
724+
case c.readyForNextBlockChan <- false:
725+
}
725726
return nil
726727
} else {
727728
return callbackErr
728729
}
729730
}
730731
// Signal that we're ready for the next block
731-
c.readyForNextBlockChan <- true
732+
select {
733+
case <-c.DoneChan():
734+
return protocol.ErrProtocolShuttingDown
735+
case c.readyForNextBlockChan <- true:
736+
}
732737
return nil
733738
}
734739

@@ -752,15 +757,23 @@ func (c *Client) handleRollBackward(msgGeneric protocol.Message) error {
752757
if callbackErr := c.config.RollBackwardFunc(c.callbackContext, msgRollBackward.Point, msgRollBackward.Tip); callbackErr != nil {
753758
if errors.Is(callbackErr, ErrStopSyncProcess) {
754759
// Signal that we're cancelling the sync
755-
c.readyForNextBlockChan <- false
760+
select {
761+
case <-c.DoneChan():
762+
return protocol.ErrProtocolShuttingDown
763+
case c.readyForNextBlockChan <- false:
764+
}
756765
return nil
757766
} else {
758767
return callbackErr
759768
}
760769
}
761770
}
762771
// Signal that we're ready for the next block
763-
c.readyForNextBlockChan <- true
772+
select {
773+
case <-c.DoneChan():
774+
return protocol.ErrProtocolShuttingDown
775+
case c.readyForNextBlockChan <- true:
776+
}
764777
return nil
765778
}
766779

protocol/chainsync/client_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ func runTest(
8080
}()
8181
// Run test inner function
8282
innerFunc(t, oConn)
83+
// Stop the client to clean up goroutines
84+
if client := oConn.ChainSync().Client; client != nil {
85+
client.Stop()
86+
}
8387
// Wait for mock connection shutdown
8488
select {
8589
case err, ok := <-asyncErrChan:
@@ -275,3 +279,58 @@ func TestGetAvailableBlockRange(t *testing.T) {
275279
},
276280
)
277281
}
282+
283+
func TestClientShutdown(t *testing.T) {
284+
defer goleak.VerifyNone(t)
285+
mockConn := ouroboros_mock.NewConnection(
286+
ouroboros_mock.ProtocolRoleClient,
287+
[]ouroboros_mock.ConversationEntry{
288+
ouroboros_mock.ConversationEntryHandshakeRequestGeneric,
289+
ouroboros_mock.ConversationEntryHandshakeNtCResponse,
290+
},
291+
)
292+
asyncErrChan := make(chan error, 1)
293+
go func() {
294+
err := <-mockConn.(*ouroboros_mock.Connection).ErrorChan()
295+
if err != nil {
296+
asyncErrChan <- fmt.Errorf("received unexpected error: %w", err)
297+
}
298+
close(asyncErrChan)
299+
}()
300+
oConn, err := ouroboros.New(
301+
ouroboros.WithConnection(mockConn),
302+
ouroboros.WithNetworkMagic(ouroboros_mock.MockNetworkMagic),
303+
ouroboros.WithChainSyncConfig(chainsync.NewConfig()),
304+
)
305+
if err != nil {
306+
t.Fatalf("unexpected error when creating Ouroboros object: %s", err)
307+
}
308+
if oConn.ChainSync() == nil {
309+
t.Fatalf("ChainSync client is nil")
310+
}
311+
// Start the client
312+
oConn.ChainSync().Client.Start()
313+
// Stop the client
314+
if err := oConn.ChainSync().Client.Stop(); err != nil {
315+
t.Fatalf("unexpected error when stopping client: %s", err)
316+
}
317+
// Wait for mock connection shutdown
318+
select {
319+
case err, ok := <-asyncErrChan:
320+
if ok {
321+
t.Fatal(err.Error())
322+
}
323+
case <-time.After(2 * time.Second):
324+
t.Fatalf("did not complete within timeout")
325+
}
326+
// Close Ouroboros connection
327+
if err := oConn.Close(); err != nil {
328+
t.Fatalf("unexpected error when closing Ouroboros object: %s", err)
329+
}
330+
// Wait for connection shutdown
331+
select {
332+
case <-oConn.ErrorChan():
333+
case <-time.After(10 * time.Second):
334+
t.Errorf("did not shutdown within timeout")
335+
}
336+
}

protocol/keepalive/client_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,3 +238,55 @@ func TestServerKeepaliveHandlingWithDifferentCookie(t *testing.T) {
238238
t.Errorf("did not shutdown within timeout")
239239
}
240240
}
241+
func TestClientShutdown(t *testing.T) {
242+
defer goleak.VerifyNone(t)
243+
mockConn := ouroboros_mock.NewConnection(
244+
ouroboros_mock.ProtocolRoleClient,
245+
[]ouroboros_mock.ConversationEntry{
246+
ouroboros_mock.ConversationEntryHandshakeRequestGeneric,
247+
ouroboros_mock.ConversationEntryHandshakeNtNResponse,
248+
},
249+
)
250+
asyncErrChan := make(chan error, 1)
251+
go func() {
252+
err := <-mockConn.(*ouroboros_mock.Connection).ErrorChan()
253+
if err != nil {
254+
asyncErrChan <- fmt.Errorf("received unexpected error: %w", err)
255+
}
256+
close(asyncErrChan)
257+
}()
258+
oConn, err := ouroboros.New(
259+
ouroboros.WithConnection(mockConn),
260+
ouroboros.WithNetworkMagic(ouroboros_mock.MockNetworkMagic),
261+
ouroboros.WithNodeToNode(true),
262+
)
263+
if err != nil {
264+
t.Fatalf("unexpected error when creating Ouroboros object: %s", err)
265+
}
266+
if oConn.KeepAlive() == nil {
267+
t.Fatalf("KeepAlive client is nil")
268+
}
269+
// Start the client
270+
oConn.KeepAlive().Client.Start()
271+
// Stop the client
272+
oConn.KeepAlive().Client.Stop()
273+
// Wait for mock connection shutdown
274+
select {
275+
case err, ok := <-asyncErrChan:
276+
if ok {
277+
t.Fatal(err.Error())
278+
}
279+
case <-time.After(2 * time.Second):
280+
t.Fatalf("did not complete within timeout")
281+
}
282+
// Close Ouroboros connection
283+
if err := oConn.Close(); err != nil {
284+
t.Fatalf("unexpected error when closing Ouroboros object: %s", err)
285+
}
286+
// Wait for connection shutdown
287+
select {
288+
case <-oConn.ErrorChan():
289+
case <-time.After(10 * time.Second):
290+
t.Errorf("did not shutdown within timeout")
291+
}
292+
}

0 commit comments

Comments
 (0)