Skip to content

Commit 6523ca1

Browse files
committed
Ensure flow options effectively override producer dispatcher
1 parent 9686433 commit 6523ca1

File tree

3 files changed

+19
-5
lines changed

3 files changed

+19
-5
lines changed

lib/flow.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1291,7 +1291,7 @@ defmodule Flow do
12911291
"""
12921292
@spec partition(t | [t], keyword()) :: t
12931293
def partition(flow_or_flows, options \\ []) when is_list(options) do
1294-
merge(List.wrap(flow_or_flows), GenStage.PartitionDispatcher, options)
1294+
merge(flow_or_flows, GenStage.PartitionDispatcher, options)
12951295
end
12961296

12971297
@doc """

lib/flow/materialize.ex

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ defmodule Flow.Materialize do
1313

1414
def materialize(%Flow{} = flow, demand, start_link, type, type_options) do
1515
%{operations: operations, options: options, producers: producers, window: window} = flow
16-
options = Keyword.merge(type_options, options)
16+
options = Keyword.merge(options, type_options)
1717
{ops, batchers} = split_operations(operations)
1818

1919
{producers, consumers, ops, window} =
@@ -148,9 +148,9 @@ defmodule Flow.Materialize do
148148
ops,
149149
start_link,
150150
window,
151-
options
151+
_options
152152
) do
153-
{producers, consumers} = materialize(flow, :forward, start_link, :producer_consumer, options)
153+
{producers, consumers} = materialize(flow, :forward, start_link, :producer_consumer, [])
154154
{type, {acc, fun, trigger}, ops} = ensure_ops(ops)
155155

156156
stages = Keyword.fetch!(flow.options, :stages)
@@ -195,7 +195,7 @@ defmodule Flow.Materialize do
195195
options
196196
) do
197197
{producers, intermediary} =
198-
materialize(flow, :forward, start_link, :producer_consumer, options)
198+
materialize(flow, :forward, start_link, :producer_consumer, [])
199199

200200
timeout = Keyword.get(options, :subscribe_timeout, 5_000)
201201

test/flow_test.exs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1099,6 +1099,20 @@ defmodule FlowTest do
10991099
|> Enum.sort() == [[1, 2, 4, 5], [3, 6]]
11001100
end
11011101

1102+
test "allows function based partitioning after shuffling" do
1103+
enumerables = [
1104+
[%{key: 1, value: 1}, %{key: 2, value: 2}, %{key: 3, value: 3}],
1105+
[%{key: 1, value: 4}, %{key: 2, value: 5}, %{key: 3, value: 6}]
1106+
]
1107+
1108+
assert Flow.from_enumerables(enumerables)
1109+
|> Flow.shuffle(stages: 2)
1110+
|> Flow.partition(key: & &1.key, stages: 2)
1111+
|> Flow.reduce(fn -> [] end, &[&1 | &2])
1112+
|> Flow.on_trigger(fn acc -> {[acc |> Enum.map(& &1.value) |> Enum.sort()], acc} end)
1113+
|> Enum.sort() == [[1, 2, 4, 5], [3, 6]]
1114+
end
1115+
11021116
test "allows custom windowing" do
11031117
window =
11041118
Flow.Window.fixed(1, :second, fn

0 commit comments

Comments
 (0)