Skip to content

Update HTTP.jl compat #19

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Project.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name = "GraphQLClient"
uuid = "09d831e3-9c21-47a9-bfd8-076871817219"
version = "0.7.5"
version = "0.7.6"

[deps]
GraphQLParser = "0ae10fbf-af58-4883-b66b-ff0ac82d20dd"
Expand All @@ -10,7 +10,7 @@ StructTypes = "856f2bd8-1eba-4b0a-8007-ebc267875bd4"

[compat]
GraphQLParser = "0.1.1"
HTTP = "0.8.17, 0.9"
HTTP = "1"
JSON3 = "1.1.2"
StructTypes = "1.5"
julia = "1.6"
25 changes: 14 additions & 11 deletions src/subscriptions.jl
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ function open_subscription(fn::Function,
"payload" => payload
)
message_str = JSON3.write(message)

throw_if_assigned = Ref{GraphQLError}()
HTTP.WebSockets.open(client.ws_endpoint; retry=retry, headers=client.headers) do ws
# Start sub
output_info(verbose) && println("Starting $(get_name(subscription_name)) subscription with ID $sub_id")
write(ws, message_str)
HTTP.send(ws, message_str)
subscription_tracker[][sub_id] = "open"

# Init function
Expand All @@ -120,11 +120,12 @@ function open_subscription(fn::Function,
output_info(verbose) && println("Subscription $sub_id stopped by the stop function supplied")
break
end
response = JSON3.read(data::Vector{UInt8}, GQLSubscriptionResponse{output_type})
response = JSON3.read(data, GQLSubscriptionResponse{output_type})
payload = response.payload
if !isnothing(payload.errors) && !isempty(payload.errors) && throw_on_execution_error
subscription_tracker[][sub_id] = "errored"
throw(GraphQLError("Error during subscription.", payload))
throw_if_assigned[] = GraphQLError("Error during subscription.", payload)
break
end
# Handle multiple subs, do we need this?
if response.id == string(sub_id)
Expand All @@ -137,6 +138,8 @@ function open_subscription(fn::Function,
end
end
end
# We can't throw errors from the ws handle function in HTTP.jl 1.0, as they get digested.
isassigned(throw_if_assigned) && throw(throw_if_assigned[])
output_debug(verbose) && println("Finished. Closing subscription")
subscription_tracker[][sub_id] = "closed"
return
Expand All @@ -155,7 +158,7 @@ function clear_subscriptions()
end
end

function async_reader_with_timeout(io::IO, subtimeout)::Channel
function async_reader_with_timeout(ws::HTTP.WebSockets.WebSocket, subtimeout)::Channel
ch = Channel(1)
task = @async begin
reader_task = current_task()
Expand All @@ -164,15 +167,15 @@ function async_reader_with_timeout(io::IO, subtimeout)::Channel
Base.throwto(reader_task, InterruptException())
end
timeout = Timer(timeout_cb, subtimeout)
data = readavailable(io)
data = HTTP.receive(ws)
subtimeout > 0 && close(timeout) # Cancel the timeout
put!(ch, data)
end
bind(ch, task)
return ch
end

function async_reader_with_stopfn(io::IO, stopfn, checktime)::Channel
function async_reader_with_stopfn(ws::HTTP.WebSockets.WebSocket, stopfn, checktime)::Channel
ch = Channel(1) # Could we make this channel concretely typed?
task = @async begin
reader_task = current_task()
Expand All @@ -185,7 +188,7 @@ function async_reader_with_stopfn(io::IO, stopfn, checktime)::Channel
end
end
timeout = Timer(timeout_cb, checktime)
data = readavailable(io)
data = HTTP.WebSockets.receive(ws)
close(timeout) # Cancel the timeout
put!(ch, data)
end
Expand All @@ -209,7 +212,7 @@ A channel is returned with the data. If `stopfn` stops the websocket,
the data will be `:stopfn`. If the timeout stops the websocket,
the data will be `:timeout`
"""
function readfromwebsocket(ws::IO, stopfn, subtimeout)
function readfromwebsocket(ws::HTTP.WebSockets.WebSocket, stopfn, subtimeout)
if isnothing(stopfn) && subtimeout > 0
ch_out = async_reader_with_timeout(ws, subtimeout)
data = take!(ch_out)
Expand All @@ -218,7 +221,7 @@ function readfromwebsocket(ws::IO, stopfn, subtimeout)
ch_out = async_reader_with_stopfn(ws, stopfn, checktime)
data = take!(ch_out)
else
data = readavailable(ws)
data = HTTP.receive(ws)
end
return data
end
end
4 changes: 2 additions & 2 deletions test/http_execution.jl
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ end

# handle_error
@test_throws ArgumentError test_error_handler(GraphQLClient.handle_error, ArgumentError("msg"))
@test_throws HTTP.StatusError test_error_handler(GraphQLClient.handle_error, HTTP.StatusError(404, HTTP.Response(404;request=HTTP.Request(), body="{}")))
@test_throws GraphQLClient.GraphQLError test_error_handler(GraphQLClient.handle_error, HTTP.StatusError(400, HTTP.Response(400;request=HTTP.Request(), body="{}")))
@test_throws HTTP.StatusError test_error_handler(GraphQLClient.handle_error, HTTP.StatusError(404, "POST", "", HTTP.Response(404;request=HTTP.Request(), body="{}")))
@test_throws GraphQLClient.GraphQLError test_error_handler(GraphQLClient.handle_error, HTTP.StatusError(400, "POST", "", HTTP.Response(400;request=HTTP.Request(), body="{}")))

# handle_deserialisation_error
@test_throws MethodError test_error_handler(GraphQLClient.handle_deserialisation_error, MethodError(""), "", "")
Expand Down
35 changes: 16 additions & 19 deletions test/subscriptions.jl
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
function listen_localhost()
@async HTTP.listen(HTTP.Sockets.localhost, 8080) do http
if HTTP.WebSockets.is_upgrade(http.message)
if HTTP.WebSockets.isupgrade(http.message)
HTTP.WebSockets.upgrade(http) do ws
while !eof(ws)
data = readavailable(ws)
write(ws, data)
for data in ws
HTTP.send(ws, data)
end
end
end
Expand All @@ -13,10 +12,10 @@ end

function do_nothing_localhost()
@async HTTP.listen(HTTP.Sockets.localhost, 8081) do http
if HTTP.WebSockets.is_upgrade(http.message)
if HTTP.WebSockets.isupgrade(http.message)
HTTP.WebSockets.upgrade(http) do ws
while !eof(ws)
data = readavailable(ws)
for data in ws
nothing;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the semi colon add anything here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not really, it just looked nicer at the time 😅

end
end
end
Expand All @@ -31,7 +30,7 @@ end
@test take!(ch) == :timeout

ch = GraphQLClient.async_reader_with_timeout(ws, 5)
write(ws, "Data")
HTTP.send(ws, "Data")
@test String(take!(ch)) == "Data"

# stopfn
Expand All @@ -44,11 +43,11 @@ end
@test take!(ch) == :stopfn
stop[] = false
ch = GraphQLClient.async_reader_with_stopfn(ws, stopfn, 0.5)
write(ws, "Data")
HTTP.send(ws, "Data")
@test String(take!(ch)) == "Data"

# readfromwebsocket - no timeout or stopfn
write(ws, "Data")
HTTP.send(ws, "Data")
@test String(GraphQLClient.readfromwebsocket(ws, nothing, 0)) == "Data"

# readfromwebsocket - timeout
Expand All @@ -70,10 +69,9 @@ end

function send_error_localhost(message, port)
@async HTTP.listen(HTTP.Sockets.localhost, port) do http
if HTTP.WebSockets.is_upgrade(http.message)
if HTTP.WebSockets.isupgrade(http.message)
HTTP.WebSockets.upgrade(http) do ws
while !eof(ws)
data = readavailable(ws)
for data in ws
isempty(data) && continue
query = JSON3.read(data)
error_payload = """
Expand All @@ -92,7 +90,7 @@ function send_error_localhost(message, port)
}
}
"""
write(ws, error_payload)
HTTP.send(ws, error_payload)
end
end
end
Expand All @@ -101,10 +99,9 @@ end

function send_data_localhost(sub_name, port)
@async HTTP.listen(HTTP.Sockets.localhost, port) do http
if HTTP.WebSockets.is_upgrade(http.message)
if HTTP.WebSockets.isupgrade(http.message)
HTTP.WebSockets.upgrade(http) do ws
while !eof(ws)
data = readavailable(ws)
for data in ws
isempty(data) && continue
query = JSON3.read(data)
data_payload = """
Expand All @@ -119,7 +116,7 @@ function send_data_localhost(sub_name, port)
}
}
"""
write(ws, data_payload)
HTTP.send(ws, data_payload)
end
end
end
Expand Down Expand Up @@ -208,4 +205,4 @@ end
@test results[1] isa GraphQLClient.GQLResponse{Response}
@test isnothing(results[1].errors)
@test !isnothing(results[1].data) # No point testing content as we've coded it in the test function
end
end