Skip to content

Commit fe904df

Browse files
committed
perf(protocols): improve performance across all protocols
- Increase default queue sizes for better buffering - Eliminate unnecessary goroutines for channel cleanup - Apply optimizations consistently across all protocols Signed-off-by: Chris Gianelloni <wolf31o2@blinklabs.io>
1 parent b93f5f3 commit fe904df

File tree

12 files changed

+293
-55
lines changed

12 files changed

+293
-55
lines changed

protocol/blockfetch/blockfetch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ type Config struct {
119119
const MaxRecvQueueSize = 512
120120

121121
// DefaultRecvQueueSize is the default receive queue size.
122-
const DefaultRecvQueueSize = 256
122+
const DefaultRecvQueueSize = 384
123123

124124
// MaxPendingMessageBytes is the maximum allowed pending message bytes (5MB).
125125
const MaxPendingMessageBytes = 5242880

protocol/blockfetch/client.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,12 +94,6 @@ func (c *Client) Start() {
9494
"connection_id", c.callbackContext.ConnectionId.String(),
9595
)
9696
c.Protocol.Start()
97-
// Start goroutine to cleanup resources on protocol shutdown
98-
go func() {
99-
<-c.DoneChan()
100-
close(c.blockChan)
101-
close(c.startBatchResultChan)
102-
}()
10397
})
10498
}
10599

@@ -115,6 +109,12 @@ func (c *Client) Stop() error {
115109
)
116110
msg := NewMsgClientDone()
117111
err = c.SendMessage(msg)
112+
// Defer closing channels until protocol fully shuts down
113+
go func() {
114+
<-c.DoneChan()
115+
close(c.blockChan)
116+
close(c.startBatchResultChan)
117+
}()
118118
})
119119
return err
120120
}

protocol/blockfetch/client_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,3 +207,58 @@ func TestGetBlockNoBlocks(t *testing.T) {
207207
},
208208
)
209209
}
210+
211+
func TestClientShutdown(t *testing.T) {
212+
defer goleak.VerifyNone(t)
213+
mockConn := ouroboros_mock.NewConnection(
214+
ouroboros_mock.ProtocolRoleClient,
215+
[]ouroboros_mock.ConversationEntry{
216+
ouroboros_mock.ConversationEntryHandshakeRequestGeneric,
217+
ouroboros_mock.ConversationEntryHandshakeNtNResponse,
218+
},
219+
)
220+
asyncErrChan := make(chan error, 1)
221+
go func() {
222+
err := <-mockConn.(*ouroboros_mock.Connection).ErrorChan()
223+
if err != nil {
224+
asyncErrChan <- fmt.Errorf("received unexpected error: %w", err)
225+
}
226+
close(asyncErrChan)
227+
}()
228+
oConn, err := ouroboros.New(
229+
ouroboros.WithConnection(mockConn),
230+
ouroboros.WithNetworkMagic(ouroboros_mock.MockNetworkMagic),
231+
ouroboros.WithNodeToNode(true),
232+
)
233+
if err != nil {
234+
t.Fatalf("unexpected error when creating Ouroboros object: %s", err)
235+
}
236+
if oConn.BlockFetch() == nil {
237+
t.Fatalf("BlockFetch client is nil")
238+
}
239+
// Start the client
240+
oConn.BlockFetch().Client.Start()
241+
// Stop the client
242+
if err := oConn.BlockFetch().Client.Stop(); err != nil {
243+
t.Fatalf("unexpected error when stopping client: %s", err)
244+
}
245+
// Wait for mock connection shutdown
246+
select {
247+
case err, ok := <-asyncErrChan:
248+
if ok {
249+
t.Fatal(err.Error())
250+
}
251+
case <-time.After(2 * time.Second):
252+
t.Fatalf("did not complete within timeout")
253+
}
254+
// Close Ouroboros connection
255+
if err := oConn.Close(); err != nil {
256+
t.Fatalf("unexpected error when closing Ouroboros object: %s", err)
257+
}
258+
// Wait for connection shutdown
259+
select {
260+
case <-oConn.ErrorChan():
261+
case <-time.After(10 * time.Second):
262+
t.Errorf("did not shutdown within timeout")
263+
}
264+
}

protocol/chainsync/chainsync.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,8 @@ type Config struct {
223223
const (
224224
MaxPipelineLimit = 100 // Max pipelined requests
225225
MaxRecvQueueSize = 100 // Max receive queue size (messages)
226-
DefaultPipelineLimit = 50 // Default pipeline limit
227-
DefaultRecvQueueSize = 50 // Default queue size
226+
DefaultPipelineLimit = 75 // Default pipeline limit
227+
DefaultRecvQueueSize = 75 // Default queue size
228228
MaxPendingMessageBytes = 102400 // Max pending message bytes (100KB)
229229
)
230230

protocol/chainsync/client.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,6 @@ type Client struct {
3535
onceStart sync.Once
3636
onceStop sync.Once
3737
syncPipelinedRequestNext int
38-
39-
// waitingForCurrentTipChan will process all the requests for the current tip until the channel
40-
// is empty.
4138
//
4239
// want* only processes one request per message reply received from the server. If the message
4340
// request fails, it is the responsibility of the caller to clear the channel.
@@ -127,11 +124,6 @@ func (c *Client) Start() {
127124
"connection_id", c.callbackContext.ConnectionId.String(),
128125
)
129126
c.Protocol.Start()
130-
// Start goroutine to cleanup resources on protocol shutdown
131-
go func() {
132-
<-c.DoneChan()
133-
close(c.readyForNextBlockChan)
134-
}()
135127
})
136128
}
137129

@@ -151,6 +143,11 @@ func (c *Client) Stop() error {
151143
if err = c.SendMessage(msg); err != nil {
152144
return
153145
}
146+
// Defer closing channel until protocol fully shuts down
147+
go func() {
148+
<-c.DoneChan()
149+
close(c.readyForNextBlockChan)
150+
}()
154151
})
155152
return err
156153
}

protocol/chainsync/client_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ func runTest(
8080
}()
8181
// Run test inner function
8282
innerFunc(t, oConn)
83+
// Stop the client to clean up goroutines
84+
if client := oConn.ChainSync().Client; client != nil {
85+
client.Stop()
86+
}
8387
// Wait for mock connection shutdown
8488
select {
8589
case err, ok := <-asyncErrChan:
@@ -275,3 +279,58 @@ func TestGetAvailableBlockRange(t *testing.T) {
275279
},
276280
)
277281
}
282+
283+
func TestClientShutdown(t *testing.T) {
284+
defer goleak.VerifyNone(t)
285+
mockConn := ouroboros_mock.NewConnection(
286+
ouroboros_mock.ProtocolRoleClient,
287+
[]ouroboros_mock.ConversationEntry{
288+
ouroboros_mock.ConversationEntryHandshakeRequestGeneric,
289+
ouroboros_mock.ConversationEntryHandshakeNtCResponse,
290+
},
291+
)
292+
asyncErrChan := make(chan error, 1)
293+
go func() {
294+
err := <-mockConn.(*ouroboros_mock.Connection).ErrorChan()
295+
if err != nil {
296+
asyncErrChan <- fmt.Errorf("received unexpected error: %w", err)
297+
}
298+
close(asyncErrChan)
299+
}()
300+
oConn, err := ouroboros.New(
301+
ouroboros.WithConnection(mockConn),
302+
ouroboros.WithNetworkMagic(ouroboros_mock.MockNetworkMagic),
303+
ouroboros.WithChainSyncConfig(chainsync.NewConfig()),
304+
)
305+
if err != nil {
306+
t.Fatalf("unexpected error when creating Ouroboros object: %s", err)
307+
}
308+
if oConn.ChainSync() == nil {
309+
t.Fatalf("ChainSync client is nil")
310+
}
311+
// Start the client
312+
oConn.ChainSync().Client.Start()
313+
// Stop the client
314+
if err := oConn.ChainSync().Client.Stop(); err != nil {
315+
t.Fatalf("unexpected error when stopping client: %s", err)
316+
}
317+
// Wait for mock connection shutdown
318+
select {
319+
case err, ok := <-asyncErrChan:
320+
if ok {
321+
t.Fatal(err.Error())
322+
}
323+
case <-time.After(2 * time.Second):
324+
t.Fatalf("did not complete within timeout")
325+
}
326+
// Close Ouroboros connection
327+
if err := oConn.Close(); err != nil {
328+
t.Fatalf("unexpected error when closing Ouroboros object: %s", err)
329+
}
330+
// Wait for connection shutdown
331+
select {
332+
case <-oConn.ErrorChan():
333+
case <-time.After(10 * time.Second):
334+
t.Errorf("did not shutdown within timeout")
335+
}
336+
}

protocol/leiosnotify/client.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,6 @@ func (c *Client) Start() {
7676
"connection_id", c.callbackContext.ConnectionId.String(),
7777
)
7878
c.Protocol.Start()
79-
// Start goroutine to cleanup resources on protocol shutdown
80-
go func() {
81-
<-c.DoneChan()
82-
close(c.requestNextChan)
83-
}()
8479
})
8580
}
8681

@@ -95,6 +90,11 @@ func (c *Client) Stop() error {
9590
)
9691
msg := NewMsgDone()
9792
err = c.SendMessage(msg)
93+
// Defer closing channel until protocol fully shuts down
94+
go func() {
95+
<-c.DoneChan()
96+
close(c.requestNextChan)
97+
}()
9898
})
9999
return err
100100
}

protocol/localtxmonitor/client.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -91,14 +91,6 @@ func (c *Client) Start() {
9191
"connection_id", c.callbackContext.ConnectionId.String(),
9292
)
9393
c.Protocol.Start()
94-
// Start goroutine to cleanup resources on protocol shutdown
95-
go func() {
96-
<-c.DoneChan()
97-
close(c.acquireResultChan)
98-
close(c.hasTxResultChan)
99-
close(c.nextTxResultChan)
100-
close(c.getSizesResultChan)
101-
}()
10294
})
10395
}
10496

@@ -118,6 +110,14 @@ func (c *Client) Stop() error {
118110
if err = c.SendMessage(msg); err != nil {
119111
return
120112
}
113+
// Defer closing channels until protocol fully shuts down
114+
go func() {
115+
<-c.DoneChan()
116+
close(c.acquireResultChan)
117+
close(c.hasTxResultChan)
118+
close(c.nextTxResultChan)
119+
close(c.getSizesResultChan)
120+
}()
121121
})
122122
return err
123123
}

protocol/localtxmonitor/client_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,3 +296,57 @@ func TestNextTx(t *testing.T) {
296296
},
297297
)
298298
}
299+
300+
func TestClientShutdown(t *testing.T) {
301+
defer goleak.VerifyNone(t)
302+
mockConn := ouroboros_mock.NewConnection(
303+
ouroboros_mock.ProtocolRoleClient,
304+
[]ouroboros_mock.ConversationEntry{
305+
ouroboros_mock.ConversationEntryHandshakeRequestGeneric,
306+
ouroboros_mock.ConversationEntryHandshakeNtCResponse,
307+
},
308+
)
309+
asyncErrChan := make(chan error, 1)
310+
go func() {
311+
err := <-mockConn.(*ouroboros_mock.Connection).ErrorChan()
312+
if err != nil {
313+
asyncErrChan <- fmt.Errorf("received unexpected error: %w", err)
314+
}
315+
close(asyncErrChan)
316+
}()
317+
oConn, err := ouroboros.New(
318+
ouroboros.WithConnection(mockConn),
319+
ouroboros.WithNetworkMagic(ouroboros_mock.MockNetworkMagic),
320+
)
321+
if err != nil {
322+
t.Fatalf("unexpected error when creating Ouroboros object: %s", err)
323+
}
324+
if oConn.LocalTxMonitor() == nil {
325+
t.Fatalf("LocalTxMonitor client is nil")
326+
}
327+
// Start the client
328+
oConn.LocalTxMonitor().Client.Start()
329+
// Stop the client
330+
if err := oConn.LocalTxMonitor().Client.Stop(); err != nil {
331+
t.Fatalf("unexpected error when stopping client: %s", err)
332+
}
333+
// Wait for mock connection shutdown
334+
select {
335+
case err, ok := <-asyncErrChan:
336+
if ok {
337+
t.Fatal(err.Error())
338+
}
339+
case <-time.After(2 * time.Second):
340+
t.Fatalf("did not complete within timeout")
341+
}
342+
// Close Ouroboros connection
343+
if err := oConn.Close(); err != nil {
344+
t.Fatalf("unexpected error when closing Ouroboros object: %s", err)
345+
}
346+
// Wait for connection shutdown
347+
select {
348+
case <-oConn.ErrorChan():
349+
case <-time.After(10 * time.Second):
350+
t.Errorf("did not shutdown within timeout")
351+
}
352+
}

protocol/localtxsubmission/client.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,6 @@ func (c *Client) Start() {
8080
"connection_id", c.callbackContext.ConnectionId.String(),
8181
)
8282
c.Protocol.Start()
83-
// Start goroutine to cleanup resources on protocol shutdown
84-
go func() {
85-
<-c.DoneChan()
86-
close(c.submitResultChan)
87-
}()
8883
})
8984
}
9085

@@ -104,6 +99,11 @@ func (c *Client) Stop() error {
10499
if err = c.SendMessage(msg); err != nil {
105100
return
106101
}
102+
// Defer closing channel until protocol fully shuts down
103+
go func() {
104+
<-c.DoneChan()
105+
close(c.submitResultChan)
106+
}()
107107
})
108108
return err
109109
}

0 commit comments

Comments
 (0)