|  | 
|  | 1 | +defmodule Realtime.GenRpcPubSub do | 
|  | 2 | +  @moduledoc """ | 
|  | 3 | +  gen_rpc Phoenix.PubSub adapter | 
|  | 4 | +  """ | 
|  | 5 | + | 
|  | 6 | +  @behaviour Phoenix.PubSub.Adapter | 
|  | 7 | +  alias Realtime.GenRpc | 
|  | 8 | +  use Supervisor | 
|  | 9 | + | 
|  | 10 | +  @impl true | 
|  | 11 | +  def node_name(_), do: node() | 
|  | 12 | + | 
|  | 13 | +  # Supervisor callbacks | 
|  | 14 | + | 
|  | 15 | +  def start_link(opts) do | 
|  | 16 | +    adapter_name = Keyword.fetch!(opts, :adapter_name) | 
|  | 17 | +    name = Keyword.fetch!(opts, :name) | 
|  | 18 | +    pool_size = Keyword.get(opts, :pool_size, 1) | 
|  | 19 | +    broadcast_pool_size = Keyword.get(opts, :broadcast_pool_size, pool_size) | 
|  | 20 | + | 
|  | 21 | +    Supervisor.start_link(__MODULE__, {adapter_name, name, broadcast_pool_size}, | 
|  | 22 | +      name: :"#{name}#{adapter_name}_supervisor" | 
|  | 23 | +    ) | 
|  | 24 | +  end | 
|  | 25 | + | 
|  | 26 | +  @impl true | 
|  | 27 | +  def init({adapter_name, pubsub, pool_size}) do | 
|  | 28 | +    workers = for number <- 1..pool_size, do: :"#{pubsub}#{adapter_name}_#{number}" | 
|  | 29 | + | 
|  | 30 | +    :persistent_term.put(adapter_name, List.to_tuple(workers)) | 
|  | 31 | + | 
|  | 32 | +    children = | 
|  | 33 | +      for worker <- workers do | 
|  | 34 | +        Supervisor.child_spec({Realtime.GenRpcPubSub.Worker, {pubsub, worker}}, id: worker) | 
|  | 35 | +      end | 
|  | 36 | + | 
|  | 37 | +    Supervisor.init(children, strategy: :one_for_one) | 
|  | 38 | +  end | 
|  | 39 | + | 
|  | 40 | +  defp worker_name(adapter_name, key) do | 
|  | 41 | +    workers = :persistent_term.get(adapter_name) | 
|  | 42 | +    elem(workers, :erlang.phash2(key, tuple_size(workers))) | 
|  | 43 | +  end | 
|  | 44 | + | 
|  | 45 | +  @impl true | 
|  | 46 | +  def broadcast(adapter_name, topic, message, dispatcher) do | 
|  | 47 | +    worker = worker_name(adapter_name, self()) | 
|  | 48 | +    GenRpc.abcast(Node.list(), worker, forward_to_local(topic, message, dispatcher), key: worker) | 
|  | 49 | +  end | 
|  | 50 | + | 
|  | 51 | +  @impl true | 
|  | 52 | +  def direct_broadcast(adapter_name, node_name, topic, message, dispatcher) do | 
|  | 53 | +    worker = worker_name(adapter_name, self()) | 
|  | 54 | +    GenRpc.abcast([node_name], worker, forward_to_local(topic, message, dispatcher), key: worker) | 
|  | 55 | +  end | 
|  | 56 | + | 
|  | 57 | +  defp forward_to_local(topic, message, dispatcher), do: {:ftl, topic, message, dispatcher} | 
|  | 58 | +end | 
|  | 59 | + | 
|  | 60 | +defmodule Realtime.GenRpcPubSub.Worker do | 
|  | 61 | +  @moduledoc false | 
|  | 62 | +  use GenServer | 
|  | 63 | + | 
|  | 64 | +  @doc false | 
|  | 65 | +  def start_link({pubsub, worker}), do: GenServer.start_link(__MODULE__, pubsub, name: worker) | 
|  | 66 | + | 
|  | 67 | +  @impl true | 
|  | 68 | +  def init(pubsub), do: {:ok, pubsub} | 
|  | 69 | + | 
|  | 70 | +  @impl true | 
|  | 71 | +  def handle_info({:ftl, topic, message, dispatcher}, pubsub) do | 
|  | 72 | +    Phoenix.PubSub.local_broadcast(pubsub, topic, message, dispatcher) | 
|  | 73 | +    {:noreply, pubsub} | 
|  | 74 | +  end | 
|  | 75 | + | 
|  | 76 | +  @impl true | 
|  | 77 | +  def handle_info(_, pubsub), do: {:noreply, pubsub} | 
|  | 78 | +end | 
0 commit comments