Skip to content

Commit

Permalink
feat: improve support for extensions and initial payloads (wundergrap…
Browse files Browse the repository at this point in the history
…h#324)

Co-authored-by: Jens Neuse <jens.neuse@gmx.de>
  • Loading branch information
fiam and jensneuse authored Dec 1, 2023
1 parent 6e77713 commit 77a033f
Show file tree
Hide file tree
Showing 18 changed files with 407 additions and 33 deletions.
99 changes: 99 additions & 0 deletions demo/pkg/subgraphs/test1/subgraph/generated/generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions demo/pkg/subgraphs/test1/subgraph/schema.graphqls
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ type Subscription {
headerValue(name: String!, repeat: Int): TimestampedString!
"Returns a stream with the value of value of the given key in the WS initial payload."
initPayloadValue(key: String!, repeat: Int): TimestampedString!
"Returns a stream with the value of the WS initial payload."
initialPayload(repeat: Int): Map
}
31 changes: 31 additions & 0 deletions demo/pkg/subgraphs/test1/subgraph/schema.resolvers.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion router-tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ require (
github.com/tidwall/pretty v1.2.1 // indirect
github.com/urfave/cli/v2 v2.25.5 // indirect
github.com/vektah/gqlparser/v2 v2.5.10 // indirect
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.2.0.20231129152302-eefd2348b48a // indirect
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.2.0.20231130155906-dcd50bd528ab // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 // indirect
go.opentelemetry.io/otel v1.19.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions router-tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ github.com/urfave/cli/v2 v2.25.5 h1:d0NIAyhh5shGscroL7ek/Ya9QYQE0KNabJgiUinIQkc=
github.com/urfave/cli/v2 v2.25.5/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6fmdJLxc=
github.com/vektah/gqlparser/v2 v2.5.10 h1:6zSM4azXC9u4Nxy5YmdmGu4uKamfwsdKTwp5zsEealU=
github.com/vektah/gqlparser/v2 v2.5.10/go.mod h1:1rCcfwB2ekJofmluGWXMSEnPMZgbxzwj6FaZ/4OT8Cc=
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.2.0.20231129152302-eefd2348b48a h1:TyGWLh7tPdr8OuxV7iE9zwGr7LR/jY0l/4UMiBlkJ5c=
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.2.0.20231129152302-eefd2348b48a/go.mod h1:yEpUcSfSL8VDe6DRSGGFu+0l9aJ+Xdv6ckm4Zsx9txo=
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.2.0.20231130155906-dcd50bd528ab h1:jw9eOZqB+PGn72i/ZbIwTAVJKezoL7pgaAgXV2sMEU4=
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.2.0.20231130155906-dcd50bd528ab/go.mod h1:yEpUcSfSL8VDe6DRSGGFu+0l9aJ+Xdv6ckm4Zsx9txo=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
Expand Down
6 changes: 3 additions & 3 deletions router-tests/headers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestForwardHeaders(t *testing.T) {
header := http.Header{
c.headerName: []string{headerValue},
}
conn := connectedWebsocket(t, serverPort, header)
conn := connectedWebsocket(t, serverPort, &connectedWebsocketOptions{Header: header})
err := conn.WriteJSON(&wsMessage{
ID: "1",
Type: "subscribe",
Expand Down Expand Up @@ -135,8 +135,8 @@ func TestForwardHeaders(t *testing.T) {
header2 := http.Header{
c.headerName: []string{headerValue2},
}
conn1 := connectedWebsocket(t, serverPort, header1)
conn2 := connectedWebsocket(t, serverPort, header2)
conn1 := connectedWebsocket(t, serverPort, &connectedWebsocketOptions{Header: header1})
conn2 := connectedWebsocket(t, serverPort, &connectedWebsocketOptions{Header: header2})
var err error
err = conn1.WriteJSON(&wsMessage{
ID: "1",
Expand Down
132 changes: 129 additions & 3 deletions router-tests/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,31 @@ func connReadJSON(conn *websocket.Conn, v interface{}) error {
return conn.ReadJSON(v)
}

func connectedWebsocket(tb testing.TB, serverPort int, header http.Header) *websocket.Conn {
type connectedWebsocketOptions struct {
Header http.Header
InitialPayload map[string]interface{}
}

func connectedWebsocket(tb testing.TB, serverPort int, opts *connectedWebsocketOptions) *websocket.Conn {
dialer := websocket.Dialer{
Subprotocols: []string{"graphql-transport-ws"},
}
var header http.Header
var payload []byte
if opts != nil {
header = opts.Header

if len(opts.InitialPayload) > 0 {
var err error
payload, err = json.Marshal(opts.InitialPayload)
require.NoError(tb, err)
}
}
conn, _, err := dialer.Dial(fmt.Sprintf("ws://localhost:%d/graphql", serverPort), header)
require.NoError(tb, err)
err = conn.WriteJSON(&wsMessage{
Type: "connection_init",
Type: "connection_init",
Payload: payload,
})
require.NoError(tb, err)
var msg wsMessage
Expand Down Expand Up @@ -239,5 +256,114 @@ func TestSubscriptionsOverWebsocketLibrary(t *testing.T) {
}
}

func TestHeaderForwardingOverWebsocket(t *testing.T) {
func TestExtensionsForwardingOverWebsocket(t *testing.T) {
// Make sure sending two simultaneous subscriptions with different extensions
// triggers two subscriptions to the upstream
_, port := setupListeningServer(t)
conn1 := connectedWebsocket(t, port, nil)
conn2 := connectedWebsocket(t, port, nil)
var err error
err = conn1.WriteJSON(&wsMessage{
ID: "1",
Type: "subscribe",
Payload: []byte(`{"query":"subscription { initialPayload(repeat:3) }","extensions":{"token":"123"}}`),
})
require.NoError(t, err)

err = conn2.WriteJSON(&wsMessage{
ID: "2",
Type: "subscribe",
Payload: []byte(`{"query":"subscription { initialPayload(repeat:3) }","extensions":{"token":"456"}}`),
})
require.NoError(t, err)

var msg wsMessage
var payload struct {
Data struct {
InitialPayload struct {
Extensions struct {
Token string `json:"token"`
} `json:"extensions"`
} `json:"initialPayload"`
} `json:"data"`
}
err = connReadJSON(conn1, &msg)
require.NoError(t, err)
err = json.Unmarshal(msg.Payload, &payload)
require.NoError(t, err)
assert.Equal(t, "123", payload.Data.InitialPayload.Extensions.Token)

err = connReadJSON(conn2, &msg)
require.NoError(t, err)
err = json.Unmarshal(msg.Payload, &payload)
require.NoError(t, err)
assert.Equal(t, "456", payload.Data.InitialPayload.Extensions.Token)
}

func TestExtensionsForwardingOverWebsocketWithInitialPayload(t *testing.T) {
_, port := setupListeningServer(t)
t.Run("single connection with initial payload", func(t *testing.T) {
conn := connectedWebsocket(t, port, &connectedWebsocketOptions{
InitialPayload: map[string]any{"123": 456, "extensions": map[string]any{"hello": "world"}},
})
var err error
err = conn.WriteJSON(&wsMessage{
ID: "1",
Type: "subscribe",
Payload: []byte(`{"query":"subscription { initialPayload(repeat:3) }"}`),
})
require.NoError(t, err)
var msg wsMessage
err = connReadJSON(conn, &msg)
require.NoError(t, err)
assert.JSONEq(t, `{"data":{"initialPayload":{"123":456,"extensions":{"hello":"world"}}}}`, string(msg.Payload))
})
t.Run("single connection with initial payload and extensions in the request", func(t *testing.T) {
// "extensions" in the request should override the "extensions" in initial payload
conn := connectedWebsocket(t, port, &connectedWebsocketOptions{
InitialPayload: map[string]any{"123": 456, "extensions": map[string]any{"hello": "world"}},
})
var err error
err = conn.WriteJSON(&wsMessage{
ID: "1",
Type: "subscribe",
Payload: []byte(`{"query":"subscription { initialPayload(repeat:3) }", "extensions": {"hello": "world2"}}`),
})
require.NoError(t, err)
var msg wsMessage
err = connReadJSON(conn, &msg)
require.NoError(t, err)
assert.JSONEq(t, `{"data":{"initialPayload":{"123":456,"extensions":{"hello":"world2"}}}}`, string(msg.Payload))
})

t.Run("multiple connections with different initial payloads", func(t *testing.T) {
// "extensions" in the request should override the "extensions" in initial payload
conn1 := connectedWebsocket(t, port, &connectedWebsocketOptions{
InitialPayload: map[string]any{"id": 1},
})
conn2 := connectedWebsocket(t, port, &connectedWebsocketOptions{
InitialPayload: map[string]any{"id": 2},
})
var err error
err = conn1.WriteJSON(&wsMessage{
ID: "1",
Type: "subscribe",
Payload: []byte(`{"query":"subscription { initialPayload(repeat:3) }"}`),
})
require.NoError(t, err)
err = conn2.WriteJSON(&wsMessage{
ID: "2",
Type: "subscribe",
Payload: []byte(`{"query":"subscription { initialPayload(repeat:3) }"}`),
})
require.NoError(t, err)
var msg wsMessage
err = connReadJSON(conn1, &msg)
require.NoError(t, err)
assert.JSONEq(t, `{"data":{"initialPayload":{"id":1}}}`, string(msg.Payload))

err = connReadJSON(conn2, &msg)
require.NoError(t, err)
assert.JSONEq(t, `{"data":{"initialPayload":{"id":2}}}`, string(msg.Payload))
})
}
9 changes: 5 additions & 4 deletions router/core/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,10 +351,11 @@ type operationContext struct {
variables []byte
clientInfo *ClientInfo
// preparedPlan is the prepared plan of the operation
preparedPlan *planWithMetaData
traceOptions resolve.RequestTraceOptions
planCacheHit bool
extensions []byte
preparedPlan *planWithMetaData
traceOptions resolve.RequestTraceOptions
planCacheHit bool
initialPayload []byte
extensions []byte
}

func (o *operationContext) Variables() []byte {
Expand Down
Loading

0 comments on commit 77a033f

Please sign in to comment.