Skip to content

Commit d21517c

Browse files
committed
fix: use dynamic adapter_name
This also allows the usage of the pubsub shared adapter test
1 parent f1c898e commit d21517c

File tree

2 files changed

+14
-201
lines changed

2 files changed

+14
-201
lines changed

lib/realtime/gen_rpc/pub_sub.ex

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,27 @@ defmodule Realtime.GenRpcPubSub do
77
alias Realtime.GenRpc
88
use Supervisor
99

10-
@adapter_name :gen_rpc
11-
1210
@impl true
1311
def node_name(_), do: node()
1412

1513
# Supervisor callbacks
1614

1715
def start_link(opts) do
16+
adapter_name = Keyword.fetch!(opts, :adapter_name)
1817
name = Keyword.fetch!(opts, :name)
1918
pool_size = Keyword.get(opts, :pool_size, 1)
2019
broadcast_pool_size = Keyword.get(opts, :broadcast_pool_size, pool_size)
2120

22-
Supervisor.start_link(__MODULE__, {name, broadcast_pool_size}, name: :"#{name}#{@adapter_name}_supervisor")
21+
Supervisor.start_link(__MODULE__, {adapter_name, name, broadcast_pool_size},
22+
name: :"#{name}#{adapter_name}_supervisor"
23+
)
2324
end
2425

2526
@impl true
26-
def init({pubsub, pool_size}) do
27-
workers = for number <- 1..pool_size, do: :"#{pubsub}#{@adapter_name}_#{number}"
27+
def init({adapter_name, pubsub, pool_size}) do
28+
workers = for number <- 1..pool_size, do: :"#{pubsub}#{adapter_name}_#{number}"
2829

29-
:persistent_term.put(@adapter_name, List.to_tuple(workers))
30+
:persistent_term.put(adapter_name, List.to_tuple(workers))
3031

3132
children =
3233
for worker <- workers do
@@ -36,20 +37,20 @@ defmodule Realtime.GenRpcPubSub do
3637
Supervisor.init(children, strategy: :one_for_one)
3738
end
3839

39-
defp worker(key) do
40-
workers = :persistent_term.get(@adapter_name)
40+
defp worker_name(adapter_name, key) do
41+
workers = :persistent_term.get(adapter_name)
4142
elem(workers, :erlang.phash2(key, tuple_size(workers)))
4243
end
4344

4445
@impl true
45-
def broadcast(_adapter_name, topic, message, dispatcher) do
46-
worker = worker(self())
46+
def broadcast(adapter_name, topic, message, dispatcher) do
47+
worker = worker_name(adapter_name, self())
4748
GenRpc.abcast(Node.list(), worker, forward_to_local(topic, message, dispatcher), key: worker)
4849
end
4950

5051
@impl true
51-
def direct_broadcast(_adapter_name, node_name, topic, message, dispatcher) do
52-
worker = worker(self())
52+
def direct_broadcast(adapter_name, node_name, topic, message, dispatcher) do
53+
worker = worker_name(adapter_name, self())
5354
GenRpc.abcast([node_name], worker, forward_to_local(topic, message, dispatcher), key: worker)
5455
end
5556

Lines changed: 1 addition & 189 deletions
Original file line numberDiff line numberDiff line change
@@ -1,190 +1,2 @@
11
Application.put_env(:phoenix_pubsub, :test_adapter, {Realtime.GenRpcPubSub, []})
2-
3-
# Original: https://github.com/phoenixframework/phoenix_pubsub/blob/v2.1.3/test/shared/pubsub_test.exs
4-
# We are copying this test from phoenix_pubsub because we don't want to run this
5-
# test case "async: true" as it conflicts with the running pub sub adapter
6-
# We also need to reset the persitent_term.
7-
defmodule Phoenix.PubSubTest do
8-
@moduledoc """
9-
Sets up PubSub Adapter testcases.
10-
11-
## Usage
12-
13-
To test a PubSub adapter, set the `:test_adapter` on the `:phoenix_pubsub`
14-
configuration and require this file, ie:
15-
16-
# your_pubsub_adapter_test.exs
17-
Application.put_env(:phoenix_pubsub, :test_adapter, {Phoenix.PubSub.PG2, []})
18-
Code.require_file "../deps/phoenix_pubsub/test/shared/pubsub_test.exs", __DIR__
19-
20-
"""
21-
22-
use ExUnit.Case, async: false
23-
alias Phoenix.PubSub
24-
25-
# Reset the persistent term that GenRpc adapter uses
26-
setup_all do
27-
previous = :persistent_term.get(:gen_rpc)
28-
on_exit(fn -> :persistent_term.put(:gen_rpc, previous) end)
29-
:ok
30-
end
31-
32-
defp subscribers(config, topic) do
33-
Registry.lookup(config.pubsub, topic)
34-
end
35-
36-
defp rpc(pid, func) do
37-
Agent.get(pid, fn :ok -> func.() end)
38-
end
39-
40-
defp spawn_pid do
41-
{:ok, pid} = Agent.start_link(fn -> :ok end)
42-
pid
43-
end
44-
45-
defmodule CustomDispatcher do
46-
def dispatch(entries, from, message) do
47-
for {pid, metadata} <- entries do
48-
send(pid, {:custom, metadata, from, message})
49-
end
50-
51-
:ok
52-
end
53-
end
54-
55-
setup config do
56-
size = config[:pool_size] || 1
57-
{adapter, adapter_opts} = Application.get_env(:phoenix_pubsub, :test_adapter)
58-
adapter_opts = [adapter: adapter, name: config.test, pool_size: size] ++ adapter_opts
59-
start_supervised!({Phoenix.PubSub, adapter_opts})
60-
61-
opts = %{
62-
pubsub: config.test,
63-
topic: to_string(config.test),
64-
pool_size: size,
65-
node: Phoenix.PubSub.node_name(config.test)
66-
}
67-
68-
{:ok, opts}
69-
end
70-
71-
test "node_name/1 returns the node name", config do
72-
assert is_atom(config.node) or is_binary(config.node)
73-
end
74-
75-
for size <- [1, 8] do
76-
@tag pool_size: size
77-
test "pool #{size}: subscribe and unsubscribe", config do
78-
pid = spawn_pid()
79-
assert subscribers(config, config.topic) |> length == 0
80-
assert rpc(pid, fn -> PubSub.subscribe(config.pubsub, config.topic) end)
81-
assert subscribers(config, config.topic) == [{pid, nil}]
82-
assert rpc(pid, fn -> PubSub.unsubscribe(config.pubsub, config.topic) end)
83-
assert subscribers(config, config.topic) |> length == 0
84-
end
85-
86-
@tag pool_size: size
87-
test "pool #{size}: broadcast/3 and broadcast!/3 publishes message to each subscriber",
88-
config do
89-
PubSub.subscribe(config.pubsub, config.topic)
90-
:ok = PubSub.broadcast(config.pubsub, config.topic, :ping)
91-
assert_receive :ping
92-
:ok = PubSub.broadcast!(config.pubsub, config.topic, :ping)
93-
assert_receive :ping
94-
end
95-
96-
@tag pool_size: size
97-
test "pool #{size}: broadcast/3 does not publish message to other topic subscribers",
98-
config do
99-
PubSub.subscribe(config.pubsub, "unknown")
100-
101-
Enum.each(0..10, fn _ ->
102-
rpc(spawn_pid(), fn -> PubSub.subscribe(config.pubsub, config.topic) end)
103-
end)
104-
105-
:ok = PubSub.broadcast(config.pubsub, config.topic, :ping)
106-
refute_received :ping
107-
end
108-
109-
@tag pool_size: size
110-
test "pool #{size}: broadcast_from/4 and broadcast_from!/4 skips sender", config do
111-
PubSub.subscribe(config.pubsub, config.topic)
112-
113-
PubSub.broadcast_from(config.pubsub, self(), config.topic, :ping)
114-
refute_received :ping
115-
116-
PubSub.broadcast_from!(config.pubsub, self(), config.topic, :ping)
117-
refute_received :ping
118-
end
119-
120-
@tag pool_size: size
121-
test "pool #{size}: unsubscribe on not subscribed topic noops", config do
122-
assert :ok = PubSub.unsubscribe(config.pubsub, config.topic)
123-
assert subscribers(config, config.topic) == []
124-
end
125-
126-
@tag pool_size: size
127-
test "pool #{size}: direct_broadcast sends to given node", config do
128-
PubSub.subscribe(config.pubsub, config.topic)
129-
130-
PubSub.direct_broadcast(config.node, config.pubsub, config.topic, :ping)
131-
assert_receive :ping
132-
133-
PubSub.direct_broadcast!(config.node, config.pubsub, config.topic, :ping)
134-
assert_receive :ping
135-
end
136-
137-
@tag pool_size: size
138-
test "pool #{size}: direct_broadcast sends to unknown node", config do
139-
PubSub.subscribe(config.pubsub, config.topic)
140-
141-
PubSub.direct_broadcast(:"IDONTKNOW@127.0.0.1", config.pubsub, config.topic, :ping)
142-
refute_received :ping
143-
144-
PubSub.direct_broadcast!(:"IDONTKNOW@127.0.0.1", config.pubsub, config.topic, :ping)
145-
refute_received :ping
146-
end
147-
148-
@tag pool_size: size
149-
test "pool #{size}: local_broadcast sends to the current node", config do
150-
PubSub.subscribe(config.pubsub, config.topic)
151-
152-
PubSub.local_broadcast(config.pubsub, config.topic, :ping)
153-
assert_receive :ping
154-
end
155-
156-
@tag pool_size: size
157-
test "pool #{size}: local_broadcast_from/5 skips sender", config do
158-
PubSub.subscribe(config.pubsub, config.topic)
159-
160-
PubSub.local_broadcast_from(config.pubsub, self(), config.topic, :ping)
161-
refute_received :ping
162-
end
163-
164-
@tag pool_size: size
165-
test "pool #{size}: with custom dispatching", %{topic: topic, test: test, node: node} do
166-
PubSub.subscribe(test, topic)
167-
PubSub.subscribe(test, topic, metadata: :special)
168-
169-
PubSub.broadcast(test, topic, :broadcast, CustomDispatcher)
170-
assert_receive {:custom, nil, :none, :broadcast}
171-
assert_receive {:custom, :special, :none, :broadcast}
172-
173-
PubSub.broadcast_from(test, self(), topic, :broadcast_from, CustomDispatcher)
174-
assert_receive {:custom, nil, pid, :broadcast_from} when pid == self()
175-
assert_receive {:custom, :special, pid, :broadcast_from} when pid == self()
176-
177-
PubSub.local_broadcast(test, topic, :local, CustomDispatcher)
178-
assert_receive {:custom, nil, :none, :local}
179-
assert_receive {:custom, :special, :none, :local}
180-
181-
PubSub.local_broadcast_from(test, self(), topic, :local_from, CustomDispatcher)
182-
assert_receive {:custom, nil, pid, :local_from} when pid == self()
183-
assert_receive {:custom, :special, pid, :local_from} when pid == self()
184-
185-
PubSub.direct_broadcast(node, test, topic, :direct, CustomDispatcher)
186-
assert_receive {:custom, nil, :none, :direct}
187-
assert_receive {:custom, :special, :none, :direct}
188-
end
189-
end
190-
end
2+
Code.require_file("../../deps/phoenix_pubsub/test/shared/pubsub_test.exs", __DIR__)

0 commit comments

Comments
 (0)