Skip to content

Commit

Permalink
Add streaming upload
Browse files Browse the repository at this point in the history
* Body for put/post/patch can be a {:stream, enumerable}
* Stream.transform puts bytes into the socket lazily,
  halting at the first error and emitting that error
  • Loading branch information
rozap committed Dec 1, 2016
1 parent 7291d3f commit 7144868
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 2 deletions.
28 changes: 26 additions & 2 deletions lib/httpoison/base.ex
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ defmodule HTTPoison.Base do
* binary, char list or an iolist
* `{:form, [{K, V}, ...]}` - send a form url encoded
* `{:file, "/path/to/file"}` - send a file
* `{:stream, enumerable} - lazily send a stream of binaries/charlists
Options:
* `:timeout` - timeout to establish a connection, in milliseconds. Default is 8000
Expand Down Expand Up @@ -415,12 +416,12 @@ defmodule HTTPoison.Base do
hn_options
end


@doc false
def request(module, method, request_url, request_body, request_headers, options, process_status_code, process_headers, process_response_body) do
hn_options = build_hackney_options(module, options)

case :hackney.request(method, request_url, request_headers,
request_body, hn_options) do
case do_request(method, request_url, request_headers, request_body, hn_options) do
{:ok, status_code, headers} -> response(process_status_code, process_headers, process_response_body, status_code, headers, "")
{:ok, status_code, headers, client} ->
case :hackney.body(client) do
Expand All @@ -432,6 +433,29 @@ defmodule HTTPoison.Base do
end
end

defp do_request(method, request_url, request_headers, {:stream, enumerable}, hn_options) do
with {:ok, ref} <- :hackney.request(method, request_url, request_headers, :stream, hn_options) do

failures = Stream.transform(enumerable, :ok, fn
_, :error -> {:halt, :error}
bin, :ok -> {[], :hackney.send_body(ref, bin)}
_, error -> {[error], :error}
end) |> Enum.into([])

case failures do
[] ->
:hackney.start_response(ref)
[failure] ->
failure
end
end
end

defp do_request(method, request_url, request_headers, request_body, hn_options) do
:hackney.request(method, request_url, request_headers,
request_body, hn_options)
end

defp response(process_status_code, process_headers, process_response_body, status_code, headers, body) do
{:ok, %Response {
status_code: process_status_code.(status_code),
Expand Down
11 changes: 11 additions & 0 deletions test/httpoison_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,17 @@ defmodule HTTPoisonTest do
assert_response(response)
end

test "post streaming body" do
expected = %{"some" => "bytes"}
enumerable = JSX.encode!(expected) |> String.split("")
headers = %{"Content-type" => "application/json"}
response = HTTPoison.post("localhost:8080/post", {:stream, enumerable}, headers)
assert_response response
{:ok, %HTTPoison.Response{body: body}} = response

assert JSX.decode!(body)["json"] == expected
end

defp assert_response({:ok, response}, function \\ nil) do
assert is_list(response.headers)
assert response.status_code == 200
Expand Down

0 comments on commit 7144868

Please sign in to comment.