Skip to content

Commit 90b154f

Browse files
committed
Improve docs and error messages
1 parent c1dc880 commit 90b154f

File tree

2 files changed

+22
-38
lines changed

2 files changed

+22
-38
lines changed

lib/flow.ex

Lines changed: 21 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,16 @@ defmodule Flow do
290290
functionality provided by Flow can also be built by hand using the
291291
`emit_and_reduce/3` and `on_trigger/2` functions.
292292
293+
In a nutshell, each stage in Flow goes through those steps:
294+
295+
* mapping and filtering (`map/2`, `filter/2`, `flat_map/2`)
296+
* reducing (`reduce/3`, `group_by/3`, `emit_and_reduce/3`)
297+
* emitting events (`emit_and_reduce/3`, `emit/2`, `on_trigger/2`)
298+
299+
The accumulator from reducing operations is shared with the one
300+
from emitting events. `emit_and_reduce/3` is special operation
301+
that allows both emitting and reducing events in one step.
302+
293303
See `Flow.Window` for a complete introduction to windows and triggers.
294304
295305
## Supervisable flows
@@ -1546,9 +1556,7 @@ defmodule Flow do
15461556
15471557
`acc_fun` is a function that receives no arguments and returns
15481558
the actual accumulator. The `acc_fun` function is invoked per window
1549-
whenever a new window starts. If a trigger is emitted and it is
1550-
configured to reset the accumulator, the `acc_fun` function will
1551-
be invoked once again.
1559+
whenever a new window starts.
15521560
15531561
Reducing will accumulate data until a trigger is emitted
15541562
or until a window completes. When that happens, the returned
@@ -1593,9 +1601,7 @@ defmodule Flow do
15931601
15941602
`acc_fun` is a function that receives no arguments and returns
15951603
the actual accumulator. The `acc_fun` function is invoked per window
1596-
whenever a new window starts. If a trigger is emitted and it is
1597-
configured to reset the accumulator, the `acc_fun` function will
1598-
be invoked once again.
1604+
whenever a new window starts.
15991605
16001606
This function behaves similarly to `reduce/3`, but in addition to
16011607
accumulating data, it also gives full control over what will be
@@ -1837,18 +1843,12 @@ defmodule Flow do
18371843
def emit(flow, type) do
18381844
unless has_any_reduce?(flow) do
18391845
raise ArgumentError,
1840-
"emit/2 must be called after a group_by/reduce operation as it works on the accumulated state"
1841-
end
1842-
1843-
if has_emit_reduce?(flow) do
1844-
raise ArgumentError,
1845-
"emit/2 cannot be called after emit_and_reduce/3 since events have already been emitted " <>
1846-
"(use on_trigger/2 if you want to further emit events or modify the state)"
1846+
"emit/2 must be called after a group_by/reduce/emit_and_reduce operation " <>
1847+
"as it works on the accumulated state"
18471848
end
18481849

18491850
if has_on_trigger?(flow) do
1850-
raise ArgumentError,
1851-
"emit/2 cannot be called after on_trigger/2 since events have already been emitted"
1851+
raise ArgumentError, "emit/2 cannot be called after emit/on_trigger"
18521852
end
18531853

18541854
case type do
@@ -1868,7 +1868,8 @@ defmodule Flow do
18681868
data while leveraging the parallelism between stages.
18691869
18701870
The given callback must return a tuple with elements to emit
1871-
and the new accumulator.
1871+
and the new accumulator. The new accumulator will then be used
1872+
for subsequent reductions by `reduce/3`, `group_by/3`, and friends.
18721873
18731874
## The callback arguments
18741875
@@ -1963,30 +1964,17 @@ defmodule Flow do
19631964
end
19641965

19651966
if has_on_trigger?(flow) do
1966-
raise ArgumentError, "on_trigger/2 can only be called once per partition"
1967+
raise ArgumentError, "on_trigger/2 cannot be called after emit/on_trigger"
19671968
end
19681969

19691970
add_operation(flow, {:on_trigger, on_trigger})
19701971
end
19711972

19721973
defp add_mapper(flow, name, args) do
1973-
if has_emit_reduce?(flow) do
1974-
raise ArgumentError,
1975-
"#{name}/#{length(args) + 1} cannot be called after emit_and_reduce/3 since events " <>
1976-
"have already been emitted (use on_trigger/2 if you want to further emit events or modify the state)"
1977-
end
1978-
1979-
if has_on_trigger?(flow) do
1980-
raise ArgumentError,
1981-
"#{name}/#{length(args) + 1} cannot be called after emit/1 and on_trigger/2 " <>
1982-
"since events have already been emitted"
1983-
end
1984-
19851974
if has_any_reduce?(flow) do
1986-
IO.warn(
1987-
"Using a mapper operation, such as map/filter/reject, after reduce/3 is deprecated. " <>
1988-
"Use Flow.on_trigger/2 instead"
1989-
)
1975+
raise ArgumentError,
1976+
"#{name}/#{length(args) + 1} cannot be called after group_by/reduce/emit_and_reduce operation " <>
1977+
"(use on_trigger/2 if you want to further emit events or the accumulated state)"
19901978
end
19911979

19921980
add_operation(flow, {:mapper, name, args})
@@ -2008,10 +1996,6 @@ defmodule Flow do
20081996
Enum.any?(operations, &match?({op, _, _} when op in [:reduce, :emit_and_reduce], &1))
20091997
end
20101998

2011-
defp has_emit_reduce?(%{operations: operations}) do
2012-
Enum.any?(operations, &match?({:emit_and_reduce, _, _}, &1))
2013-
end
2014-
20151999
defp has_on_trigger?(%{operations: operations}) do
20162000
Enum.any?(operations, &match?({:on_trigger, _}, &1))
20172001
end

test/flow_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ defmodule FlowTest do
157157

158158
test "on mapper after emit/1" do
159159
message =
160-
~r"map/2 cannot be called after emit/1 and on_trigger/2 since events have already been emitted"
160+
~r"map/2 cannot be called after group_by/reduce/emit_and_reduce operation"
161161

162162
assert_raise ArgumentError, message, fn ->
163163
Flow.from_enumerable([1, 2, 3])

0 commit comments

Comments
 (0)