Skip to content

Commit e746800

Browse files
authored
Fix bug in expiry/3 (#46)
* Fix bug in expiry/3 This fix changes expiry/3 in such a way that the expiry time is evaluated at runtime of the stream, rather than definition time. This grants greater flexibility and enables setting up a configured stream in advance (e.g. at compile time or in a gen_server init) and passing the stream around for use in various places, at various times. See the added tests for an example. * Refactor expiry/3 to make credo happy Co-authored-by: Joshua Trees <me@jtrees.io>
1 parent 1e9572c commit e746800

File tree

2 files changed

+43
-16
lines changed

2 files changed

+43
-16
lines changed

lib/retry/delay_streams.ex

+25-16
Original file line numberDiff line numberDiff line change
@@ -159,25 +159,34 @@ defmodule Retry.DelayStreams do
159159
"""
160160
@spec expiry(Enumerable.t(), pos_integer(), pos_integer()) :: Enumerable.t()
161161
def expiry(delays, time_budget, min_delay \\ 100) do
162-
end_t = :os.system_time(:milli_seconds) + time_budget
163-
164-
Stream.transform(delays, :normal, fn preferred_delay, status ->
165-
now_t = :os.system_time(:milli_seconds)
166-
remaining_t = Enum.max([end_t - now_t, min_delay])
162+
Stream.resource(
163+
fn -> {delays, :os.system_time(:milli_seconds) + time_budget} end,
164+
fn
165+
:at_end -> {:halt, :at_end}
166+
{remaining_delays, end_t} -> reduce_delays(remaining_delays, end_t, min_delay)
167+
end,
168+
fn _ -> :noop end
169+
)
170+
end
167171

168-
cond do
169-
# time expired!
170-
:at_end == status ->
171-
{:halt, status}
172+
defp reduce_delays(remaining_delays, end_t, min_delay) do
173+
case Enum.take(remaining_delays, 1) do
174+
[preferred_delay] ->
175+
now_t = :os.system_time(:milli_seconds)
176+
remaining_t = Enum.max([end_t - now_t, min_delay])
172177

173-
# one last try
174-
preferred_delay >= remaining_t or remaining_t == min_delay ->
178+
if preferred_delay >= remaining_t or remaining_t == min_delay do
179+
# one last try before time budget is exceeded
175180
{[remaining_t], :at_end}
176-
177-
true ->
178-
{[preferred_delay], status}
179-
end
180-
end)
181+
else
182+
# default
183+
{[preferred_delay], {Stream.drop(remaining_delays, 1), end_t}}
184+
end
185+
186+
_ ->
187+
# reached end of stream - no more tries
188+
{:halt, :at_end}
189+
end
181190
end
182191

183192
defp random_uniform(n) when n <= 0, do: 0

test/retry/delay_streams_test.exs

+18
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,24 @@ defmodule Retry.DelayStreamsTest do
9999
end)
100100
|> Enum.min() >= 10
101101
end
102+
103+
test "stops before the expiry is reached, if there are no elements left in the delay stream" do
104+
assert exponential_backoff() |> Stream.take(5) |> expiry(1_000) |> Enum.count() == 5
105+
end
106+
107+
test "evaluates expiry at runtime of the stream, rather than execution time of expiry/3" do
108+
delay_stream =
109+
[50]
110+
|> Stream.cycle()
111+
|> expiry(75, 50)
112+
113+
assert Enum.count(delay_stream, fn delay -> :timer.sleep(delay) end) == 2
114+
115+
# If the expiry time is determined when `expiry/3` is executed, we will
116+
# already be past it (due to all the sleeps performed previously) and jump
117+
# out after the first run.
118+
assert Enum.count(delay_stream, fn delay -> :timer.sleep(delay) end) == 2
119+
end
102120
end
103121

104122
describe "randomize/2" do

0 commit comments

Comments
 (0)