From 8ca2231856cba88cd3ab1e018d5c55e1f1dab853 Mon Sep 17 00:00:00 2001 From: Christopher Bertels Date: Sun, 23 Oct 2016 02:39:07 +0200 Subject: [PATCH] Add support for hackney async once & stream_next/1 This will prevent the calling process to be overwhelmed with messages and lets it decide when to get the next async message from hackney. --- lib/httpoison/base.ex | 21 ++++++++++++++++++++- test/httpoison_test.exs | 20 ++++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/lib/httpoison/base.ex b/lib/httpoison/base.ex index f33ad87..05bd234 100644 --- a/lib/httpoison/base.ex +++ b/lib/httpoison/base.ex @@ -128,6 +128,7 @@ defmodule HTTPoison.Base do * `:timeout` - timeout to establish a connection, in milliseconds. Default is 8000 * `:recv_timeout` - timeout used when receiving a connection. Default is 5000 * `:stream_to` - a PID to stream the response to + * `:async` - if given `:once`, will only stream one message at a time, requires call to `stream_next` * `:proxy` - a proxy to be used for the request; it can be a regular url or a `{Host, Proxy}` tuple * `:proxy_auth` - proxy authentication `{User, Password}` tuple @@ -331,6 +332,19 @@ defmodule HTTPoison.Base do @spec options!(binary, headers, Keyword.t) :: Response.t | AsyncResponse.t def options!(url, headers \\ [], options \\ []), do: request!(:options, url, "", headers, options) + @doc """ + Requests the next message to be streamed for a given `HTTPoison.AsyncResponse`. + + See `request!/5` for more detailed information. + """ + @spec stream_next(AsyncResponse.t) :: {:ok, AsyncResponse.t} | {:error, Error.t} + def stream_next(resp = %AsyncResponse{ id: id }) do + case :hackney.stream_next(id) do + :ok -> {:ok, resp} + err -> {:error, %Error{reason: "stream_next/1 failed", id: id}} + end + end + defoverridable Module.definitions_in(__MODULE__) end end @@ -370,6 +384,7 @@ defmodule HTTPoison.Base do timeout = Keyword.get options, :timeout recv_timeout = Keyword.get options, :recv_timeout stream_to = Keyword.get options, :stream_to + async = Keyword.get options, :async proxy = Keyword.get options, :proxy proxy_auth = Keyword.get options, :proxy_auth ssl = Keyword.get options, :ssl @@ -388,7 +403,11 @@ defmodule HTTPoison.Base do hn_options = if stream_to do - [:async, {:stream_to, spawn(module, :transformer, [stream_to])} | hn_options] + async_option = case async do + nil -> :async + :once -> {:async, :once} + end + [async_option, {:stream_to, spawn(module, :transformer, [stream_to])} | hn_options] else hn_options end diff --git a/test/httpoison_test.exs b/test/httpoison_test.exs index d08c530..2572488 100644 --- a/test/httpoison_test.exs +++ b/test/httpoison_test.exs @@ -142,6 +142,26 @@ defmodule HTTPoisonTest do assert is_list(headers) end + test "asynchronous request with explicit streaming using [async: :once]" do + {:ok, resp = %HTTPoison.AsyncResponse{id: id}} = HTTPoison.get "localhost:8080/get", [], [stream_to: self(), async: :once] + + assert_receive %HTTPoison.AsyncStatus{ id: ^id, code: 200 }, 100 + + refute_receive %HTTPoison.AsyncHeaders{ id: ^id, headers: headers }, 100 + {:ok, ^resp} = HTTPoison.stream_next(resp) + assert_receive %HTTPoison.AsyncHeaders{ id: ^id, headers: headers }, 100 + + refute_receive %HTTPoison.AsyncChunk{ id: ^id, chunk: _chunk }, 100 + {:ok, ^resp} = HTTPoison.stream_next(resp) + assert_receive %HTTPoison.AsyncChunk{ id: ^id, chunk: _chunk }, 100 + + refute_receive %HTTPoison.AsyncEnd{ id: ^id }, 100 + {:ok, ^resp} = HTTPoison.stream_next(resp) + assert_receive %HTTPoison.AsyncEnd{ id: ^id }, 100 + + assert is_list(headers) + end + test "asynchronous redirected get request" do {:ok, %HTTPoison.AsyncResponse{id: id}} = HTTPoison.get "localhost:8080/redirect/2", [], [stream_to: self(), hackney: [follow_redirect: true]]