Skip to content

Commit ddbf172

Browse files
committed
perf(protocols): improve performance across all protocols
- Increase default queue sizes for better buffering - Eliminate unnecessary goroutines for channel cleanup - Apply optimizations consistently across all protocols Signed-off-by: Chris Gianelloni <wolf31o2@blinklabs.io>
1 parent b93f5f3 commit ddbf172

File tree

23 files changed

+1020
-122
lines changed

23 files changed

+1020
-122
lines changed

protocol/blockfetch/blockfetch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ type Config struct {
119119
const MaxRecvQueueSize = 512
120120

121121
// DefaultRecvQueueSize is the default receive queue size.
122-
const DefaultRecvQueueSize = 256
122+
const DefaultRecvQueueSize = 384
123123

124124
// MaxPendingMessageBytes is the maximum allowed pending message bytes (5MB).
125125
const MaxPendingMessageBytes = 5242880

protocol/blockfetch/client.go

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ type Client struct {
3636
blockUseCallback bool // Whether to use callback for blocks
3737
onceStart sync.Once // Ensures Start is only called once
3838
onceStop sync.Once // Ensures Stop is only called once
39+
started bool // Whether the protocol has been started
3940
}
4041

4142
// NewClient creates a new Block Fetch protocol client with the given options and configuration.
@@ -93,13 +94,8 @@ func (c *Client) Start() {
9394
"protocol", ProtocolName,
9495
"connection_id", c.callbackContext.ConnectionId.String(),
9596
)
97+
c.started = true
9698
c.Protocol.Start()
97-
// Start goroutine to cleanup resources on protocol shutdown
98-
go func() {
99-
<-c.DoneChan()
100-
close(c.blockChan)
101-
close(c.startBatchResultChan)
102-
}()
10399
})
104100
}
105101

@@ -115,6 +111,18 @@ func (c *Client) Stop() error {
115111
)
116112
msg := NewMsgClientDone()
117113
err = c.SendMessage(msg)
114+
// Defer closing channels until protocol fully shuts down (only if started)
115+
if c.started {
116+
go func() {
117+
<-c.DoneChan()
118+
close(c.blockChan)
119+
close(c.startBatchResultChan)
120+
}()
121+
} else {
122+
// If protocol was never started, close channels immediately
123+
close(c.blockChan)
124+
close(c.startBatchResultChan)
125+
}
118126
})
119127
return err
120128
}
@@ -222,13 +230,11 @@ func (c *Client) handleStartBatch() error {
222230
"role", "client",
223231
"connection_id", c.callbackContext.ConnectionId.String(),
224232
)
225-
// Check for shutdown
226233
select {
227234
case <-c.DoneChan():
228235
return protocol.ErrProtocolShuttingDown
229-
default:
236+
case c.startBatchResultChan <- nil:
230237
}
231-
c.startBatchResultChan <- nil
232238
return nil
233239
}
234240

@@ -241,14 +247,12 @@ func (c *Client) handleNoBlocks() error {
241247
"role", "client",
242248
"connection_id", c.callbackContext.ConnectionId.String(),
243249
)
244-
// Check for shutdown
250+
err := errors.New("block(s) not found")
245251
select {
246252
case <-c.DoneChan():
247253
return protocol.ErrProtocolShuttingDown
248-
default:
254+
case c.startBatchResultChan <- err:
249255
}
250-
err := errors.New("block(s) not found")
251-
c.startBatchResultChan <- err
252256
return nil
253257
}
254258

@@ -298,7 +302,11 @@ func (c *Client) handleBlock(msgGeneric protocol.Message) error {
298302
return errors.New("received block-fetch Block message but no callback function is defined")
299303
}
300304
} else {
301-
c.blockChan <- block
305+
select {
306+
case <-c.DoneChan():
307+
return protocol.ErrProtocolShuttingDown
308+
case c.blockChan <- block:
309+
}
302310
}
303311
return nil
304312
}

protocol/blockfetch/client_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,3 +207,58 @@ func TestGetBlockNoBlocks(t *testing.T) {
207207
},
208208
)
209209
}
210+
211+
func TestClientShutdown(t *testing.T) {
212+
defer goleak.VerifyNone(t)
213+
mockConn := ouroboros_mock.NewConnection(
214+
ouroboros_mock.ProtocolRoleClient,
215+
[]ouroboros_mock.ConversationEntry{
216+
ouroboros_mock.ConversationEntryHandshakeRequestGeneric,
217+
ouroboros_mock.ConversationEntryHandshakeNtNResponse,
218+
},
219+
)
220+
asyncErrChan := make(chan error, 1)
221+
go func() {
222+
err := <-mockConn.(*ouroboros_mock.Connection).ErrorChan()
223+
if err != nil {
224+
asyncErrChan <- fmt.Errorf("received unexpected error: %w", err)
225+
}
226+
close(asyncErrChan)
227+
}()
228+
oConn, err := ouroboros.New(
229+
ouroboros.WithConnection(mockConn),
230+
ouroboros.WithNetworkMagic(ouroboros_mock.MockNetworkMagic),
231+
ouroboros.WithNodeToNode(true),
232+
)
233+
if err != nil {
234+
t.Fatalf("unexpected error when creating Ouroboros object: %s", err)
235+
}
236+
if oConn.BlockFetch() == nil {
237+
t.Fatalf("BlockFetch client is nil")
238+
}
239+
// Start the client
240+
oConn.BlockFetch().Client.Start()
241+
// Stop the client
242+
if err := oConn.BlockFetch().Client.Stop(); err != nil {
243+
t.Fatalf("unexpected error when stopping client: %s", err)
244+
}
245+
// Wait for mock connection shutdown
246+
select {
247+
case err, ok := <-asyncErrChan:
248+
if ok {
249+
t.Fatal(err.Error())
250+
}
251+
case <-time.After(2 * time.Second):
252+
t.Fatalf("did not complete within timeout")
253+
}
254+
// Close Ouroboros connection
255+
if err := oConn.Close(); err != nil {
256+
t.Fatalf("unexpected error when closing Ouroboros object: %s", err)
257+
}
258+
// Wait for connection shutdown
259+
select {
260+
case <-oConn.ErrorChan():
261+
case <-time.After(10 * time.Second):
262+
t.Errorf("did not shutdown within timeout")
263+
}
264+
}

protocol/chainsync/chainsync.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,8 @@ type Config struct {
223223
const (
224224
MaxPipelineLimit = 100 // Max pipelined requests
225225
MaxRecvQueueSize = 100 // Max receive queue size (messages)
226-
DefaultPipelineLimit = 50 // Default pipeline limit
227-
DefaultRecvQueueSize = 50 // Default queue size
226+
DefaultPipelineLimit = 75 // Default pipeline limit
227+
DefaultRecvQueueSize = 75 // Default queue size
228228
MaxPendingMessageBytes = 102400 // Max pending message bytes (100KB)
229229
)
230230

protocol/chainsync/client.go

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,8 @@ type Client struct {
3434
readyForNextBlockChan chan bool
3535
onceStart sync.Once
3636
onceStop sync.Once
37+
started bool
3738
syncPipelinedRequestNext int
38-
39-
// waitingForCurrentTipChan will process all the requests for the current tip until the channel
40-
// is empty.
4139
//
4240
// want* only processes one request per message reply received from the server. If the message
4341
// request fails, it is the responsibility of the caller to clear the channel.
@@ -126,12 +124,8 @@ func (c *Client) Start() {
126124
"protocol", ProtocolName,
127125
"connection_id", c.callbackContext.ConnectionId.String(),
128126
)
127+
c.started = true
129128
c.Protocol.Start()
130-
// Start goroutine to cleanup resources on protocol shutdown
131-
go func() {
132-
<-c.DoneChan()
133-
close(c.readyForNextBlockChan)
134-
}()
135129
})
136130
}
137131

@@ -151,6 +145,16 @@ func (c *Client) Stop() error {
151145
if err = c.SendMessage(msg); err != nil {
152146
return
153147
}
148+
// Defer closing channel until protocol fully shuts down (only if started)
149+
if c.started {
150+
go func() {
151+
<-c.DoneChan()
152+
close(c.readyForNextBlockChan)
153+
}()
154+
} else {
155+
// If protocol was never started, close channel immediately
156+
close(c.readyForNextBlockChan)
157+
}
154158
})
155159
return err
156160
}
@@ -721,14 +725,22 @@ func (c *Client) handleRollForward(msgGeneric protocol.Message) error {
721725
if callbackErr != nil {
722726
if errors.Is(callbackErr, ErrStopSyncProcess) {
723727
// Signal that we're cancelling the sync
724-
c.readyForNextBlockChan <- false
728+
select {
729+
case <-c.DoneChan():
730+
return protocol.ErrProtocolShuttingDown
731+
case c.readyForNextBlockChan <- false:
732+
}
725733
return nil
726734
} else {
727735
return callbackErr
728736
}
729737
}
730738
// Signal that we're ready for the next block
731-
c.readyForNextBlockChan <- true
739+
select {
740+
case <-c.DoneChan():
741+
return protocol.ErrProtocolShuttingDown
742+
case c.readyForNextBlockChan <- true:
743+
}
732744
return nil
733745
}
734746

@@ -752,15 +764,23 @@ func (c *Client) handleRollBackward(msgGeneric protocol.Message) error {
752764
if callbackErr := c.config.RollBackwardFunc(c.callbackContext, msgRollBackward.Point, msgRollBackward.Tip); callbackErr != nil {
753765
if errors.Is(callbackErr, ErrStopSyncProcess) {
754766
// Signal that we're cancelling the sync
755-
c.readyForNextBlockChan <- false
767+
select {
768+
case <-c.DoneChan():
769+
return protocol.ErrProtocolShuttingDown
770+
case c.readyForNextBlockChan <- false:
771+
}
756772
return nil
757773
} else {
758774
return callbackErr
759775
}
760776
}
761777
}
762778
// Signal that we're ready for the next block
763-
c.readyForNextBlockChan <- true
779+
select {
780+
case <-c.DoneChan():
781+
return protocol.ErrProtocolShuttingDown
782+
case c.readyForNextBlockChan <- true:
783+
}
764784
return nil
765785
}
766786

protocol/chainsync/client_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ func runTest(
8080
}()
8181
// Run test inner function
8282
innerFunc(t, oConn)
83+
// Stop the client to clean up goroutines
84+
if client := oConn.ChainSync().Client; client != nil {
85+
client.Stop()
86+
}
8387
// Wait for mock connection shutdown
8488
select {
8589
case err, ok := <-asyncErrChan:
@@ -275,3 +279,58 @@ func TestGetAvailableBlockRange(t *testing.T) {
275279
},
276280
)
277281
}
282+
283+
func TestClientShutdown(t *testing.T) {
284+
defer goleak.VerifyNone(t)
285+
mockConn := ouroboros_mock.NewConnection(
286+
ouroboros_mock.ProtocolRoleClient,
287+
[]ouroboros_mock.ConversationEntry{
288+
ouroboros_mock.ConversationEntryHandshakeRequestGeneric,
289+
ouroboros_mock.ConversationEntryHandshakeNtCResponse,
290+
},
291+
)
292+
asyncErrChan := make(chan error, 1)
293+
go func() {
294+
err := <-mockConn.(*ouroboros_mock.Connection).ErrorChan()
295+
if err != nil {
296+
asyncErrChan <- fmt.Errorf("received unexpected error: %w", err)
297+
}
298+
close(asyncErrChan)
299+
}()
300+
oConn, err := ouroboros.New(
301+
ouroboros.WithConnection(mockConn),
302+
ouroboros.WithNetworkMagic(ouroboros_mock.MockNetworkMagic),
303+
ouroboros.WithChainSyncConfig(chainsync.NewConfig()),
304+
)
305+
if err != nil {
306+
t.Fatalf("unexpected error when creating Ouroboros object: %s", err)
307+
}
308+
if oConn.ChainSync() == nil {
309+
t.Fatalf("ChainSync client is nil")
310+
}
311+
// Start the client
312+
oConn.ChainSync().Client.Start()
313+
// Stop the client
314+
if err := oConn.ChainSync().Client.Stop(); err != nil {
315+
t.Fatalf("unexpected error when stopping client: %s", err)
316+
}
317+
// Wait for mock connection shutdown
318+
select {
319+
case err, ok := <-asyncErrChan:
320+
if ok {
321+
t.Fatal(err.Error())
322+
}
323+
case <-time.After(2 * time.Second):
324+
t.Fatalf("did not complete within timeout")
325+
}
326+
// Close Ouroboros connection
327+
if err := oConn.Close(); err != nil {
328+
t.Fatalf("unexpected error when closing Ouroboros object: %s", err)
329+
}
330+
// Wait for connection shutdown
331+
select {
332+
case <-oConn.ErrorChan():
333+
case <-time.After(10 * time.Second):
334+
t.Errorf("did not shutdown within timeout")
335+
}
336+
}

protocol/keepalive/client.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type Client struct {
3030
timer *time.Timer
3131
timerMutex sync.Mutex
3232
onceStart sync.Once
33+
onceStop sync.Once
3334
}
3435

3536
// NewClient creates and returns a new keep-alive protocol client with the given options and configuration.
@@ -93,6 +94,22 @@ func (c *Client) Start() {
9394
})
9495
}
9596

97+
// Stop stops the KeepAlive client protocol
98+
func (c *Client) Stop() error {
99+
var err error
100+
c.onceStop.Do(func() {
101+
c.Protocol.Logger().
102+
Debug("stopping client protocol",
103+
"component", "network",
104+
"protocol", ProtocolName,
105+
"connection_id", c.callbackContext.ConnectionId.String(),
106+
)
107+
msg := NewMsgDone()
108+
err = c.SendMessage(msg)
109+
})
110+
return err
111+
}
112+
96113
// sendKeepAlive sends a keep-alive message and schedules the next one.
97114
func (c *Client) sendKeepAlive() {
98115
msg := NewMsgKeepAlive(c.config.Cookie)

0 commit comments

Comments
 (0)