generated from cloudwego/.github
-
Notifications
You must be signed in to change notification settings - Fork 747
Open
Description
Describe the bug
StreamReaderWithConvert is documented to ignore a stream item when the WithErrWrapper returns ErrNoValue, but the current implementation returns ErrNoValue to the caller instead of skipping and continuing.
Reproduction
package main
import (
"errors"
"fmt"
"github.com/cloudwego/eino/schema"
)
func main() {
sr, sw := schema.Pipe[int](2)
sw.Send(1, errors.New("boom"))
sw.Send(2, nil)
sw.Close()
cr := schema.StreamReaderWithConvert(sr, func(i int) (int, error) {
return i, nil
}, schema.WithErrWrapper(func(err error) error {
// Skip bad items
return schema.ErrNoValue
}))
v, err := cr.Recv()
fmt.Printf("v=%v err=%v\n", v, err)
// Expected: v=2 err=<nil> (the error item is skipped)
// Actual: v=0 err=no value
}Expected behavior
When the wrapper returns ErrNoValue, the reader should skip that item and continue (same behavior as when convert returns ErrNoValue).
Where it happens
schema/stream.go -> streamReaderWithConvert.recv()
Current logic:
if srw.errWrapper != nil {
err = srw.errWrapper(err)
if err != nil && !errors.Is(err, ErrNoValue) {
return t, err
}
}
return t, errIf errWrapper returns ErrNoValue, it still returns to caller instead of continuing the loop.
Suggested fix
Treat ErrNoValue like the convert path and continue the loop (skip item), e.g.:
if srw.errWrapper != nil {
err = srw.errWrapper(err)
if err == nil || errors.Is(err, ErrNoValue) {
continue
}
return t, err
}Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels