Skip to content

feat: implement websocket subprotocols for subscriptions graphql-ws, graphql-transport-ws #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ StructTypes = "856f2bd8-1eba-4b0a-8007-ebc267875bd4"

[compat]
GraphQLParser = "0.1.1"
HTTP = "0.8.17, 0.9"
HTTP = "1"
JSON3 = "1.1.2"
StructTypes = "1.5"
julia = "1.6"
2 changes: 2 additions & 0 deletions src/GraphQLClient.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export query, mutate, open_subscription, Client, GQLEnum, Alias,
create_introspected_struct, list_all_introspected_objects, global_graphql_client,
@gql_str

include("constants.jl")
# Types
include("client.jl")
include("types.jl")
Expand All @@ -26,6 +27,7 @@ include("type_construction.jl")
include("http_execution.jl")
include("queries.jl")
include("mutations.jl")
include("ws_subscription_protocols.jl")
include("subscriptions.jl")
include("introspection.jl")
include("gql_string.jl")
Expand Down
32 changes: 32 additions & 0 deletions src/constants.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# GQL over WS Protocol constants
# https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md

const GQL_CLIENT_CONNECTION_INIT = "connection_init"
const GQL_SERVER_CONNECTION_ACK = "connection_ack"
const GQL_SERVER_CONNECTION_ERROR = "connection_error"
const GQL_SERVER_CONNECTION_KEEP_ALIVE = "ka"
const GQL_CLIENT_START = "start"
const GQL_CLIENT_STOP = "stop"
const GQL_CLIENT_CONNECTION_TERMINATE = "connection_terminate"
const GQL_SERVER_DATA = "data"
const GQL_SERVER_ERROR = "error"
const GQL_SERVER_COMPLETE = "complete"

# Subscription tracker
const SUBSCRIPTION_STATUS_OPEN = "open"
const SUBSCRIPTION_STATUS_ERROR = "errored"
const SUBSCRIPTION_STATUS_CLOSED = "closed"

# New GQL over WS constanst
# https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md

const GQLWS_CLIENT_INIT = "connection_init"
const GQLWS_SERVER_CONNECTION_ACK = "connection_ack"
const GQLWS_BI_PING = "ping"
const GQLWS_BI_PONG = "pong"
const GQLWS_CLIENT_SUBSCRIBE = "subscribe"
const GQLWS_SERVER_NEXT = "next"
const GQLWS_SERVER_ERROR = "error"
const GQLWS_BI_COMPLETE = "complete"


4 changes: 2 additions & 2 deletions src/gqlresponse.jl
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ end
Struct for subsriptions that wraps a `GQLReponse{T}` alongside various metadata.
"""
struct GQLSubscriptionResponse{T}
id::String
id::Union{String, Nothing}
type::String
payload::GQLResponse{T}
payload::Union{GQLResponse{T}, Nothing}
end
StructTypes.StructType(::Type{<:GQLSubscriptionResponse}) = StructTypes.Struct()

Expand Down
136 changes: 78 additions & 58 deletions src/subscriptions.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ const subscription_tracker = Ref{Dict}(Dict())
retry=true,
subtimeout=0,
stopfn=nothing,
throw_on_execution_error=false)
throw_on_execution_error=false,
websocket_protocol=join(", ", GQS_WS_PROTOCOLS))

Subscribe to `subscription_name`, running `fn` on each received result and ending the
subcription when `fn` returns `true`.
Expand All @@ -26,7 +27,7 @@ The subscription uses the `ws_endpoint` field of the `client.`
This function is designed to be used with the `do` keyword.

# Arguments
- `fn::Function`: function to be run on each result, recieves the response from the
- `fn::Function`: function to be run on each result, receives the response from the
subscription`. Must return a boolean to indicate whether or not to close the subscription,
with `true` closing the subscription.
- `client::Client`: GraphQL client (optional). If not supplied, [`global_graphql_client`](@ref) is used.
Expand All @@ -51,7 +52,12 @@ This function is designed to be used with the `do` keyword.
- `throw_on_execution_error=false`: set to `true` to stop an error being thrown if the GraphQL server
response contains errors that occurred during execution.
- `verbose=0`: set to 1, 2 for extra logging.

- `websocket_protocol=join(", ", GQL_WS_PROTOCOLS)`: Will try to communicate with [apollographql's
subcription-transport-protocol](https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md)
or with the newer [graphql-ws](https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md) protocol.
With the default setup, the protocol that is actually used, is selected by the server. If you want to
enforce the subprotocol, you can adjust this accordingly. The string constants `PROTOCOL_APOLLO_OLD`, `PROTOCOL_GRAPHQL_WS`
contain the names of the sub-protocols, respectively.
# Examples
```julia
julia> open_subscription("subSaveUser", sub_args=Dict("role" => "SYSTEM_ADMIN")) do result
Expand All @@ -78,67 +84,69 @@ function open_subscription(fn::Function,
subtimeout=0,
stopfn=nothing,
throw_on_execution_error=false,
verbose=0)
verbose=0,
websocket_protocol=join(", ", PROTOCOL_GRAPHQL_WS))

!in(get_name(subscription_name), get_subscriptions(client)) && throw(GraphQLError("$(get_name(subscription_name)) is not an existing subscription"))

output_str = get_output_str(output_fields)
payload = get_generic_query_payload(client, "subscription", subscription_name, sub_args, output_str)
subscription_payload = get_generic_query_payload(client, "subscription", subscription_name, sub_args, output_str)

sub_id = string(length(keys(subscription_tracker[])) + 1)
sub_id *= "-" * string(Threads.threadid())
message = Dict(
"id" => string(sub_id),
"type" => "start",
"payload" => payload
)
message_str = JSON3.write(message)

HTTP.WebSockets.open(client.ws_endpoint; retry=retry, headers=client.headers) do ws
# Start sub
output_info(verbose) && println("Starting $(get_name(subscription_name)) subscription with ID $sub_id")
write(ws, message_str)
subscription_tracker[][sub_id] = "open"

# Init function
if !isnothing(initfn)
output_debug(verbose) && println("Running subscription initialisation function")
initfn()
end
throw_if_assigned = Ref{GraphQLError}()
headers = Dict(client.headers)
# We currently implement the `apollographql/subscriptions-transport-ws` which is default in hasura and others
# Defined here https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md
# TODO: Add support for the newer `graphql-transport-ws` from the graphql-ws library.
# ()
headers["Sec-WebSocket-Protocol"] = websocket_protocol

# Get listening
output_debug(verbose) && println("Listening to $(get_name(subscription_name)) with ID $sub_id...")

# Run function
finish = false
while !finish
data = readfromwebsocket(ws, stopfn, subtimeout)
if data === :timeout
output_info(verbose) && println("Subscription $sub_id timed out")
break
elseif data === :stopfn
output_info(verbose) && println("Subscription $sub_id stopped by the stop function supplied")
break
end
response = JSON3.read(data::Vector{UInt8}, GQLSubscriptionResponse{output_type})
payload = response.payload
if !isnothing(payload.errors) && !isempty(payload.errors) && throw_on_execution_error
subscription_tracker[][sub_id] = "errored"
throw(GraphQLError("Error during subscription.", payload))
end
# Handle multiple subs, do we need this?
if response.id == string(sub_id)
output_debug(verbose) && println("Result recieved on subscription with ID $sub_id")
finish = fn(payload)
if !isa(finish, Bool)
subscription_tracker[][sub_id] = "errored"
error("Subscription function must return a boolean")
end
end
HTTP.WebSockets.open(client.ws_endpoint; retry=retry, headers=headers) do ws
# Start sub
output_info(verbose) && println("Starting $(get_name(subscription_name)) subscription with ID $sub_id")
selected_protocol = HTTP.header(ws.response, "Sec-WebSocket-Protocol")
output_info(verbose) && println("Headers - $selected_protocol, $(join(' ', HTTP.headers(ws.response)))")
if selected_protocol == PROTOCOL_APOLLO_OLD
handle_apollo_old(
fn,
ws,
subscription_name,
subscription_payload,
sub_id,
output_type;
initfn=initfn,
subtimeout=subtimeout,
stopfn=stopfn,
throw_on_execution_error=throw_on_execution_error,
verbose=verbose,
throw_if_assigned_ref=throw_if_assigned
)
else
if selected_protocol != PROTOCOL_GRAPHQL_WS
@warn("None of the implemented protocols match - trying to use \"$(PROTOCOL_GRAPHQL_WS)\"")
end
handle_graphql_ws(
fn,
ws,
subscription_name,
subscription_payload,
sub_id,
output_type;
initfn=initfn,
subtimeout=subtimeout,
stopfn=stopfn,
throw_on_execution_error=throw_on_execution_error,
verbose=verbose,
throw_if_assigned_ref=throw_if_assigned
)
end
end
# We can't throw errors from the ws handle function in HTTP.jl 1.#, as they get digested.
isassigned(throw_if_assigned) && throw(throw_if_assigned[])
output_debug(verbose) && println("Finished. Closing subscription")
subscription_tracker[][sub_id] = "closed"
subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_CLOSED
return
end

Expand All @@ -155,7 +163,7 @@ function clear_subscriptions()
end
end

function async_reader_with_timeout(io::IO, subtimeout)::Channel
function async_reader_with_timeout(ws::HTTP.WebSockets.WebSocket, subtimeout)::Channel
ch = Channel(1)
task = @async begin
reader_task = current_task()
Expand All @@ -164,15 +172,15 @@ function async_reader_with_timeout(io::IO, subtimeout)::Channel
Base.throwto(reader_task, InterruptException())
end
timeout = Timer(timeout_cb, subtimeout)
data = readavailable(io)
data = HTTP.receive(ws)
subtimeout > 0 && close(timeout) # Cancel the timeout
put!(ch, data)
end
bind(ch, task)
return ch
end

function async_reader_with_stopfn(io::IO, stopfn, checktime)::Channel
function async_reader_with_stopfn(ws::HTTP.WebSockets.WebSocket, stopfn, checktime)::Channel
ch = Channel(1) # Could we make this channel concretely typed?
task = @async begin
reader_task = current_task()
Expand All @@ -185,7 +193,7 @@ function async_reader_with_stopfn(io::IO, stopfn, checktime)::Channel
end
end
timeout = Timer(timeout_cb, checktime)
data = readavailable(io)
data = HTTP.WebSockets.receive(ws)
close(timeout) # Cancel the timeout
put!(ch, data)
end
Expand All @@ -209,7 +217,7 @@ A channel is returned with the data. If `stopfn` stops the websocket,
the data will be `:stopfn`. If the timeout stops the websocket,
the data will be `:timeout`
"""
function readfromwebsocket(ws::IO, stopfn, subtimeout)
function readfromwebsocket(ws::HTTP.WebSockets.WebSocket, stopfn, subtimeout)
if isnothing(stopfn) && subtimeout > 0
ch_out = async_reader_with_timeout(ws, subtimeout)
data = take!(ch_out)
Expand All @@ -218,7 +226,19 @@ function readfromwebsocket(ws::IO, stopfn, subtimeout)
ch_out = async_reader_with_stopfn(ws, stopfn, checktime)
data = take!(ch_out)
else
data = readavailable(ws)
data = HTTP.receive(ws)
end
return data
end
struct Interrupt <: Exception
end

function checkreturn(data, verbose, sub_id)
if data === :timeout
output_info(verbose) && println("Subscription $sub_id timed out")
throw(Interrupt())
elseif data === :stopfn
output_info(verbose) && println("Subscription $sub_id stopped by the stop function supplied")
throw(Interrupt())
end
end
Loading