Skip to content

Commit 0bb6aaf

Browse files
improve stream read and write error handling, close stream only for critical issues
1 parent a83054a commit 0bb6aaf

File tree

2 files changed

+265
-74
lines changed

2 files changed

+265
-74
lines changed

p2p/stream/protocols/sync/sync_stream.go

Lines changed: 40 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,36 @@ func (st *syncStream) readMsgLoop() {
7575
default:
7676
msg, err := st.readMsg()
7777
if err != nil {
78-
if err := st.Close("read msg failed", false); err != nil {
79-
st.logger.Err(err).Msg("failed to close sync stream")
78+
// Classify error to determine appropriate handling
79+
errorType, errorDesc := sttypes.ClassifyStreamError(err)
80+
criticalErr := !sttypes.IsRecoverableError(errorType)
81+
82+
// Log error with classification
83+
if criticalErr {
84+
st.logger.Warn().
85+
Str("streamID", string(st.ID())).
86+
Str("errorType", errorType.String()).
87+
Str("description", errorDesc).
88+
Bool("critical", true).
89+
Msg("critical error, closing stream")
90+
} else {
91+
st.logger.Info().
92+
Str("streamID", string(st.ID())).
93+
Str("errorType", errorType.String()).
94+
Str("description", errorDesc).
95+
Bool("recoverable", true).
96+
Msg("recoverable error, continuing stream")
8097
}
81-
return
98+
99+
// Only close stream for non-recoverable errors
100+
if criticalErr {
101+
if err := st.Close("read msg failed", criticalErr); err != nil {
102+
st.logger.Err(err).Msg("failed to close sync stream")
103+
}
104+
return
105+
}
106+
// For recoverable errors, continue the loop
107+
continue
82108
}
83109
if msg == nil {
84110
if err := st.Close("remote closed stream", false); err != nil {
@@ -134,12 +160,17 @@ func (st *syncStream) handleReqLoop() {
134160
err := st.handleReq(req)
135161

136162
if err != nil {
137-
st.logger.Info().Err(err).Str("request", req.String()).
138-
Msg("handle request error. Closing stream")
139-
if err := st.Close("handle request error", false); err != nil {
140-
st.logger.Err(err).Msg("failed to close sync stream")
163+
st.logger.Error().Err(err).Str("request", req.String()).
164+
Msg("handle request by sync stream failed")
165+
// Use the centralized error handling to determine if stream should be closed
166+
if sttypes.ShouldCloseStream(err) {
167+
st.logger.Error().Err(err).Str("request", req.String()).
168+
Msg("sync stream critical error. Closing stream")
169+
if err := st.Close("stream error", false); err != nil {
170+
st.logger.Err(err).Msg("failed to close sync stream")
171+
}
172+
return
141173
}
142-
return
143174
}
144175

145176
case <-st.closeC:
@@ -443,26 +474,14 @@ func (st *syncStream) readMsg() (*syncpb.Message, error) {
443474
// Use progress-based reading with the tracker from BaseStream
444475
b, err := st.ReadBytesWithProgress(st.GetProgressTracker())
445476
if err != nil {
446-
// Log progress timeout specifically
447-
if err.Error() == "progress timeout" {
448-
st.logger.Warn().
449-
Str("streamID", string(st.ID())).
450-
Msg("stream timeout due to lack of progress")
451-
}
452-
// Log stream closure specifically
453-
if err.Error() == "stream closed" {
454-
st.logger.Debug().
455-
Str("streamID", string(st.ID())).
456-
Msg("stream closed by remote peer")
457-
}
458477
return nil, err
459478
}
460479
if b == nil || len(b) == 0 {
461480
// This should not happen
462481
st.logger.Warn().
463482
Str("streamID", string(st.ID())).
464483
Msg("received empty message data")
465-
return nil, errors.New("empty message data")
484+
return nil, errors.Wrap(errors.New("empty message data"), "unexpected empty message")
466485
}
467486
var msg = &syncpb.Message{}
468487
if err := protobuf.Unmarshal(b, msg); err != nil {

0 commit comments

Comments
 (0)