handle io.EOF on successful stream termination #38

Closed

Conversation

@slimsag commented Jul 10, 2024

Hi there! Cool project, thanks for working on it. It's nice to finally have an active SSE package in Go :) I am trying to make use of it at my work Sourcegraph to parse SSE streams from OpenAI-compatible API endpoints. In doing this, I ran into a limitation/bug with go-sse which I try to detail below.


`Parser.Err` is documented as always returning `io.EOF` at the end of normal input:

```
// Err returns the last read error. At the end of input
// it will always be equal to io.EOF.
func (r *Parser) Err() error {
```

And according to `parser_test.go`, the parser can return either `io.EOF` or `parser.ErrUnexpectedEOF`.

Inherently, this means that `Connection.read`, which uses the parser internally, can also produce those two errors.

However, at the end `Connection.read` only checks for `io.EOF` in the `dirty` case:

```
	err := p.Err()
	if dirty && err == io.EOF { //nolint:errorlint // Our scanner returns io.EOF unwrapped
		c.dispatch(ev)
	}
```

This means that `Connection.read` may return `io.EOF` at the end of a normal input stream. This is problematic because it means that `Connection.doConnect` will observe that `io.EOF` at the end of normal input here, and report it as `connection to server lost`:

```
	err = c.read(res.Body, setRetry)
	if errors.Is(err, ctx.Err()) {
		return false, err
	}

	return true, &ConnectionError{Req: c.request, Reason: "connection to server lost", Err: err}
```

This then triggers the connection-retry logic, even though the stream just finished successfully.
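
Roughly, the change this PR aims for is to treat an unwrapped `io.EOF` from the parser as a clean end of stream inside `Connection.read`, rather than bubbling it up; a sketch of the idea (the actual diff may differ):

```
	err := p.Err()
	if err == io.EOF { //nolint:errorlint // Our scanner returns io.EOF unwrapped
		if dirty {
			c.dispatch(ev)
		}
		// A clean EOF means the server simply finished the stream, so don't
		// surface it as "connection to server lost" and don't trigger a retry.
		return nil
	}
	return err
```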

And indeed, when I try using the client against [some popular software](https://huggingface.co/docs/text-generation-inference/en/messages_api#streaming) which implements SSE, I encounter exactly that case via the `OnRetry` callback:

```
...all the valid events I would expect back in the stream...
"error": "request failed: connection to server lost: EOF"

...all the valid events I would expect back in the stream...
"error": "request failed: connection to server lost: EOF"

...all the valid events I would expect back in the stream...
"error": "request failed: connection to server lost: EOF"

...all the valid events I would expect back in the stream...
"error": "request failed: connection to server lost: EOF"
```

Now, I did see in the README that `Connect` always returning an error is an intentional design decision:

> In other words, a successfully closed `Connection` will always return an error

But I think the tests do not experience this chronic-retry behavior because they set `MaxRetries: -1,` to disable retry logic.
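
i.e. roughly this client configuration (using the `Backoff` field shown in the workaround further down):

```
c := sse.Client{
    Backoff: sse.Backoff{MaxRetries: -1}, // negative value disables the built-in retry logic
}
```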

`TestConnection_reconnect` _does_ seem to observe this chronic-retry behavior, but due to cancellation in the `OnRetry` callback it doesn't fail the test:

```
		OnRetry: func(_ error, _ time.Duration) {
			retries++
			if retries == 3 {
				cancel() // this prevents infinite retries from happening
			}
		},
```

This PR fixes the issue in practice, but I wasn't able to fix `TestConnection_reconnect` correctly with the time I had to spend on this. Sorry about that. :) I hope this information is at least useful as a 'bug report' even if the PR is rejected :)

Signed-off-by: Stephen Gutekanst <stephen@sourcegraph.com>
@tmaxmax (Owner) commented Jul 13, 2024

Hi there and thank you for putting in all this work for the PR!

Please confirm whether the following observation is valid. There seem to be two ways in which an SSE stream may be used:

  1. to keep a persistent channel for a long or undetermined timespan for pushing messages to clients
  2. to send a one-off message which, due to the way it is generated, may take longer to do so and can be sent in fragments (for example, OpenAI responses)

When I first designed the library – in the pre-ChatGPT era – I only had the first use case in mind. For that use case it's not really expected that the connection will be closed by the server; this is also validated by browser implementations of SSE clients, which always attempt to reconnect when the connection ends (i.e. they also "chronically" retry, as you've described).

Simply returning successfully on io.EOF satisfies the second case but may not work for the first one. It seems the client should be customizable for each of these use cases – in other words, the handling of io.EOF should be configurable.

I'm planning on rewriting the client from scratch – see #25 for reasoning and an API outline (some feedback on it would be immensely appreciated – if you find time, I'd be very grateful if you'd leave a comment with some thoughts under that issue). I will adapt that design to take differentiated io.EOF handling into consideration (note to self: see if #37 can also be handled; the browser's readyState handling could serve as inspiration).

To work around this limitation, for now I think you could do something like this:

```
c := sse.Client{
    Backoff: sse.Backoff{MaxRetries: -1},
}
req, _ := http.NewRequestWithContext(ctx, ...)
conn := c.NewConnection(req)

var err error
for {
    err = conn.Connect()
    if err == ctx.Err() {
        break
    }
    if errors.Is(err, io.EOF) {
        err = nil
        break
    }
    // some waiting here before retrying or other backoff logic
}

if err != nil {
    // handle error
}
```

The disadvantage is that sadly you lose the builtin backoff functionality.
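
If you do need some backoff, a very rough sketch of adding it to the loop above (the delays and cap are arbitrary, not anything go-sse provides):

```
var err error
wait := 500 * time.Millisecond // arbitrary starting delay, not a go-sse default

retry:
for {
    err = conn.Connect()
    switch {
    case errors.Is(err, ctx.Err()):
        break retry // context cancelled or deadline exceeded: give up
    case errors.Is(err, io.EOF):
        err = nil // clean end of stream: treat as success
        break retry
    }

    // back off before retrying, doubling the delay up to an arbitrary cap
    select {
    case <-ctx.Done():
        err = ctx.Err()
        break retry
    case <-time.After(wait):
    }
    if wait *= 2; wait > 30*time.Second {
        wait = 30 * time.Second
    }
}

if err != nil {
    // handle error
}
```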

I'd also be comfortable with merging some sort of `TreatEOFAsSuccess` bool field on `sse.Client`, which would change the behavior of the client to the one implemented in this PR only when the field is true. If you wish to add this, make sure you add a descriptive comment to the field and update CHANGELOG.md. A note in README.md under the "Using the client > Connection lost?" section would be nice too (probably the right approach would be a separate section tailored to OpenAI-style usage, but at the moment I don't have the bandwidth to think it through, so this should suffice; I'm open to suggestions, though).
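
For illustration, usage with such a field could look roughly like this (the field is hypothetical – it doesn't exist in the current API, and the name is just the suggestion above):

```
c := sse.Client{
    TreatEOFAsSuccess: true, // hypothetical field: a clean io.EOF makes Connect return nil instead of retrying
}
req, _ := http.NewRequestWithContext(ctx, ...)
if err := c.NewConnection(req).Connect(); err != nil {
    // only genuine failures (cancellation, network errors, malformed streams) end up here
}
```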

The reason why I haven't proposed this in the context of #30 is that the situation there didn't seem compelling – in that case it rather looked to me as if the server had issues, from the provided information.

Do not worry about the test – another reason I want to rewrite this is because some tests are kind of flaky and I want to redo them.

Let me know what you think and how you wish to proceed further.

@slimsag (Author) commented Jul 13, 2024

> to send a one-off message which, due to the way it is generated, may take longer to do so and can be sent in fragments (for example, OpenAI responses)

Correct

> When I first designed the library – in the pre-ChatGPT era – I only had the first use case in mind. For that use case it's not really expected that the connection will be closed by the server; this is also validated by browser implementations of SSE clients, which always attempt to reconnect when the connection ends (i.e. they also "chronically" retry, as you've described).

This makes sense, yeah, I hadn't considered that use case :)

> I'm planning on rewriting the client from scratch – see #25 for reasoning and an API outline (some feedback on it would be immensely appreciated – if you find time, I'd be very grateful if you'd leave a comment with some thoughts under that issue).

I had briefly skimmed that issue and thought what you were describing in there seemed like a big improvement from my POV :)

> for now I think you could do something like this
>
> The disadvantage is that sadly you lose the builtin backoff functionality.

Yeah, this is what I ended up doing after I sent this PR, and I think it works fine for my use case; in fact, backoff/retry logic is not super important for it (though it is kind of an interesting thing to have).

I am happy with go-sse and the workaround above so far, and am moving forward with using it as-is, thanks for making this package! Feel free to treat this entire PR as just feedback / a bug report :) I am also happy to have the PR closed.

@tmaxmax (Owner) commented Jul 14, 2024

Thank you too for your input! It is immensely appreciated.

@tmaxmax closed this Jul 14, 2024
@tmaxmax mentioned this pull request Aug 24, 2024