Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 40 additions & 21 deletions p2p/stream/protocols/sync/sync_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,36 @@ func (st *syncStream) readMsgLoop() {
default:
msg, err := st.readMsg()
if err != nil {
if err := st.Close("read msg failed", false); err != nil {
st.logger.Err(err).Msg("failed to close sync stream")
// Classify error to determine appropriate handling
errorType, errorDesc := sttypes.ClassifyStreamError(err)
criticalErr := !sttypes.IsRecoverableError(errorType)

// Log error with classification
if criticalErr {
st.logger.Warn().
Str("streamID", string(st.ID())).
Str("errorType", errorType.String()).
Str("description", errorDesc).
Bool("critical", true).
Msg("critical error, closing stream")
} else {
st.logger.Info().
Str("streamID", string(st.ID())).
Str("errorType", errorType.String()).
Str("description", errorDesc).
Bool("recoverable", true).
Msg("recoverable error, continuing stream")
}
return

// Only close stream for non-recoverable errors
if criticalErr {
if err := st.Close("read msg failed", criticalErr); err != nil {
st.logger.Err(err).Msg("failed to close sync stream")
}
return
}
// For recoverable errors, continue the loop
continue
}
if msg == nil {
if err := st.Close("remote closed stream", false); err != nil {
Expand Down Expand Up @@ -134,12 +160,17 @@ func (st *syncStream) handleReqLoop() {
err := st.handleReq(req)

if err != nil {
st.logger.Info().Err(err).Str("request", req.String()).
Msg("handle request error. Closing stream")
if err := st.Close("handle request error", false); err != nil {
st.logger.Err(err).Msg("failed to close sync stream")
st.logger.Error().Err(err).Str("request", req.String()).
Msg("handle request by sync stream failed")
// Use the centralized error handling to determine if stream should be closed
if sttypes.ShouldCloseStream(err) {
st.logger.Error().Err(err).Str("request", req.String()).
Msg("sync stream critical error. Closing stream")
if err := st.Close("stream error", false); err != nil {
st.logger.Err(err).Msg("failed to close sync stream")
}
return
}
return
}

case <-st.closeC:
Expand Down Expand Up @@ -443,26 +474,14 @@ func (st *syncStream) readMsg() (*syncpb.Message, error) {
// Use progress-based reading with the tracker from BaseStream
b, err := st.ReadBytesWithProgress(st.GetProgressTracker())
if err != nil {
// Log progress timeout specifically
if err.Error() == "progress timeout" {
st.logger.Warn().
Str("streamID", string(st.ID())).
Msg("stream timeout due to lack of progress")
}
// Log stream closure specifically
if err.Error() == "stream closed" {
st.logger.Debug().
Str("streamID", string(st.ID())).
Msg("stream closed by remote peer")
}
return nil, err
}
if b == nil || len(b) == 0 {
// This should not happen
st.logger.Warn().
Str("streamID", string(st.ID())).
Msg("received empty message data")
return nil, errors.New("empty message data")
return nil, errors.Wrap(errors.New("empty message data"), "unexpected empty message")
}
var msg = &syncpb.Message{}
if err := protobuf.Unmarshal(b, msg); err != nil {
Expand Down
Loading