Skip to content

feat: Adding environmentID support to SSEHandler #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions httphelpers/handlers_sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,23 @@ func SSEHandler(initialEvent *SSEEvent) (http.Handler, SSEStreamControl) {
return handler, &sseStreamControlImpl{streamControl}
}

// SSEHandlerWithEnvironmentID creates an HTTP handler that streams Server-Sent Events data.
//
// The behavior is exactly the same as SSEHandler except environmentID will be returned in
// the response header X-Ld-Envid.
func SSEHandlerWithEnvironmentID(initialEvent *SSEEvent, environmentID string) (http.Handler, SSEStreamControl) {
var initialData []byte
if initialEvent != nil {
initialData = initialEvent.Bytes()
}
handler, streamControl := ChunkedStreamingHandler(
initialData,
"text/event-stream; charset=utf-8",
ChunkedStreamingHandlerOptionEnvironmentID(environmentID),
)
return handler, &sseStreamControlImpl{streamControl}
}

func (s *sseStreamControlImpl) Enqueue(event SSEEvent) {
s.streamControl.Enqueue(event.Bytes())
}
Expand Down
26 changes: 26 additions & 0 deletions httphelpers/handlers_sse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,29 @@ data: data3
`, string(data))
})
}

func TestSSEHandlerWithEnvironmentID(t *testing.T) {
initialEvent := SSEEvent{"id1", "event1", "data1", 0}
handler, stream := SSEHandlerWithEnvironmentID(&initialEvent, "env-id")
defer stream.Close()

WithServer(handler, func(server *httptest.Server) {
resp1, err := http.DefaultClient.Get(server.URL)
require.NoError(t, err)
defer resp1.Body.Close()

assert.Equal(t, 200, resp1.StatusCode)
assert.Equal(t, "text/event-stream; charset=utf-8", resp1.Header.Get("Content-Type"))
assert.Equal(t, "env-id", resp1.Header.Get("X-Ld-Envid"))

stream.EndAll()

data, err := io.ReadAll(resp1.Body)
assert.NoError(t, err)
assert.Equal(t, `id: id1
event: event1
data: data1

`, string(data))
})
}
44 changes: 37 additions & 7 deletions httphelpers/handlers_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,25 @@ type StreamControl interface {
Close() error
}

// ChunkedStreamingHandlerOption is a common interface for optional configuration parameters that
// can be used in creating a ChunkedStreamingHandler.
type ChunkedStreamingHandlerOption interface {
apply(h *chunkedStreamingHandlerImpl)
}

type environmentIDChunkedStreamingHandlerOption string

func (o environmentIDChunkedStreamingHandlerOption) apply(h *chunkedStreamingHandlerImpl) {
h.environmentID = string(o)
}

// ChunkedStreamingHandlerOptionEnvironmentID returns an option that sets the environment ID
// for a ChunkedStreamingHandler when the handler is created. The environment ID will be
// returned in the response header X-Ld-Envid.
func ChunkedStreamingHandlerOptionEnvironmentID(environmentID string) ChunkedStreamingHandlerOption {
return environmentIDChunkedStreamingHandlerOption(environmentID)
}

// ChunkedStreamingHandler creates an HTTP handler that streams arbitrary data using chunked encoding.
//
// The initialData parameter, if not nil, specifies a starting chunk that should always be sent to any
Expand Down Expand Up @@ -52,21 +71,29 @@ type StreamControl interface {
// }
// }
// }()
func ChunkedStreamingHandler(initialChunk []byte, contentType string) (http.Handler, StreamControl) {
func ChunkedStreamingHandler(
initialChunk []byte,
contentType string,
options ...ChunkedStreamingHandlerOption,
) (http.Handler, StreamControl) {
sh := &chunkedStreamingHandlerImpl{
initialChunk: initialChunk,
contentType: contentType,
}
for _, o := range options {
o.apply(sh)
}
return sh, sh
}

type chunkedStreamingHandlerImpl struct {
initialChunk []byte
contentType string
queued [][]byte
channels []chan []byte
closed bool
lock sync.Mutex
initialChunk []byte
contentType string
queued [][]byte
channels []chan []byte
closed bool
lock sync.Mutex
environmentID string
}

func (s *chunkedStreamingHandlerImpl) Enqueue(data []byte) {
Expand Down Expand Up @@ -173,6 +200,9 @@ func (s *chunkedStreamingHandlerImpl) ServeHTTP(w http.ResponseWriter, r *http.R
h := w.Header()
h.Set("Content-Type", s.contentType)
h.Set("Cache-Control", "no-cache, no-store, must-revalidate")
if len(s.environmentID) > 0 {
h.Set("X-Ld-Envid", s.environmentID)
}

if s.initialChunk != nil {
_, _ = w.Write(s.initialChunk)
Expand Down
26 changes: 26 additions & 0 deletions httphelpers/handlers_streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,29 @@ func TestChunkedStreamingHandlerClose(t *testing.T) {
assert.Equal(t, 500, resp2.StatusCode)
})
}

func TestChunkedStreamingHandlerWithEnvironmentID(t *testing.T) {
initialData := []byte("hello")
handler, stream := ChunkedStreamingHandler(
initialData,
"text/plain",
ChunkedStreamingHandlerOptionEnvironmentID("env-id"),
)
defer stream.Close()

WithServer(handler, func(server *httptest.Server) {
resp1, err := http.DefaultClient.Get(server.URL)
require.NoError(t, err)
defer resp1.Body.Close()

assert.Equal(t, 200, resp1.StatusCode)
assert.Equal(t, "text/plain", resp1.Header.Get("Content-Type"))
assert.Equal(t, "env-id", resp1.Header.Get("X-Ld-Envid"))

stream.EndAll()

data, err := io.ReadAll(resp1.Body)
require.NoError(t, err)
assert.Equal(t, "hello", string(data))
})
}
Loading