Not returning on connection close for chainsync, block-fetch and tx-submission protocol #1141
Conversation
Signed-off-by: Jenita <jkawan@blinklabs.io>

Closes #1112
@coderabbitai review

✅ Actions performed: Review triggered.
Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

📝 Walkthrough

Adds centralized connection-level error handling and per-connection cancellable contexts; wires the context into ProtocolOptions and protocol callback contexts. Changes EOF propagation: clients treat EOF as an error only when server-side protocols are active; servers always propagate EOF. Routes muxer and protocol errors through the centralized handler, forwarding wrapped errors and closing the connection. Expands the protocol state API (CurrentState, IsDone, StateMap.GetDoneState) and adds response channel fields for state transitions. Prevents resending Done in Stop. Adds comprehensive in-memory tests for connection, chainsync, blockfetch, and txsubmission.

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
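For orientation, here is a minimal self-contained sketch of the per-connection cancellable context pattern the walkthrough describes; the `conn` and `protocolOptions` types are hypothetical stand-ins for illustration, not the gouroboros API.

```go
package main

import (
	"context"
	"fmt"
)

// protocolOptions is a hypothetical stand-in for protocol.ProtocolOptions,
// which this PR extends with a connection-scoped context.
type protocolOptions struct {
	Context context.Context
}

type conn struct {
	ctx       context.Context
	cancelCtx context.CancelFunc
}

func newConn() *conn {
	c := &conn{}
	// One cancellable context per connection; every protocol started on
	// this connection observes it through its options.
	c.ctx, c.cancelCtx = context.WithCancel(context.Background())
	return c
}

// protoOptions wires the connection context into the options handed to
// each mini-protocol, mirroring the wiring described above.
func (c *conn) protoOptions() protocolOptions {
	return protocolOptions{Context: c.ctx}
}

// close cancels the shared context, signalling all protocols at once.
func (c *conn) close() {
	c.cancelCtx()
}

func main() {
	c := newConn()
	opts := c.protoOptions()
	c.close()
	fmt.Println(opts.Context.Err()) // context.Canceled
}
```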
Actionable comments posted: 8
♻️ Duplicate comments (1)
protocol/protocol.go (1)
139-151: IsDone() behavior matches intended semantics

Treating AgencyNone and InitialState as done addresses idle/never-started cases cleanly.
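As a rough illustration of those semantics (type names here are hypothetical stand-ins, not the protocol package's actual definitions):

```go
package main

import "fmt"

// Hypothetical stand-ins for the protocol state types discussed above.
type agency int

const (
	agencyNone agency = iota // nobody has agency: terminal state
	agencyClient
	agencyServer
)

type state struct {
	id     uint
	agency agency
}

type proto struct {
	initialState state
	currentState state
}

// isDone mirrors the reviewed semantics: done when nobody has agency
// (terminal state) or when the protocol never left its initial state.
func (p *proto) isDone() bool {
	return p.currentState.agency == agencyNone ||
		p.currentState == p.initialState
}

func main() {
	idle := state{id: 0, agency: agencyClient}
	p := &proto{initialState: idle, currentState: idle}
	fmt.Println(p.isDone()) // true: never started
}
```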
🧹 Nitpick comments (6)
protocol/blockfetch/client.go (1)
112-118: Consider locking Stop() like chainsync to avoid interleaving

Align with chainsync by guarding the Done send with busyMutex to avoid racing with GetBlock/GetBlockRange.
Apply:
```diff
 func (c *Client) Stop() error {
 	var err error
 	c.onceStop.Do(func() {
 		c.Protocol.Logger().
 			Debug("stopping client protocol",
 				"component", "network",
 				"protocol", ProtocolName,
 				"connection_id", c.callbackContext.ConnectionId.String(),
 			)
-		if !c.IsDone() {
+		c.busyMutex.Lock()
+		defer c.busyMutex.Unlock()
+		if !c.IsDone() {
 			msg := NewMsgClientDone()
 			if err = c.SendMessage(msg); err != nil {
 				return
 			}
 		}
 	})
 	return err
 }
```

protocol/blockfetch/blockfetch_test.go (2)
51-59: Avoid Read() returning (0, nil); block or EOF to prevent busy spin

Returning 0, nil violates net.Conn expectations and can spin the muxer. Make Read block until close, then return EOF.
Apply:
```diff
-func (c *testConn) Read(b []byte) (n int, err error) { return 0, nil }
+func (c *testConn) Read(b []byte) (n int, err error) {
+	<-c.closeChan
+	return 0, io.EOF
+}
```
159-165: Increase timeout to reduce flakiness

100ms can be tight on CI. Consider 500ms–1s to avoid false negatives.
connection_test.go (2)
151-156: Typo: “stoppeds”.

Minor nit.

```diff
-	// Protocol is stoppeds
+	// Protocol is stopped
```
192-210: Prefer waiting for protocol init instead of fixed sleep.

Replace fixed sleep with the same polling used above to reduce flakes under CI load.
protocol/txsubmission/txsubmission_test.go (1)
154-167: Good message-send assertion; consider shorter timeout.

Looks fine; 2s is generous, but with a running muxer you can trim to 500ms.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
- connection.go (2 hunks)
- connection_test.go (2 hunks)
- protocol/blockfetch/blockfetch_test.go (1 hunk)
- protocol/blockfetch/client.go (1 hunk)
- protocol/chainsync/chainsync_test.go (1 hunk)
- protocol/chainsync/client.go (1 hunk)
- protocol/protocol.go (8 hunks)
- protocol/txsubmission/client.go (1 hunk)
- protocol/txsubmission/txsubmission_test.go (1 hunk)
🧰 Additional context used
🪛 GitHub Actions: go-test
connection_test.go
[error] 74-74: TestErrorHandlingWithActiveProtocols: unexpected error when creating Connection object: handshake: timeout waiting on transition from protocol state Propose
[error] 132-132: TestErrorHandlingWithActiveProtocols: unexpected error when creating Connection object: handshake: timeout waiting on transition from protocol state Propose
🔇 Additional comments (6)
protocol/chainsync/client.go (1)
150-154: Good guard: only send Done when not already done

Prevents redundant Done and aligns Stop with protocol state. Locking via busyMutex is appropriate.
protocol/protocol.go (1)
491-540: State tracking and readiness signalling look solid

Locking around currentState updates/reads and using stateEntry for agency/timeout is correct. Timeout message uses CurrentState() safely.
If any protocol has multiple AgencyNone states, ensure StateMap defines exactly one terminal state; otherwise GetDoneState() can be ambiguous.
Also applies to: 550-551, 556-569, 592-595, 630-631
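To make the ambiguity concrete, here is a hedged sketch of a terminal-state lookup over a state map; the map shape and names are assumptions for illustration, not the library's actual StateMap.

```go
package main

import "fmt"

type agency int

const agencyNone agency = 0

// getDoneState scans for the single AgencyNone entry; if a protocol's
// state map defines more than one, there is no unambiguous answer.
func getDoneState(states map[uint]agency) (uint, bool) {
	var done uint
	found := false
	for id, ag := range states {
		if ag != agencyNone {
			continue
		}
		if found {
			return 0, false // ambiguous: multiple terminal states
		}
		done, found = id, true
	}
	return done, found
}

func main() {
	fmt.Println(getDoneState(map[uint]agency{0: 1, 3: agencyNone}))              // 3 true
	fmt.Println(getDoneState(map[uint]agency{2: agencyNone, 3: agencyNone}))     // 0 false
}
```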
protocol/chainsync/chainsync_test.go (1)
151-166: Good coverage for client Done send.

Starting the muxer and asserting a write to the fake conn verifies the basic send path.
protocol/txsubmission/client.go (1)
86-92: No-op change.

Formatting-only; safe to merge.
protocol/txsubmission/txsubmission_test.go (2)
176-181: Server start/stop smoke test LGTM.

No issues.
16-29: Import sync; prepare for safer conn behavior.

You’ll add sync for mutex/once; rest unchanged.
```diff
 import (
 	"io"
 	"log/slog"
 	"net"
+	"sync"
 	"testing"
 	"time"
```

Likely an incorrect or invalid review comment.
```go
// handleConnectionError handles connection-level errors centrally
func (c *Connection) handleConnectionError(err error) error {
	if err == nil {
		return nil
	}

	// Only propagate EOF errors when acting as a client with active server-side protocols
	if errors.Is(err, io.EOF) {
		// Check if we have any active server-side protocols
		if c.server {
			return err
		}

		// For clients, only propagate EOF if we have active server protocols
		hasActiveServerProtocols := false
		if c.chainSync != nil && c.chainSync.Server != nil && !c.chainSync.Server.IsDone() {
			hasActiveServerProtocols = true
		}
		if c.blockFetch != nil && c.blockFetch.Server != nil && !c.blockFetch.Server.IsDone() {
			hasActiveServerProtocols = true
		}
		if c.txSubmission != nil && c.txSubmission.Server != nil && !c.txSubmission.Server.IsDone() {
			hasActiveServerProtocols = true
		}
		if c.localStateQuery != nil && c.localStateQuery.Server != nil && !c.localStateQuery.Server.IsDone() {
			hasActiveServerProtocols = true
		}
		if c.localTxMonitor != nil && c.localTxMonitor.Server != nil && !c.localTxMonitor.Server.IsDone() {
			hasActiveServerProtocols = true
		}
		if c.localTxSubmission != nil && c.localTxSubmission.Server != nil && !c.localTxSubmission.Server.IsDone() {
			hasActiveServerProtocols = true
		}

		if hasActiveServerProtocols {
			return err
		}

		// EOF with no active server protocols is normal connection closure
		return nil
	}

	// For non-EOF errors, always propagate
	return err
}
```
🧩 Analysis chain
EOF handling logic inverted for server/client; also consider ErrUnexpectedEOF
- As server, we should NOT surface EOF if all server-side protocols (chainsync, block-fetch, tx-submission; plus local protocols in NtC) are done or never started. Current code always returns err when c.server is true.
- As client, we should check our client-side protocols, not server-side, to decide whether EOF is unexpected.
- Also handle io.ErrUnexpectedEOF (or ensure the muxer remaps it to io.EOF).
Proposed fix:
```diff
 func (c *Connection) handleConnectionError(err error) error {
 	if err == nil {
 		return nil
 	}
-	// Only propagate EOF errors when acting as a client with active server-side protocols
-	if errors.Is(err, io.EOF) {
-		// Check if we have any active server-side protocols
-		if c.server {
-			return err
-		}
-
-		// For clients, only propagate EOF if we have active server protocols
-		hasActiveServerProtocols := false
-		if c.chainSync != nil && c.chainSync.Server != nil && !c.chainSync.Server.IsDone() {
-			hasActiveServerProtocols = true
-		}
-		if c.blockFetch != nil && c.blockFetch.Server != nil && !c.blockFetch.Server.IsDone() {
-			hasActiveServerProtocols = true
-		}
-		if c.txSubmission != nil && c.txSubmission.Server != nil && !c.txSubmission.Server.IsDone() {
-			hasActiveServerProtocols = true
-		}
-		if c.localStateQuery != nil && c.localStateQuery.Server != nil && !c.localStateQuery.Server.IsDone() {
-			hasActiveServerProtocols = true
-		}
-		if c.localTxMonitor != nil && c.localTxMonitor.Server != nil && !c.localTxMonitor.Server.IsDone() {
-			hasActiveServerProtocols = true
-		}
-		if c.localTxSubmission != nil && c.localTxSubmission.Server != nil && !c.localTxSubmission.Server.IsDone() {
-			hasActiveServerProtocols = true
-		}
-
-		if hasActiveServerProtocols {
-			return err
-		}
-
-		// EOF with no active server protocols is normal connection closure
-		return nil
-	}
+	// Treat EOF/UnexpectedEOF as connection closed, decide based on active protocols for our role
+	if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
+		hasActive := false
+		if c.server {
+			// Server: check server-side protocols
+			if c.chainSync != nil && c.chainSync.Server != nil && !c.chainSync.Server.IsDone() {
+				hasActive = true
+			}
+			if c.blockFetch != nil && c.blockFetch.Server != nil && !c.blockFetch.Server.IsDone() {
+				hasActive = true
+			}
+			if c.txSubmission != nil && c.txSubmission.Server != nil && !c.txSubmission.Server.IsDone() {
+				hasActive = true
+			}
+			if c.localStateQuery != nil && c.localStateQuery.Server != nil && !c.localStateQuery.Server.IsDone() {
+				hasActive = true
+			}
+			if c.localTxMonitor != nil && c.localTxMonitor.Server != nil && !c.localTxMonitor.Server.IsDone() {
+				hasActive = true
+			}
+			if c.localTxSubmission != nil && c.localTxSubmission.Server != nil && !c.localTxSubmission.Server.IsDone() {
+				hasActive = true
+			}
+		} else {
+			// Client: check client-side protocols
+			if c.chainSync != nil && c.chainSync.Client != nil && !c.chainSync.Client.IsDone() {
+				hasActive = true
+			}
+			if c.blockFetch != nil && c.blockFetch.Client != nil && !c.blockFetch.Client.IsDone() {
+				hasActive = true
+			}
+			if c.txSubmission != nil && c.txSubmission.Client != nil && !c.txSubmission.Client.IsDone() {
+				hasActive = true
+			}
+			if c.localStateQuery != nil && c.localStateQuery.Client != nil && !c.localStateQuery.Client.IsDone() {
+				hasActive = true
+			}
+			if c.localTxMonitor != nil && c.localTxMonitor.Client != nil && !c.localTxMonitor.Client.IsDone() {
+				hasActive = true
+			}
+			if c.localTxSubmission != nil && c.localTxSubmission.Client != nil && !c.localTxSubmission.Client.IsDone() {
+				hasActive = true
+			}
+		}
+		// Propagate only if a protocol is still active; otherwise swallow as normal closure
+		if hasActive {
+			return err
+		}
+		return nil
+	}
 	// For non-EOF errors, always propagate
 	return err
 }
```

To confirm muxer behavior for UnexpectedEOF remapping, run:
Also applies to: 335-347
🏁 Script executed:
```bash
#!/bin/bash
# Check if muxer remaps io.ErrUnexpectedEOF to io.EOF
rg -nP 'ErrUnexpectedEOF|UnexpectedEOF' muxer/ --context 2 || true
```

Length of output: 0
🏁 Script executed:
```bash
#!/bin/bash
# 1. Check if muxer directory exists and list structure
echo "=== Checking muxer directory ==="
fd --type d 'muxer' . || echo "No muxer directory found"

# 2. Search for EOF handling patterns more broadly in codebase
echo -e "\n=== Searching for EOF handling patterns ==="
rg -n 'io\.EOF|ErrUnexpectedEOF' --type go -B2 -A2 || echo "No EOF patterns found"

# 3. Verify current connection.go exists and show lines 253-297
echo -e "\n=== Verifying connection.go current state ==="
if [ -f connection.go ]; then
  sed -n '253,297p' connection.go
else
  echo "connection.go not found at root; searching for it..."
  fd --type f 'connection.go' . --exec head -20 {}
fi
```

Length of output: 5839
🌐 Web query:
Go io.ErrUnexpectedEOF error type documentation
💡 Result:
io.ErrUnexpectedEOF

- What it is: a package-level error variable defined as `var ErrUnexpectedEOF = errors.New("unexpected EOF")`, meaning EOF was encountered in the middle of reading a fixed-size block or structured data. [1]
- Where it’s declared: package io in the Go standard library. [1]
- When it’s returned: commonly by helpers that require reading an exact number of bytes (e.g. io.ReadFull, io.ReadAtLeast) and by packages that read fixed-size binary structures (e.g. encoding/binary) when EOF occurs after some but not all bytes were read. If no bytes were read, functions typically return io.EOF instead. [2][3]
- How to check it: since Go 1.13 errors may be wrapped, so use errors.Is(err, io.ErrUnexpectedEOF) to detect it robustly. (Older code sometimes used err == io.ErrUnexpectedEOF.) [4]

Sources:
[1] io package docs (ErrUnexpectedEOF definition).
[2] io.ReadFull / ReadAtLeast behavior (returns ErrUnexpectedEOF when partial read then EOF).
[3] encoding/binary.Read behavior (returns io.ErrUnexpectedEOF if EOF after some bytes).
[4] Go error-handling FAQ (use errors.Is for comparisons).
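Putting that advice into practice, a small runnable example of the robust check (the helper name is illustrative):

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// isStreamClosed treats both EOF variants as end-of-stream, and works
// on wrapped errors thanks to errors.Is.
func isStreamClosed(err error) bool {
	return errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF)
}

func main() {
	wrapped := fmt.Errorf("muxer read failed: %w", io.ErrUnexpectedEOF)
	fmt.Println(isStreamClosed(wrapped)) // true
}
```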
🏁 Script executed:
```bash
#!/bin/bash
# Get current state of connection.go lines 250-350 for full context
echo "=== git/connection.go lines 250-350 ==="
sed -n '250,350p' git/connection.go
```

Length of output: 100
🏁 Script executed:
```bash
#!/bin/bash
# Find connection.go and show lines 250-350
echo "=== Locating connection.go ==="
CONN_FILE=$(fd -t f 'connection\.go$' 2>/dev/null | head -1)
echo "Found at: $CONN_FILE"
if [ -n "$CONN_FILE" ]; then
  echo -e "\n=== $CONN_FILE lines 250-350 ==="
  sed -n '250,350p' "$CONN_FILE"
else
  echo "connection.go not found"
fi
```

Length of output: 3089
🏁 Script executed:
```bash
#!/bin/bash
# Check lines 335-347 for the "also applies to" section
CONN_FILE=$(fd -t f 'connection\.go$' 2>/dev/null | head -1)
if [ -n "$CONN_FILE" ]; then
  echo "=== $CONN_FILE lines 330-355 (context for 335-347) ==="
  sed -n '330,355p' "$CONN_FILE"
fi
```

Length of output: 802
Fix EOF handling logic: server/client role checks are inverted
The current code has the logic backwards:
- Server (line 266): returns EOF unconditionally without checking active server-side protocols
- Client (lines 271–287): checks server-side protocol status instead of client-side
This means servers always propagate EOF (even when protocols are active), and clients check the wrong role's protocols. The proposed fix correctly swaps the logic: server checks Server fields, client checks Client fields. Additionally, handle io.ErrUnexpectedEOF alongside io.EOF, as the protocol layer already uses it for partial reads.
Apply the proposed diff and ensure the muxer behavior aligns with the corrected logic.
🤖 Prompt for AI Agents
In connection.go around lines 253 to 297, the EOF handling logic is inverted and
misses io.ErrUnexpectedEOF; change the EOF check to treat both io.EOF and
io.ErrUnexpectedEOF the same, and swap the role-specific checks so that when
c.server is true you inspect the protocol .Server fields for active server-side
protocols, and when c.server is false (client) you inspect the protocol .Client
fields for active client-side protocols; if any active protocols are found
return the error, otherwise treat EOF/ErrUnexpectedEOF as normal closure and
return nil.
```go
import (
	"io"
	"log/slog"
	"net"
	"testing"
	"time"

	"github.com/blinklabs-io/gouroboros/connection"
	"github.com/blinklabs-io/gouroboros/muxer"
	"github.com/blinklabs-io/gouroboros/protocol"
	"github.com/blinklabs-io/gouroboros/protocol/common"
	"github.com/stretchr/testify/assert"
)
```
🛠️ Refactor suggestion | 🟠 Major
Import sync; avoid busy-read behavior.
You’ll add sync for mutex/once and fix Read to block/EOF on close to prevent spin loops in muxer readers.
Apply this diff:
```diff
 import (
 	"io"
 	"log/slog"
 	"net"
+	"sync"
 	"testing"
 	"time"
```
🤖 Prompt for AI Agents
In protocol/chainsync/chainsync_test.go around lines 17 to 29, add the missing
"sync" import and modify the muxer reader Read implementation to avoid
busy-looping: introduce a sync.Mutex (and sync.Once where appropriate) and a
condition or channel to block Read until data is available or the stream is
closed; on close, ensure pending Reads unblock and return io.EOF (after draining
any buffered data) instead of spinning. Update any close logic to signal the
condition/channel and use the mutex to protect shared state so Read properly
blocks and exits with EOF on close.
```go
type testConn struct {
	writeChan chan []byte
	closed    bool
	closeChan chan struct{}
}

func newTestConn() *testConn {
	return &testConn{
		writeChan: make(chan []byte, 100),
		closeChan: make(chan struct{}),
	}
}

func (c *testConn) Read(b []byte) (n int, err error) { return 0, nil }
func (c *testConn) Write(b []byte) (n int, err error) {
	select {
	case c.writeChan <- b:
		return len(b), nil
	case <-c.closeChan:
		return 0, io.EOF
	}
}
func (c *testConn) Close() error {
	if !c.closed {
		close(c.closeChan)
		c.closed = true
	}
	return nil
}
func (c *testConn) LocalAddr() net.Addr                { return testAddr{} }
func (c *testConn) RemoteAddr() net.Addr               { return testAddr{} }
func (c *testConn) SetDeadline(t time.Time) error      { return nil }
func (c *testConn) SetReadDeadline(t time.Time) error  { return nil }
func (c *testConn) SetWriteDeadline(t time.Time) error { return nil }
```
Harden testConn: thread‑safe close/write and non‑spinning Read.
Current Read returns (0,nil) and Close/Write aren’t synchronized. This can spin a reader and race on close.
Apply this diff:
```diff
 type testConn struct {
 	writeChan chan []byte
 	closed    bool
 	closeChan chan struct{}
+	mu        sync.Mutex
+	closeOnce sync.Once
 }

 func newTestConn() *testConn {
 	return &testConn{
 		writeChan: make(chan []byte, 100),
 		closeChan: make(chan struct{}),
 	}
 }

-func (c *testConn) Read(b []byte) (n int, err error) { return 0, nil }
+func (c *testConn) Read(b []byte) (n int, err error) {
+	// Block until closed; avoids busy-loop in readers
+	<-c.closeChan
+	return 0, io.EOF
+}

 func (c *testConn) Write(b []byte) (n int, err error) {
 	select {
 	case c.writeChan <- b:
 		return len(b), nil
 	case <-c.closeChan:
 		return 0, io.EOF
 	}
 }

 func (c *testConn) Close() error {
-	if !c.closed {
-		close(c.closeChan)
-		c.closed = true
-	}
+	c.closeOnce.Do(func() {
+		c.mu.Lock()
+		defer c.mu.Unlock()
+		close(c.closeChan)
+		c.closed = true
+	})
 	return nil
 }
```
🤖 Prompt for AI Agents
In protocol/chainsync/chainsync_test.go around lines 36 to 69, the testConn
implementation is unsafe: Read currently returns (0, nil) (which can spin a
reader) and Write/Close race on the closed flag; fix by making Read block and
return data or io.EOF (read from the existing writeChan and copy into the
provided buffer, returning the number of bytes read and nil, and return io.EOF
when closeChan is closed), and make Close/Write thread-safe by protecting the
closed state with synchronization (use a mutex or atomics) or by relying on the
closeChan only for signaling so writes detect closed and return io.EOF and Close
becomes idempotent and safe to call concurrently. Ensure no goroutine spins when
connection is closed and that Write returns appropriate errors when closed.
```go
func (c *testConn) Read(b []byte) (n int, err error) { return 0, nil }

func (c *testConn) Close() error {
	c.closeOnce.Do(func() {
		c.mu.Lock()
		defer c.mu.Unlock()
		close(c.closeChan)
		c.closed = true
	})
	return nil
}
```
Make Read blocking and Close idempotent/thread‑safe.
Avoid (0,nil) Read and data races on close.
```diff
-func (c *testConn) Read(b []byte) (n int, err error) { return 0, nil }
+func (c *testConn) Read(b []byte) (n int, err error) {
+	<-c.closeChan
+	return 0, io.EOF
+}
 func (c *testConn) Close() error {
 	c.closeOnce.Do(func() {
 		c.mu.Lock()
 		defer c.mu.Unlock()
 		close(c.closeChan)
 		c.closed = true
 	})
 	return nil
 }
```
```go
func getTestProtocolOptions(conn net.Conn) protocol.ProtocolOptions {
	mux := muxer.New(conn)
	return protocol.ProtocolOptions{
		ConnectionId: connection.ConnectionId{
			LocalAddr:  testAddr{},
			RemoteAddr: testAddr{},
		},
		Muxer:  mux,
		Logger: slog.Default(),
	}
}
```
Start/stop muxer so writes reach the fake conn.
Without starting the muxer, client/server SendMessage may not hit conn.Write.
```diff
 func getTestProtocolOptions(conn net.Conn) protocol.ProtocolOptions {
 	mux := muxer.New(conn)
+	go mux.Start()
+	go func() {
+		<-conn.(*testConn).closeChan
+		mux.Stop()
+	}()
 	return protocol.ProtocolOptions{
 		ConnectionId: connection.ConnectionId{
 			LocalAddr:  testAddr{},
 			RemoteAddr: testAddr{},
 		},
 		Muxer:  mux,
 		Logger: slog.Default(),
+		Mode:   protocol.ProtocolModeNodeToNode,
 	}
 }
```
🤖 Prompt for AI Agents
In protocol/txsubmission/txsubmission_test.go around lines 81 to 91, the created
muxer is never started so writes from client/server won't reach the fake
connection; after creating the muxer call its Start() (or equivalent start
method) before returning the ProtocolOptions and ensure tests stop it (call
Close()/Stop() or register t.Cleanup to stop the muxer) so the muxer is running
for the duration of the test and is properly torn down afterwards.
Signed-off-by: Jenita <jkawan@blinklabs.io>
Actionable comments posted: 6
♻️ Duplicate comments (1)
connection_test.go (1)
225-231: NtN handshake: align roles and remove client–client pattern.

oConn is a client (no WithServer(true)), but mock is also ProtocolRoleClient and sends a handshake response. That’s client–client with the response from the wrong side and can stall the Propose state. This mirrors earlier feedback.
Fix one of the two ways; simplest here: keep oConn as client, make mock the server, and only send the NtN response:
```diff
-	mockConn := ouroboros_mock.NewConnection(
-		ouroboros_mock.ProtocolRoleClient,
+	mockConn := ouroboros_mock.NewConnection(
+		ouroboros_mock.ProtocolRoleServer,
 		[]ouroboros_mock.ConversationEntry{
-			ouroboros_mock.ConversationEntryHandshakeRequestGeneric,
-			ouroboros_mock.ConversationEntryHandshakeNtNResponse,
+			ouroboros_mock.ConversationEntryHandshakeNtNResponse,
 		},
 	)
```
🧹 Nitpick comments (2)
connection_test.go (2)
34-36: Reduce goleak false positives by ignoring known mock runner goroutine (optional).

If leaks persist after proper shutdown, ignore the mock runner top-function to stabilize CI.
Example:
```go
defer goleak.VerifyNone(t,
	goleak.IgnoreTopFunction("github.com/blinklabs-io/ouroboros-mock.(*Connection).run"),
)
```

Use only if cleanup is correct and a stable residual goroutine remains from the mock.
Also applies to: 223-229, 263-266
103-112: Avoid spin-wait; replace with a deterministic wait helper.

Polling oConn.ChainSync() is flaky. Prefer a helper that blocks on a ready signal (e.g., from the callback barrier) or exposes a handshake-finished channel.
I can provide a small test helper like waitForChainSyncServer(t, oConn, 1*time.Second) that uses a barrier instead of sleeps.
Also applies to: 185-194
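One possible shape for that helper, shown as a sketch: it still polls, but in one place with a hard deadline (a callback barrier would be fully deterministic). It assumes the test file's existing imports of testing, time, and the gouroboros package, and the ChainSync()/Server accessors used elsewhere in these tests.

```go
// waitForChainSyncServer blocks until the connection exposes an
// initialized ChainSync server or the timeout elapses. Sketch only.
func waitForChainSyncServer(t *testing.T, oConn *ouroboros.Connection, timeout time.Duration) {
	t.Helper()
	deadline := time.After(timeout)
	tick := time.NewTicker(10 * time.Millisecond)
	defer tick.Stop()
	for {
		select {
		case <-tick.C:
			if cs := oConn.ChainSync(); cs != nil && cs.Server != nil {
				return
			}
		case <-deadline:
			t.Fatal("timed out waiting for ChainSync server")
		}
	}
}
```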
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- connection_test.go (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
connection_test.go (8)
- connection.go (2): NewConnection (101-124), New (127-129)
- protocol/handshake/messages.go (2): NewMsgProposeVersions (64-80), NewMsgAcceptVersion (88-102)
- protocol/versions.go (2): ProtocolVersionMap (24-24), ProtocolVersionNtCOffset (20-20)
- protocol/versiondata.go (1): VersionDataNtC9to14 (48-48)
- protocol/chainsync/messages.go (1): NewMsgFindIntersect (224-232)
- protocol/chainsync/chainsync.go (4): ProtocolIdNtC (31-31), New (232-240), NewConfig (246-261), ChainSync (194-197)
- protocol/common/types.go (1): Point (23-28)
- connection_options.go (5): WithConnection (36-40), WithNetworkMagic (50-54), WithServer (64-68), WithChainSyncConfig (131-135), WithNodeToNode (78-82)
🪛 GitHub Actions: go-test
connection_test.go
[error] 205-219: TestErrorHandlingWithActiveProtocols: timed out waiting for protocol to stop. Found large set of unexpected goroutines traces in the test output.
[error] 254-260: TestErrorHandlingWithMultipleProtocols: Received connection error with multiple active protocols: EOF. Unexpected goroutines reported in test.
[error] 300-300: TestBasicErrorHandling: Unexpected goroutines reported during test teardown (goroutine leaks detected).
```go
		chainsync.WithFindIntersectFunc(
			func(ctx chainsync.CallbackContext, points []common.Point) (common.Point, chainsync.Tip, error) {
				// We need to block here to keep the protocol active
				time.Sleep(5 * time.Second)
				return common.Point{}, chainsync.Tip{}, fmt.Errorf("context cancelled")
			},
		),
	),
```
Remove 5s sleep; block on protocol context and signal readiness to avoid leaks/flakes.
time.Sleep(5s) keeps goroutines alive and causes goleak failures. Use a barrier to know CS is active, then block on context cancellation so shutdown is prompt.
Apply:
```diff
-	chainsync.WithFindIntersectFunc(
-		func(ctx chainsync.CallbackContext, points []common.Point) (common.Point, chainsync.Tip, error) {
-			// We need to block here to keep the protocol active
-			time.Sleep(5 * time.Second)
-			return common.Point{}, chainsync.Tip{}, fmt.Errorf("context cancelled")
-		},
-	),
+	chainsync.WithFindIntersectFunc(
+		func(ctx chainsync.CallbackContext, points []common.Point) (common.Point, chainsync.Tip, error) {
+			// Signal test that ChainSync is active
+			if entered := ctx.Value("enteredFindIntersect"); entered != nil {
+				if ch, ok := entered.(chan struct{}); ok {
+					select {
+					case <-ch:
+					default:
+						close(ch)
+					}
+				}
+			}
+			// Wait for shutdown instead of sleeping
+			type hasContext interface {
+				Context() interface {
+					Done() <-chan struct{}
+					Err() error
+				}
+			}
+			if hc, ok := any(ctx).(hasContext); ok {
+				<-hc.Context().Done()
+				return common.Point{}, chainsync.Tip{}, hc.Context().Err()
+			}
+			// Fallback: short wait to keep protocol "active" without leaking
+			<-time.After(250 * time.Millisecond)
+			return common.Point{}, chainsync.Tip{}, fmt.Errorf("closed")
+		},
+	),
```

And create/await the barrier near the test:
```diff
-	// Wait a bit for protocol to start
-	time.Sleep(100 * time.Millisecond)
+	// Barrier to know callback ran
+	enteredFindIntersect := make(chan struct{})
+	oConn.SetValue("enteredFindIntersect", enteredFindIntersect) // helper on Connection to pass into ctx; or thread via cfg if available
+	select {
+	case <-enteredFindIntersect:
+	case <-time.After(1 * time.Second):
+		t.Fatal("timeout waiting for FindIntersect to start")
+	}
```

If SetValue isn’t available, thread the channel via your ChainSync config’s context or a package-level var for the test only. Based on learnings.
Also applies to: 118-123
🤖 Prompt for AI Agents
In connection_test.go around lines 89-96 (and similarly at 118-123), replace the
hardcoded time.Sleep(5 * time.Second) with a synchronization barrier that
signals when the ChainSync callback is active and then blocks on the
protocol/test context; specifically, create a ready channel or waitgroup before
starting the protocol, have the FindIntersectFunc close/send on that channel to
indicate readiness, and then wait on ctx.Done() (or the passed protocol context)
instead of sleeping so the goroutine exits promptly on cancellation; if you
cannot SetValue on the context, pass the ready channel into the ChainSync test
config or use a test-only package-level channel to coordinate.
```go
		// Send Done message to stop the protocol
		ouroboros_mock.ConversationEntryOutput{
			ProtocolId: chainsync.ProtocolIdNtC,
			Messages:   []protocol.Message{chainsync.NewMsgDone()},
		},
	},
)
```
Ensure ChainSync actually starts before Done; assert no error after stopped.
Currently only Done is sent; if the server loop never starts, DoneChan may not close, causing the timeout. Also, the test logs an error instead of failing when one is received.
Apply:
```diff
-	// Send Done message to stop the protocol
-	ouroboros_mock.ConversationEntryOutput{
-		ProtocolId: chainsync.ProtocolIdNtC,
-		Messages:   []protocol.Message{chainsync.NewMsgDone()},
-	},
+	// Start ChainSync, then stop it
+	ouroboros_mock.ConversationEntryOutput{
+		ProtocolId: chainsync.ProtocolIdNtC,
+		Messages: []protocol.Message{
+			chainsync.NewMsgFindIntersect(
+				[]common.Point{{Slot: 21600, Hash: []byte("19297addad3da631einos029")}},
+			),
+			chainsync.NewMsgDone(),
+		},
+	},
```

```diff
-	// Wait for protocol to be done (Done message from mock should trigger this)
+	// Wait for protocol to be done (Done message should trigger this)
 	select {
 	case <-chainSyncProtocol.Server.DoneChan():
-		// Protocol is stoppeds
-	case <-time.After(1 * time.Second):
+		// Protocol is stopped
+	case <-time.After(2 * time.Second):
 		t.Fatal("timed out waiting for protocol to stop")
 	}
```

```diff
-	// Now close the mock connection to generate an error
+	// Now close the mock connection; errors after all protocols stopped should be ignored
 	mockConn.Close()
 	select {
-	case err := <-oConn.ErrorChan():
-		t.Logf("Received error during shutdown: %s", err)
+	case err := <-oConn.ErrorChan():
+		t.Fatalf("unexpected connection error after protocols stopped: %v", err)
 	case <-time.After(500 * time.Millisecond):
-		t.Log("No connection error received (expected when protocols are stopped)")
+		// OK: no error
 	}
```

Also applies to: 200-206, 210-215
connection_test.go (Outdated)
```go
select {
case err := <-oConn.ErrorChan():
	t.Logf("Received error during shutdown: %s", err)
case <-time.After(500 * time.Millisecond):
	t.Log("No connection error received (expected when protocols are stopped)")
}
```
🛠️ Refactor suggestion | 🟠 Major
Tighten assertions on ErrorChan to match PR behavior.
Tests should fail on unexpected errors rather than log them, ensuring semantics don’t regress silently.
See diffs above where logging was replaced with t.Fatalf on any received error in “stopped/no‑traffic” scenarios.
Also applies to: 249-257
🤖 Prompt for AI Agents
In connection_test.go around lines 210-215, the test currently logs any error
received from oConn.ErrorChan() during shutdown, but the review requires
tightening assertions so unexpected errors fail the test; change the select
branch that currently calls t.Logf("Received error during shutdown: %s", err) to
call t.Fatalf with the same message to fail on errors, and make the identical
replacement for the other occurrence mentioned at lines ~249-257 so both
“stopped/no-traffic” scenarios fail the test when an error is received instead
of merely logging it.
Signed-off-by: Jenita <jkawan@blinklabs.io>
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
protocol/chainsync/client.go (1)
122-137: Fix panic: send on closed channel in ChainSync client

readyForNextBlockChan is closed on Done, but handleRollForward/Backward can still send, causing “send on closed channel” (seen in CI). Don’t close the channel; instead, make syncLoop exit on Done.
Apply:
```diff
@@ func (c *Client) Start() {
-	go func() {
-		<-c.DoneChan()
-		close(c.readyForNextBlockChan)
-	}()
+	go func() {
+		<-c.DoneChan()
+		// No channel close; syncLoop exits on Done
+	}()
@@ func (c *Client) syncLoop() {
-	for {
-		// Wait for a block to be received
-		if ready, ok := <-c.readyForNextBlockChan; !ok {
-			// Channel is closed, which means we're shutting down
-			return
-		} else if !ready {
-			// Sync was cancelled
-			return
-		}
+	for {
+		// Wait for a block to be received or shutdown
+		var ready bool
+		select {
+		case <-c.DoneChan():
+			return
+		case ready = <-c.readyForNextBlockChan:
+			// continue
+		}
+		if !ready {
+			// Sync was cancelled
+			return
+		}
@@ func (c *Client) handleRollForward(...) error {
-	// Signal that we're ready for the next block
-	c.readyForNextBlockChan <- true
+	// Signal that we're ready for the next block
+	c.readyForNextBlockChan <- true
 	return nil
@@ func (c *Client) handleRollBackward(msg protocol.Message) error {
-	// Signal that we're ready for the next block
-	c.readyForNextBlockChan <- true
+	// Signal that we're ready for the next block
+	c.readyForNextBlockChan <- true
 	return nil
```

Note: Since we no longer close the channel, no additional guards are required at send sites; syncLoop exits via Done.
Also applies to: 440-456, 724-736, 765-767
♻️ Duplicate comments (4)
connection_test.go (3)
207-214: Tighten assertion: any error after protocols stopped should fail

When protocols are stopped, closing the connection must not surface an error. Fail the test if any error is received instead of logging.

```diff
-	select {
-	case err := <-oConn.ErrorChan():
-		t.Logf("Received error during shutdown: %s", err)
-	case <-time.After(500 * time.Millisecond):
-		t.Log("No connection error received (expected when protocols are stopped)")
-	}
+	select {
+	case err := <-oConn.ErrorChan():
+		t.Fatalf("unexpected connection error after protocols stopped: %v", err)
+	case <-time.After(500 * time.Millisecond):
+		// OK: no error
+	}
```
224-256: Fix NtN handshake role mismatch and test semantics; expect no error when no traffic

The mock sends a handshake response while oConn is not configured as server, causing client–client stalemate risk. Also, this test asserts an error without exchanging any mini-protocol messages, which contradicts the PR behavior (“started but no messages exchanged” → no error).
Apply:
```diff
 mockConn := ouroboros_mock.NewConnection(
 	ouroboros_mock.ProtocolRoleClient,
 	[]ouroboros_mock.ConversationEntry{
-		ouroboros_mock.ConversationEntryHandshakeRequestGeneric,
-		ouroboros_mock.ConversationEntryHandshakeNtNResponse,
+		// Client proposes; server (our oConn) will respond
+		ouroboros_mock.ConversationEntryHandshakeRequestGeneric,
 	},
 )
 oConn, err := ouroboros.New(
 	ouroboros.WithConnection(mockConn),
 	ouroboros.WithNetworkMagic(ouroboros_mock.MockNetworkMagic),
-	ouroboros.WithNodeToNode(true),
+	ouroboros.WithNodeToNode(true),
+	ouroboros.WithServer(true),
 )
@@
-// Wait for handshake to complete
-time.Sleep(100 * time.Millisecond)
+// Optionally wait briefly or add a handshake barrier if available
+time.Sleep(100 * time.Millisecond)
@@
-// Should receive error since protocols are active
-select {
-case err := <-oConn.ErrorChan():
-	if err == nil {
-		t.Fatal("expected connection error, got nil")
-	}
-	t.Logf("Received connection error with multiple active protocols: %s", err)
-case <-time.After(2 * time.Second):
-	t.Error("timed out waiting for connection error")
-}
+// No mini-protocol traffic: closing should not emit an error
+select {
+case err := <-oConn.ErrorChan():
+	t.Fatalf("unexpected error with no active protocol traffic: %v", err)
+case <-time.After(750 * time.Millisecond):
+	// OK: no error
+}
```
199-205: Typo in comment

“stoppeds” → “stopped”.

```diff
-	// Protocol is stoppeds
+	// Protocol is stopped
```

connection.go (1)
256-300: Fix inverted EOF handling logic and include ErrUnexpectedEOF as connection closure

The logic for determining when to propagate EOF is backwards. When acting as a server (c.server = true), the code unconditionally returns the error instead of checking whether server-side protocols are active. Conversely, when acting as a client, it checks server-side protocol states instead of client-side. This inverts the intended behavior and will suppress errors that should propagate.

Additionally, io.ErrUnexpectedEOF should be treated alongside io.EOF as a normal connection closure signal, since both indicate the connection is closed (protocol.go currently handles it as an incomplete message state, but at the connection level it signals closure).

Apply:

```diff
 func (c *Connection) handleConnectionError(err error) error {
 	if err == nil {
 		return nil
 	}
-	// Only propagate EOF errors when acting as a client with active server-side protocols
-	if errors.Is(err, io.EOF) {
-		// Check if we have any active server-side protocols
-		if c.server {
-			return err
-		}
-
-		// For clients, only propagate EOF if we have active server protocols
-		hasActiveServerProtocols := false
-		if c.chainSync != nil && c.chainSync.Server != nil && !c.chainSync.Server.IsDone() {
-			hasActiveServerProtocols = true
-		}
-		if c.blockFetch != nil && c.blockFetch.Server != nil && !c.blockFetch.Server.IsDone() {
-			hasActiveServerProtocols = true
-		}
-		if c.txSubmission != nil && c.txSubmission.Server != nil && !c.txSubmission.Server.IsDone() {
-			hasActiveServerProtocols = true
-		}
-		if c.localStateQuery != nil && c.localStateQuery.Server != nil && !c.localStateQuery.Server.IsDone() {
-			hasActiveServerProtocols = true
-		}
-		if c.localTxMonitor != nil && c.localTxMonitor.Server != nil && !c.localTxMonitor.Server.IsDone() {
-			hasActiveServerProtocols = true
-		}
-		if c.localTxSubmission != nil && c.localTxSubmission.Server != nil && !c.localTxSubmission.Server.IsDone() {
-			hasActiveServerProtocols = true
-		}
-
-		if hasActiveServerProtocols {
-			return err
-		}
-
-		// EOF with no active server protocols is normal connection closure
-		return nil
-	}
+	// Treat EOF/UnexpectedEOF as normal closure unless our role still has active protocols
+	if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
+		hasActive := false
+		if c.server {
+			// Server: check server-side protocols
+			if c.chainSync != nil && c.chainSync.Server != nil && !c.chainSync.Server.IsDone() { hasActive = true }
+			if c.blockFetch != nil && c.blockFetch.Server != nil && !c.blockFetch.Server.IsDone() { hasActive = true }
+			if c.txSubmission != nil && c.txSubmission.Server != nil && !c.txSubmission.Server.IsDone() { hasActive = true }
+			if c.localStateQuery != nil && c.localStateQuery.Server != nil && !c.localStateQuery.Server.IsDone() { hasActive = true }
+			if c.localTxMonitor != nil && c.localTxMonitor.Server != nil && !c.localTxMonitor.Server.IsDone() { hasActive = true }
+			if c.localTxSubmission != nil && c.localTxSubmission.Server != nil && !c.localTxSubmission.Server.IsDone() { hasActive = true }
+		} else {
+			// Client: check client-side protocols
+			if c.chainSync != nil && c.chainSync.Client != nil && !c.chainSync.Client.IsDone() { hasActive = true }
+			if c.blockFetch != nil && c.blockFetch.Client != nil && !c.blockFetch.Client.IsDone() { hasActive = true }
+			if c.txSubmission != nil && c.txSubmission.Client != nil && !c.txSubmission.Client.IsDone() { hasActive = true }
+			if c.localStateQuery != nil && c.localStateQuery.Client != nil && !c.localStateQuery.Client.IsDone() { hasActive = true }
+			if c.localTxMonitor != nil && c.localTxMonitor.Client != nil && !c.localTxMonitor.Client.IsDone() { hasActive = true }
+			if c.localTxSubmission != nil && c.localTxSubmission.Client != nil && !c.localTxSubmission.Client.IsDone() { hasActive = true }
+		}
+		if hasActive {
+			return err
+		}
+		return nil
+	}
 	// For non-EOF errors, always propagate
 	return err
 }
```
🧹 Nitpick comments (1)
connection_test.go (1)
117-119: Avoid sleep-based readiness; use a barrier from the callback.

Replace time.Sleep with a readiness signal from the FindIntersect callback to make the test deterministic and goleak-safe.
Example:
```diff
-	// Wait a bit for protocol to start
-	time.Sleep(100 * time.Millisecond)
+	// Barrier to ensure callback entered
+	started := make(chan struct{}, 1)
+	// pass via config context or test-scoped var; e.g., wrap FindIntersectFunc to close(started)
+	// then here:
+	select {
+	case <-started:
+	case <-time.After(1 * time.Second):
+		t.Fatal("timeout waiting for ChainSync to start")
+	}
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- connection.go (5 hunks)
- connection_test.go (2 hunks)
- protocol/chainsync/chainsync.go (2 hunks)
- protocol/chainsync/client.go (2 hunks)
- protocol/chainsync/server.go (1 hunk)
- protocol/protocol.go (9 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
connection_test.go (6)
- connection.go (2): NewConnection (104-127), New (130-132)
- protocol/protocol.go (2): ProtocolRoleClient (91-91), New (121-132)
- protocol/chainsync/messages.go (1): NewMsgFindIntersect (224-232)
- protocol/chainsync/chainsync.go (6): ProtocolIdNtC (32-32), New (234-242), NewConfig (248-263), WithFindIntersectFunc (291-297), CallbackContext (214-219), ChainSync (195-198)
- protocol/common/types.go (1): Point (23-28)
- connection_options.go (5): WithConnection (36-40), WithNetworkMagic (50-54), WithServer (64-68), WithChainSyncConfig (131-135), WithNodeToNode (78-82)

connection.go (3)
- protocol/chainsync/server.go (1): Server (27-33)
- muxer/muxer.go (2): ConnectionClosedError (72-75), Muxer (57-70)
- protocol/protocol.go (1): ProtocolOptions (96-106)

protocol/protocol.go (2)
- protocol/state.go (3): State (32-35), StateMap (69-69), AgencyNone (26-26)
- protocol/message.go (1): Message (18-22)

protocol/chainsync/client.go (1)
- protocol/chainsync/messages.go (1): NewMsgDone (270-277)
🪛 GitHub Actions: go-test
connection.go
[error] 81-81: panic: send on closed channel
🔇 Additional comments (2)
protocol/protocol.go (1)
134-153: State tracking implementation looks good

currentState with RW lock, CurrentState()/IsDone() (including initial state) and guarded updates in stateLoop align with lifecycle checks needed by centralized error handling.
Also applies to: 493-503, 552-571, 592-599
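For reference, the guarded-read pattern being approved, reduced to a generic runnable sketch (field and method names are assumed for illustration, not the actual protocol.go code):

```go
package main

import (
	"fmt"
	"sync"
)

type state struct{ id uint }

type proto struct {
	stateMutex   sync.RWMutex
	currentState state
}

// setState is what a state loop would call, under the write lock.
func (p *proto) setState(s state) {
	p.stateMutex.Lock()
	defer p.stateMutex.Unlock()
	p.currentState = s
}

// snapshot lets timeout/error paths report the state without racing
// the state loop, matching the locking discipline reviewed above.
func (p *proto) snapshot() state {
	p.stateMutex.RLock()
	defer p.stateMutex.RUnlock()
	return p.currentState
}

func main() {
	p := &proto{}
	p.setState(state{id: 2})
	fmt.Println(p.snapshot()) // {2}
}
```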
protocol/chainsync/chainsync.go (1)
18-26: Context propagation in callbacks looks good

Embedding context.Context in CallbackContext enables clean cancellation and aligns with the connection’s context wiring.
Also applies to: 213-219
```go
s.callbackContext = CallbackContext{
	Context:      protoOptions.Context,
	Server:       s,
	ConnectionId: protoOptions.ConnectionId,
}
```
🧩 Analysis chain
Verify Context is used in callbacks to differentiate clean shutdowns from errors.
The Context propagation to CallbackContext looks correct. However, to achieve the PR's objective of not returning errors on connection close when protocols are explicitly stopped, ensure that callback functions (e.g., RequestNextFunc, FindIntersectFunc) properly check s.callbackContext.Context to distinguish between clean shutdowns (context cancellation) and actual errors.
Run the following script to verify how the Context is used in callback implementations:
🏁 Script executed:
```bash
#!/bin/bash
# Description: Check if callbacks use Context to detect clean shutdowns vs errors
# Search for RequestNextFunc and FindIntersectFunc implementations
echo "=== Searching for callback function implementations ==="
rg -n -A 10 "RequestNextFunc.*func" --type=go

echo -e "\n=== Checking Context usage in chainsync callbacks ==="
rg -n -C 5 "callbackContext\.Context|ctx\.Done\(\)|ctx\.Err\(\)" --type=go -g '**/chainsync/**'

echo -e "\n=== Checking for connection close error handling ==="
rg -n -C 5 "connection.*close|ErrConnectionClosed" --type=go -g '**/chainsync/**'
```

Length of output: 2331
🏁 Script executed:
```bash
#!/bin/bash
# Description: Deep dive into Context usage and error handling
# 1. Find and display handleRequestNext() implementation
echo "=== handleRequestNext() implementation ==="
rg -n -A 20 "func.*handleRequestNext" --type=go -g '**/chainsync/**'

# 2. Show CallbackContext struct definition
echo -e "\n=== CallbackContext struct definition ==="
rg -n -A 10 "type CallbackContext struct" --type=go -g '**/chainsync/**'

# 3. Search for all uses of callbackContext variable in server.go
echo -e "\n=== All uses of callbackContext in server.go ==="
rg -n "callbackContext" --type=go protocol/chainsync/server.go

# 4. Check if Context is referenced anywhere in callbacks or error returns
echo -e "\n=== Check Context field references in entire chainsync package ==="
rg -n "\.Context" --type=go -g '**/chainsync/**' | head -30
```

Length of output: 3090
🏁 Script executed:
```bash
#!/bin/bash
# Description: Verify struct initialization syntax and error handling
# 1. Check CallbackContext initialization pattern in both files
echo "=== Checking CallbackContext initialization pattern in both files ==="
rg -n -B 2 -A 5 "CallbackContext{" --type=go -g '**/chainsync/**'

# 2. Search for handleDone() to see if Context is checked there
echo -e "\n=== handleDone() implementation ==="
rg -n -A 20 "func.*handleDone" --type=go -g '**/chainsync/**'

# 3. Check if error handling distinguishes between clean close and errors
echo -e "\n=== Error handling in protocol message handlers ==="
rg -n -B 2 "ctx\.Done\(\)|context\.Cause\(|Select\(" --type=go -g '**/chainsync/**'

# 4. Look for any context cancellation checks
echo -e "\n=== Context cancellation usage ==="
rg -n "Err\(\)|IsCanceled|IsDone" --type=go -g '**/chainsync/**'
```

Length of output: 1097
🏁 Script executed:
```bash
#!/bin/bash
# Description: Final verification of Context initialization and usage
# 1. Check ProtocolOptions struct to verify Context field exists
echo "=== ProtocolOptions struct definition ==="
rg -n -A 15 "type ProtocolOptions struct" --type=go

# 2. Check how Context is used elsewhere in similar protocol implementations
echo -e "\n=== Context usage in other protocol servers ==="
rg -n "protoOptions\.Context" --type=go

# 3. Look for actual error suppression or clean shutdown logic
echo -e "\n=== Error suppression or clean shutdown checks ==="
rg -n -C 3 "Stop\(\)|shutdown|clean" --type=go -g '**/protocol/**' | head -40

# 4. Verify if embedded context.Context in struct works with named field init
echo -e "\n=== Check Go pattern for embedded context.Context field ==="
rg -n "context\.Context" --type=go -g '**/protocol/**' -B 2 -A 2 | head -30
```

Length of output: 3477
Use the embedded Context to implement error handling and achieve the PR objectives.

Two observations:

1. Embedded field initialization (line 48): CallbackContext embeds context.Context as an unnamed field. Note that the named field syntax Context: protoOptions.Context is valid Go here: an embedded field's name in a composite literal is its unqualified type name, so this initialization compiles as written. The substantive issue is the next point.

2. No error handling implementation: the Context field is added but never used. To achieve the PR's objective of not returning errors on clean connection close, the callback functions and handleDone() must check s.callbackContext.Err() or use context cancellation patterns to distinguish between explicit shutdowns (context cancellation) and actual errors. Currently, no code checks the context for cancellation.
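A minimal runnable sketch of the pattern point 2 asks for, with an illustrative callback signature rather than the real chainsync one:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// handleWork distinguishes an explicit shutdown (context cancellation)
// from a real failure, returning nil on clean close.
func handleWork(ctx context.Context) error {
	select {
	case <-ctx.Done():
		if errors.Is(ctx.Err(), context.Canceled) {
			return nil // connection asked us to stop: not an error
		}
		return ctx.Err() // e.g. deadline exceeded: policy choice whether to propagate
	default:
		// normal message processing would go here
		return nil
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	fmt.Println(handleWork(ctx)) // <nil>: cancellation treated as clean close
}
```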
🤖 Prompt for AI Agents
In protocol/chainsync/server.go around lines 47 to 51, the embedded
context.Context is wired in but never consulted, so clean shutdowns are not
treated as non-errors; update the callback functions and handleDone() to check
s.callbackContext.Err() (or select on s.callbackContext.Done()) and treat
context.Canceled / context.DeadlineExceeded as a clean close (do not return them
as errors), ensuring any error returned by operations is only propagated when
the context indicates a real failure rather than an intentional shutdown.
```go
	)
}

// Create context for connection
c.ctx, c.cancelCtx = context.WithCancel(context.Background())
```
We already have an async shutdown method, so we don't need to add another one.
I'm not sure if it's helpful, but I have some (abandoned) code on a similar path here: #1244
Signed-off-by: Jenita <jkawan@blinklabs.io>
Actionable comments posted: 1
♻️ Duplicate comments (1)
connection_test.go (1)
254-307: Test contradicts PR objectives: expects an error when no protocol messages were exchanged.

Per the PR objectives from issue #1112, "Apply the same non-error behavior when the protocols have been started but no messages were exchanged before the connection close."
This test only performs a handshake (lines 261-263) with no mini-protocol traffic, yet expects to receive an error (lines 281-303). According to the PR requirements, closing after handshake-only should NOT return an error.
Apply this diff to align the test with PR semantics:
```diff
-	// Should receive error since protocols were active
+	// Should NOT receive error since no mini-protocol messages were exchanged
 	timeout := time.After(2 * time.Second)
 	for {
 		select {
 		case err, ok := <-oConn.ErrorChan():
 			if !ok {
 				t.Log("Error channel closed")
 				goto done
 			}
 			if err == nil {
-				t.Error("received nil error")
-				continue
+				continue // Ignore nil errors
+			}
+			if err != nil {
+				t.Fatalf("unexpected error with no protocol traffic: %v", err)
 			}
-			t.Logf("Received connection error with multiple active protocols: %s", err)
-			if strings.Contains(err.Error(), "EOF") ||
-				strings.Contains(err.Error(), "use of closed network connection") {
-				goto done
-			}
+		case <-time.After(500 * time.Millisecond):
+			// OK: no error received
+			goto done
 		case <-timeout:
-			t.Error("timed out waiting for connection error")
+			t.Fatal("test timeout")
 			goto done
 		}
 	}
```
🧹 Nitpick comments (1)
connection_test.go (1)
118-119: Replace arbitrary sleep with protocol state check. The 100ms sleep at line 119 is arbitrary and could cause flakiness. Consider polling the protocol's current state or using the mock's synchronization mechanisms to confirm the ChainSync FindIntersect has been processed.
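One possible shape for that check; a sketch where `waitForState` and the integer state value are invented stand-ins for wrapping the `CurrentState()` accessor this PR adds:

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// waitForState polls a state accessor until it reports the wanted state
// or the timeout elapses, replacing a fixed sleep with a bounded wait.
func waitForState(current func() int32, want int32, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if current() == want {
			return nil
		}
		time.Sleep(10 * time.Millisecond)
	}
	return errors.New("timed out waiting for protocol state")
}

func main() {
	var state atomic.Int32
	go func() {
		time.Sleep(50 * time.Millisecond)
		state.Store(1) // simulate the protocol reaching the expected state
	}()
	fmt.Println(waitForState(state.Load, 1, time.Second)) // <nil>
}
```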
🔇 Additional comments (1)
connection_test.go (1)
309-347: LGTM! Both subtests are straightforward and correctly validate basic error handling scenarios: dial failure and idempotent close operations.
```go
// With protocols stopped, we should either get no error or just connection closed errors
timeout := time.After(2 * time.Second)
for {
	select {
	case err, ok := <-oConn.ErrorChan():
		if !ok {
			t.Log("Error channel closed")
			goto done
		}
		if err != nil {
			if !strings.Contains(err.Error(), "EOF") &&
				!strings.Contains(err.Error(), "use of closed network connection") {
				t.Errorf("Unexpected error during shutdown: %s", err)
			}
		}
	case <-time.After(500 * time.Millisecond):
		t.Log("No connection error received (expected when protocols are stopped)")
		goto done
	case <-timeout:
		t.Error("timed out waiting for connection cleanup")
		goto done
	}
}
```
Test logic contradicts PR objectives: no error should be received when protocols are stopped.
According to the PR objectives, when protocols are explicitly stopped and the connection closes, the library should NOT return any error. However, this test allows EOF and "use of closed network connection" errors (lines 235-238).
The test should fail if any error is received on ErrorChan after protocols are stopped, consistent with the "ErrorsIgnoredWhenProtocolsStopped" subtest name and PR requirements.
Apply this diff to align with PR objectives:
```diff
-	// With protocols stopped, we should either get no error or just connection closed errors
+	// With protocols stopped, we should not receive any errors
 	timeout := time.After(2 * time.Second)
 	for {
 		select {
 		case err, ok := <-oConn.ErrorChan():
 			if !ok {
 				t.Log("Error channel closed")
 				goto done
 			}
 			if err != nil {
-				if !strings.Contains(err.Error(), "EOF") &&
-					!strings.Contains(err.Error(), "use of closed network connection") {
-					t.Errorf("Unexpected error during shutdown: %s", err)
-				}
+				t.Fatalf("unexpected error after protocols stopped: %v", err)
 			}
 		case <-time.After(500 * time.Millisecond):
-			t.Log("No connection error received (expected when protocols are stopped)")
+			// OK: no error received
 			goto done
 		case <-timeout:
-			t.Error("timed out waiting for connection cleanup")
+			t.Fatal("test timeout")
 			goto done
 		}
 	}
```
Signed-off-by: jkawan <kawanjenita@outlook.com>
Signed-off-by: Jenita <jkawan@blinklabs.io>
3 issues found across 11 files
- `connection_test.go:130`: Fail the test instead of just logging when ErrorChan closes before any error is received, so the expected connection error is actually enforced.
- `connection.go:352`: Always call `Close()` when the muxer reports connection termination; otherwise normal EOFs leave the connection running and leak resources.
- `protocol/protocol.go:151`: `IsDone` now reports "done" whenever the protocol is merely back in its initial state, so clients no longer send the required Done messages during `Stop()`, leaving the remote side hanging.
```go
select {
case err, ok := <-oConn.ErrorChan():
	if !ok {
		t.Log("Error channel closed")
```
Fail the test instead of just logging when ErrorChan closes before any error is received so the expected connection error is actually enforced.
```go
	c.errorChan <- fmt.Errorf("muxer error: %w", handledErr)
}
// Close connection on muxer errors
c.Close()
```
Always call Close() when the muxer reports connection termination; otherwise normal EOFs leave the connection running and leak resources.
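A standalone sketch of the ordering being requested (the `conn` type, `onMuxerError`, and `handleConnectionError` here are simplified stand-ins, not the real `Connection`): only unsuppressed errors are reported, but teardown runs unconditionally:

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

type conn struct {
	errorChan chan error
	closed    bool
}

// handleConnectionError mirrors the PR's suppression rule: a plain EOF
// from the remote side is not reported as an error.
func (c *conn) handleConnectionError(err error) error {
	if errors.Is(err, io.EOF) {
		return nil
	}
	return err
}

func (c *conn) Close() { c.closed = true }

func (c *conn) onMuxerError(err error) {
	if handled := c.handleConnectionError(err); handled != nil {
		c.errorChan <- fmt.Errorf("muxer error: %w", handled)
	}
	// Unconditional: suppressed EOFs still tear the connection down.
	c.Close()
}

func main() {
	c := &conn{errorChan: make(chan error, 1)}
	c.onMuxerError(io.EOF)
	fmt.Println(c.closed, len(c.errorChan)) // true 0
}
```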
```go
		}
	}
	// return true if current state is the initial state
	return currentState == p.config.InitialState
```
IsDone now reports “done” whenever the protocol is merely back in its initial state, so clients no longer send the required Done messages during Stop(), leaving the remote side hanging.
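A small sketch of the distinction this comment draws (state and agency names are illustrative, not the exact gouroboros values): only a terminal, agency-none state should suppress the Done message, while a protocol sitting in its initial Idle state still owes one once started:

```go
package main

import "fmt"

type agency int

const (
	agencyClient agency = iota
	agencyServer
	agencyNone
)

type state struct {
	name   string
	agency agency
}

var (
	stateIdle = state{"Idle", agencyClient} // initial state
	stateDone = state{"Done", agencyNone}   // terminal state
)

// isDone reports true only for terminal states; an idle-but-started
// protocol must still send Done in Stop().
func isDone(current state) bool {
	return current.agency == agencyNone
}

func main() {
	fmt.Println(isDone(stateIdle)) // false: Stop() must still send Done
	fmt.Println(isDone(stateDone)) // true: Done already sent
}
```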
5 issues found across 11 files
- `protocol/protocol.go:142` (see the mutex sketch after this list): `CurrentState` reads the shared state under a brand-new `stateMutex`, but state transitions still write under `currentStateMu`, so reads and writes are unsynchronized and can race.
- `connection.go:368`: Graceful remote disconnects no longer trigger `Close()`, leaving `doneChan` and connection resources hanging because `handleConnectionError` suppresses `io.EOF`. Move the `Close()` call outside the `handledErr` check (or otherwise ensure it is invoked even when `handleConnectionError` returns nil) so shutdown still runs on remote EOF.
- `protocol/blockfetch/blockfetch_test.go:51`: `testConn.Read` returns `(0, nil)`, violating the `io.Reader` contract and causing muxer read loops to spin forever instead of seeing EOF on close.
- `protocol/chainsync/chainsync_test.go:49`: `testConn.Read` returns 0 bytes with nil error, causing the muxer read loop to spin forever instead of blocking like a real `net.Conn`.
- `connection_test.go:86`: The "ErrorsIgnoredWhenProtocolsStopped" sub-test also runs the connection as a server, so it cannot observe the client-side error suppression behavior it purports to validate.
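A minimal sketch of the fix for the first item, using a single mutex for both the read and write paths (names are stand-ins, not the actual `Protocol` struct):

```go
package main

import (
	"fmt"
	"sync"
)

// One mutex guards both the reads in CurrentState and the writes in
// the transition path; splitting them across two mutexes (as flagged
// above) leaves the field unsynchronized.
type protocol struct {
	currentStateMu sync.RWMutex
	currentState   int
}

func (p *protocol) CurrentState() int {
	p.currentStateMu.RLock()
	defer p.currentStateMu.RUnlock()
	return p.currentState
}

func (p *protocol) setState(s int) {
	p.currentStateMu.Lock()
	p.currentState = s
	p.currentStateMu.Unlock()
}

func main() {
	p := &protocol{}
	p.setState(2)
	fmt.Println(p.CurrentState()) // 2
}
```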
```go
}

// IsDone checks if the protocol is in a done/completed state
func (p *Protocol) IsDone() bool {
```
CurrentState reads the shared state under a brand-new stateMutex, but state transitions still write under currentStateMu, so reads and writes are unsynchronized and can race.
```go
	c.errorChan <- fmt.Errorf("muxer error: %w", handledErr)
}
// Close connection on muxer errors
c.Close()
```
Graceful remote disconnects no longer trigger Close(), leaving doneChan and connection resources hanging because handleConnectionError suppresses io.EOF.
Move the Close() call outside the handledErr check (or otherwise ensure it is invoked even when handleConnectionError returns nil) so shutdown still runs on remote EOF.
```go
	}
}

func (c *testConn) Read(b []byte) (n int, err error) { return 0, nil }
```
testConn.Read returns (0, nil), violating the io.Reader contract and causing muxer read loops to spin forever instead of seeing EOF on close.
```go
	}
}

func (c *testConn) Read(b []byte) (n int, err error) { return 0, nil }
```
testConn.Read returns 0 bytes with nil error, causing the muxer read loop to spin forever instead of blocking like a real net.Conn.
```go
oConn, err := ouroboros.New(
	ouroboros.WithConnection(mockConn),
	ouroboros.WithNetworkMagic(ouroboros_mock.MockNetworkMagic),
	ouroboros.WithServer(true),
```
The "ErrorsIgnoredWhenProtocolsStopped" sub-test also runs the connection as a server, so it cannot observe the client-side error suppression behavior it purports to validate.
4 issues found across 11 files
- `protocol/blockfetch/blockfetch_test.go:51`: `testConn.Read` returns 0 bytes with a nil error, violating the `io.Reader` contract and causing the mux read loop to busy-spin.
- `connection.go:368`: A normal remote EOF (`handledErr == nil`) no longer triggers `Close()`, so the connection never shuts down and goroutines leak. Close the connection even when the EOF is being suppressed.
- `protocol/txsubmission/txsubmission_test.go:51`: `testConn.Read` returns 0 bytes with a nil error, violating the `io.Reader` contract and causing callers to spin endlessly.
- `protocol/chainsync/chainsync_test.go:49`: `testConn.Read` must not return `(0, nil)` because it makes `muxer.Start` spin and violates the `io.Reader` contract.
```go
	}
}

func (c *testConn) Read(b []byte) (n int, err error) { return 0, nil }
```
testConn.Read returns 0 bytes with a nil error, violating the io.Reader contract and causing the mux read loop to busy-spin.
```go
	c.errorChan <- fmt.Errorf("muxer error: %w", handledErr)
}
// Close connection on muxer errors
c.Close()
```
A normal remote EOF (handledErr == nil) no longer triggers Close(), so the connection never shuts down and goroutines leak. Close the connection even when the EOF is being suppressed.
```go
	}
}

func (c *testConn) Read(b []byte) (n int, err error) { return 0, nil }
```
testConn.Read returns 0 bytes with a nil error, violating the io.Reader contract and causing callers to spin endlessly.
```diff
-func (c *testConn) Read(b []byte) (n int, err error) { return 0, nil }
+func (c *testConn) Read(b []byte) (n int, err error) { return 0, io.EOF }
```
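A blocking variant goes further and matches real `net.Conn` read semantics; a sketch assuming the `closeChan`/`closeOnce` fields the tests already define:

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// Sketch of a test conn whose Read blocks until Close, then reports
// io.EOF, instead of returning either (0, nil) or an immediate EOF.
type testConn struct {
	closeChan chan struct{}
	closeOnce sync.Once
}

func (c *testConn) Read(b []byte) (int, error) {
	<-c.closeChan // block like a real socket with no inbound data
	return 0, io.EOF
}

func (c *testConn) Close() error {
	c.closeOnce.Do(func() { close(c.closeChan) })
	return nil
}

func main() {
	c := &testConn{closeChan: make(chan struct{})}
	go c.Close()
	_, err := c.Read(make([]byte, 1))
	fmt.Println(err) // EOF
}
```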
```go
	}
}

func (c *testConn) Read(b []byte) (n int, err error) { return 0, nil }
```
testConn.Read must not return (0, nil) because it makes muxer.Start spin and violates the io.Reader contract.
Signed-off-by: Jenita <jkawan@blinklabs.io>
Actionable comments posted: 0
♻️ Duplicate comments (1)
connection_test.go (1)
227-249: Test logic may not align with PR objectives. Per the PR description and linked issue #1112, closing the connection after protocols are explicitly stopped should NOT return any error. However, this test still accepts `EOF` and `use of closed network connection` errors (lines 237-240) rather than failing on any error. If the goal is to verify that no errors are propagated when protocols are stopped, consider failing on any received error:

```diff
-			if err != nil {
-				if !strings.Contains(err.Error(), "EOF") &&
-					!strings.Contains(err.Error(), "use of closed network connection") {
-					t.Errorf("Unexpected error during shutdown: %s", err)
-				}
-			}
+			if err != nil {
+				t.Fatalf("unexpected error after protocols stopped: %v", err)
+			}
```

If certain connection-level errors are still expected and acceptable after protocol shutdown, please document this distinction explicitly.
🧹 Nitpick comments (3)
protocol/protocol.go (2)
112-116: Unused field `stateRespChan` is always nil. The new `stateRespChan` field is never populated; `transitionState()` at line 697 always passes `nil`. If this is intentional scaffolding for future use, consider adding a brief comment. Otherwise, remove it until needed.
138-156: Approve with note on duplicate accessor. The `CurrentState()` implementation correctly uses `currentStateMu` for thread-safe access, addressing the race condition concern from previous reviews. Note: There's also a private `getCurrentState()` at lines 247-251 with identical logic. Consider consolidating to reduce duplication.
connection_test.go (1)
128-146: Consider failing when error channel closes unexpectedly. When `!ok` at line 129, the test logs and exits. Per previous review feedback, if the channel closes before receiving the expected error, this could silently pass when it should fail.

```diff
 		case err, ok := <-oConn.ErrorChan():
 			if !ok {
-				t.Log("Error channel closed")
+				t.Error("Error channel closed before receiving expected error")
 				goto done
 			}
```
🔇 Additional comments (8)
protocol/protocol.go (5)
18-31: LGTM! The new `context` import is required to support the `Context` field in `ProtocolOptions`.
40-61: LGTM! Adding `currentState` as a struct field addresses the previous review feedback and enables thread-safe access via the existing `currentStateMu` mutex.
99-110: LGTM! Adding the `Context` field to `ProtocolOptions` enables connection-level context propagation to protocols.
695-700: LGTM! The `transitionState` function correctly initializes the transition struct with the error channel and leaves `stateRespChan` as `nil` (consistent with current usage).
158-166: Function `GetDoneState()` is unused/dead code; the identified concern about non-determinism is technically valid but practically moot. Verification confirms:

- `GetDoneState()` has zero call sites in the entire codebase (including tests)
- Each protocol state map contains exactly one state with `AgencyNone` (the done state), confirming the design assumption
- The non-deterministic map iteration concern is technically valid but irrelevant since the function is never called

The code is correct as-is. Consider removing `GetDoneState()` as dead code if it's not part of a planned public API expansion.
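If the function is kept instead, a deterministic lookup is straightforward; a sketch with stand-in types (not the actual `StateMap` API) that makes map iteration order irrelevant by insisting on exactly one agency-none state:

```go
package main

import (
	"fmt"
	"sort"
)

type entry struct{ agencyNone bool }

// getDoneState scans keys in sorted order and fails loudly if the map
// ever contains zero or multiple done states, so the result can never
// depend on Go's randomized map iteration.
func getDoneState(states map[int]entry) (int, error) {
	keys := make([]int, 0, len(states))
	for id := range states {
		keys = append(keys, id)
	}
	sort.Ints(keys)
	done := -1
	for _, id := range keys {
		if states[id].agencyNone {
			if done != -1 {
				return 0, fmt.Errorf("multiple done states: %d and %d", done, id)
			}
			done = id
		}
	}
	if done == -1 {
		return 0, fmt.Errorf("no done state")
	}
	return done, nil
}

func main() {
	fmt.Println(getDoneState(map[int]entry{0: {}, 1: {}, 2: {agencyNone: true}}))
}
```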
connection_test.go (3)
36-81: LGTM! The test correctly sets up the role pairing (mock as client, `oConn` as server) and uses explicit handshake messages. The callback now blocks on `ctx.Done()` instead of `time.Sleep()`, addressing previous feedback.
315-353: LGTM! The basic error handling tests for dial failure and double close are straightforward and properly clean up resources.
258-313: Incorrect review comment – protocols auto-start after handshake in this test. The test correctly expects errors. NtN protocols auto-start after handshake (controlled by `delayProtocolStart`, which defaults to false). With `WithNodeToNode(true)` and no server configuration, the client-side protocols (blockFetch, chainSync, txSubmission, leiosNotify, leiosFetch, etc.) automatically start at line 476 during the handshake callback. These started protocols are "active," so expecting errors when the connection closes is correct; no additional protocol messages are needed for them to be active. Likely an incorrect or invalid review comment.
Signed-off-by: jkawan <kawanjenita@outlook.com>
Closes #1112
Summary by CodeRabbit
New Features
Improvements
Tests