Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ If you're using the default tenant, the URL is `ws://realtime-dev.localhost:4000
| MAX_GEN_RPC_CLIENTS | number | Max amount of `gen_rpc` TCP connections per node-to-node channel |
| REBALANCE_CHECK_INTERVAL_IN_MS | number | Time in ms to check if process is in the right region |
| DISCONNECT_SOCKET_ON_NO_CHANNELS_INTERVAL_IN_MS | number | Time in ms to check if a socket has no channels open and if so, disconnect it |
| BROADCAST_POOL_SIZE | number | Number of processes to relay Phoenix.PubSub messages across the cluster |


The OpenTelemetry variables mentioned above are not an exhaustive list of all [supported environment variables](https://opentelemetry.io/docs/languages/sdk-configuration/).

Expand Down
4 changes: 3 additions & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ janitor_run_after_in_ms = Env.get_integer("JANITOR_RUN_AFTER_IN_MS", :timer.minu
janitor_children_timeout = Env.get_integer("JANITOR_CHILDREN_TIMEOUT", :timer.seconds(5))
janitor_schedule_timer = Env.get_integer("JANITOR_SCHEDULE_TIMER_IN_MS", :timer.hours(4))
platform = if System.get_env("AWS_EXECUTION_ENV") == "AWS_ECS_FARGATE", do: :aws, else: :fly
broadcast_pool_size = Env.get_integer("BROADCAST_POOL_SIZE", 10)

no_channel_timeout_in_ms =
if config_env() == :test,
Expand Down Expand Up @@ -120,7 +121,8 @@ config :realtime,
rpc_timeout: rpc_timeout,
max_gen_rpc_clients: max_gen_rpc_clients,
no_channel_timeout_in_ms: no_channel_timeout_in_ms,
platform: platform
platform: platform,
broadcast_pool_size: broadcast_pool_size

if config_env() != :test && run_janitor? do
config :realtime,
Expand Down
4 changes: 3 additions & 1 deletion lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ defmodule Realtime.Application do
region = Application.get_env(:realtime, :region)
:syn.join(RegionNodes, region, self(), node: node())

broadcast_pool_size = Application.get_env(:realtime, :broadcast_pool_size, 10)
migration_partition_slots = Application.get_env(:realtime, :migration_partition_slots)
connect_partition_slots = Application.get_env(:realtime, :connect_partition_slots)
no_channel_timeout_in_ms = Application.get_env(:realtime, :no_channel_timeout_in_ms)
Expand All @@ -65,7 +66,8 @@ defmodule Realtime.Application do
Realtime.Repo,
RealtimeWeb.Telemetry,
{Cluster.Supervisor, [topologies, [name: Realtime.ClusterSupervisor]]},
{Phoenix.PubSub, name: Realtime.PubSub, pool_size: 10},
{Phoenix.PubSub,
name: Realtime.PubSub, pool_size: 10, adapter: Realtime.GenRpcPubSub, broadcast_pool_size: broadcast_pool_size},
{Cachex, name: Realtime.RateCounter},
Realtime.Tenants.Cache,
Realtime.RateCounter.DynamicSupervisor,
Expand Down
16 changes: 16 additions & 0 deletions lib/realtime/gen_rpc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,22 @@ defmodule Realtime.GenRpc do

@type result :: any | {:error, :rpc_error, reason :: any}

@doc """
Broadcasts the message `msg` asynchronously to the registered process `name` on the specified `nodes`.
Options:
- `:key` - Optional key to consistently select the same gen_rpc clients to guarantee message order between nodes
"""
@spec abcast([node], atom, any, keyword()) :: :ok
def abcast(nodes, name, msg, opts) when is_list(nodes) and is_atom(name) and is_list(opts) do
key = Keyword.get(opts, :key, nil)
nodes = rpc_nodes(nodes, key)

:gen_rpc.abcast(nodes, name, msg)
:ok
end

@doc """
Fire and forget apply(mod, func, args) on all nodes
Expand Down
78 changes: 78 additions & 0 deletions lib/realtime/gen_rpc/pub_sub.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
defmodule Realtime.GenRpcPubSub do
@moduledoc """
gen_rpc Phoenix.PubSub adapter
"""

@behaviour Phoenix.PubSub.Adapter
alias Realtime.GenRpc
use Supervisor

@impl true
def node_name(_), do: node()

# Supervisor callbacks

def start_link(opts) do
adapter_name = Keyword.fetch!(opts, :adapter_name)
name = Keyword.fetch!(opts, :name)
pool_size = Keyword.get(opts, :pool_size, 1)
broadcast_pool_size = Keyword.get(opts, :broadcast_pool_size, pool_size)

Supervisor.start_link(__MODULE__, {adapter_name, name, broadcast_pool_size},
name: :"#{name}#{adapter_name}_supervisor"
)
end

@impl true
def init({adapter_name, pubsub, pool_size}) do
workers = for number <- 1..pool_size, do: :"#{pubsub}#{adapter_name}_#{number}"

:persistent_term.put(adapter_name, List.to_tuple(workers))

children =
for worker <- workers do
Supervisor.child_spec({Realtime.GenRpcPubSub.Worker, {pubsub, worker}}, id: worker)
end

Supervisor.init(children, strategy: :one_for_one)
end

defp worker_name(adapter_name, key) do
workers = :persistent_term.get(adapter_name)
elem(workers, :erlang.phash2(key, tuple_size(workers)))
end

@impl true
def broadcast(adapter_name, topic, message, dispatcher) do
worker = worker_name(adapter_name, self())
GenRpc.abcast(Node.list(), worker, forward_to_local(topic, message, dispatcher), key: worker)
end

@impl true
def direct_broadcast(adapter_name, node_name, topic, message, dispatcher) do
worker = worker_name(adapter_name, self())
GenRpc.abcast([node_name], worker, forward_to_local(topic, message, dispatcher), key: worker)
end

defp forward_to_local(topic, message, dispatcher), do: {:ftl, topic, message, dispatcher}
end

defmodule Realtime.GenRpcPubSub.Worker do
@moduledoc false
use GenServer

@doc false
def start_link({pubsub, worker}), do: GenServer.start_link(__MODULE__, pubsub, name: worker)

@impl true
def init(pubsub), do: {:ok, pubsub}

@impl true
def handle_info({:ftl, topic, message, dispatcher}, pubsub) do
Phoenix.PubSub.local_broadcast(pubsub, topic, message, dispatcher)
{:noreply, pubsub}
end

@impl true
def handle_info(_, pubsub), do: {:noreply, pubsub}
end
10 changes: 2 additions & 8 deletions lib/realtime_web/tenant_broadcaster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule RealtimeWeb.TenantBroadcaster do
def pubsub_broadcast(tenant_id, topic, message, dispatcher) do
collect_payload_size(tenant_id, message)

Realtime.GenRpc.multicast(PubSub, :local_broadcast, [Realtime.PubSub, topic, message, dispatcher], key: topic)
PubSub.broadcast(Realtime.PubSub, topic, message, dispatcher)

:ok
end
Expand All @@ -25,13 +25,7 @@ defmodule RealtimeWeb.TenantBroadcaster do
def pubsub_broadcast_from(tenant_id, from, topic, message, dispatcher) do
collect_payload_size(tenant_id, message)

Realtime.GenRpc.multicast(
PubSub,
:local_broadcast_from,
[Realtime.PubSub, from, topic, message, dispatcher],
key: topic
)

PubSub.broadcast_from(Realtime.PubSub, from, topic, message, dispatcher)
:ok
end

Expand Down
4 changes: 2 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.49.0",
version: "2.50.0",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down Expand Up @@ -90,7 +90,7 @@ defmodule Realtime.MixProject do
{:opentelemetry_phoenix, "~> 2.0"},
{:opentelemetry_cowboy, "~> 1.0"},
{:opentelemetry_ecto, "~> 1.2"},
{:gen_rpc, git: "https://github.com/supabase/gen_rpc.git", ref: "d161cf263c661a534eaabf80aac7a34484dac772"},
{:gen_rpc, git: "https://github.com/supabase/gen_rpc.git", ref: "5aea098b300a0a6ad13533e030230132cbe9ca2c"},
{:mimic, "~> 1.0", only: :test},
{:floki, ">= 0.30.0", only: :test},
{:mint_web_socket, "~> 1.0", only: :test},
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"file_system": {:hex, :file_system, "1.1.0", "08d232062284546c6c34426997dd7ef6ec9f8bbd090eb91780283c9016840e8f", [:mix], [], "hexpm", "bfcf81244f416871f2a2e15c1b515287faa5db9c6bcf290222206d120b3d43f6"},
"finch": {:hex, :finch, "0.19.0", "c644641491ea854fc5c1bbaef36bfc764e3f08e7185e1f084e35e0672241b76d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "fc5324ce209125d1e2fa0fcd2634601c52a787aff1cd33ee833664a5af4ea2b6"},
"floki": {:hex, :floki, "0.37.0", "b83e0280bbc6372f2a403b2848013650b16640cd2470aea6701f0632223d719e", [:mix], [], "hexpm", "516a0c15a69f78c47dc8e0b9b3724b29608aa6619379f91b1ffa47109b5d0dd3"},
"gen_rpc": {:git, "https://github.com/supabase/gen_rpc.git", "d161cf263c661a534eaabf80aac7a34484dac772", [ref: "d161cf263c661a534eaabf80aac7a34484dac772"]},
"gen_rpc": {:git, "https://github.com/supabase/gen_rpc.git", "5aea098b300a0a6ad13533e030230132cbe9ca2c", [ref: "5aea098b300a0a6ad13533e030230132cbe9ca2c"]},
"gettext": {:hex, :gettext, "0.26.2", "5978aa7b21fada6deabf1f6341ddba50bc69c999e812211903b169799208f2a8", [:mix], [{:expo, "~> 0.5.1 or ~> 1.0", [hex: :expo, repo: "hexpm", optional: false]}], "hexpm", "aa978504bcf76511efdc22d580ba08e2279caab1066b76bb9aa81c4a1e0a32a5"},
"gproc": {:hex, :gproc, "0.9.1", "f1df0364423539cf0b80e8201c8b1839e229e5f9b3ccb944c5834626998f5b8c", [:rebar3], [], "hexpm", "905088e32e72127ed9466f0bac0d8e65704ca5e73ee5a62cb073c3117916d507"},
"grpcbox": {:hex, :grpcbox, "0.17.1", "6e040ab3ef16fe699ffb513b0ef8e2e896da7b18931a1ef817143037c454bcce", [:rebar3], [{:acceptor_pool, "~> 1.0.0", [hex: :acceptor_pool, repo: "hexpm", optional: false]}, {:chatterbox, "~> 0.15.1", [hex: :ts_chatterbox, repo: "hexpm", optional: false]}, {:ctx, "~> 0.6.0", [hex: :ctx, repo: "hexpm", optional: false]}, {:gproc, "~> 0.9.1", [hex: :gproc, repo: "hexpm", optional: false]}], "hexpm", "4a3b5d7111daabc569dc9cbd9b202a3237d81c80bf97212fbc676832cb0ceb17"},
Expand Down
2 changes: 2 additions & 0 deletions test/realtime/gen_rpc_pub_sub_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Application.put_env(:phoenix_pubsub, :test_adapter, {Realtime.GenRpcPubSub, []})
Code.require_file("../../deps/phoenix_pubsub/test/shared/pubsub_test.exs", __DIR__)
33 changes: 33 additions & 0 deletions test/realtime/gen_rpc_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,39 @@ defmodule Realtime.GenRpcTest do
end
end

describe "abcast/4" do
test "abcast to registered process", %{node: node} do
name =
System.unique_integer()
|> to_string()
|> String.to_atom()

:erlang.register(name, self())

# Use erpc to make the other node abcast to this one
:erpc.call(node, GenRpc, :abcast, [[node()], name, "a message", []])

assert_receive "a message"
refute_receive _any
end

@tag extra_config: [{:gen_rpc, :tcp_server_port, 9999}]
test "tcp error" do
Logger.put_process_level(self(), :debug)

log =
capture_log(fn ->
assert GenRpc.abcast(Node.list(), :some_process_name, "a message", []) == :ok
# We have to wait for gen_rpc logs to show up
Process.sleep(100)
end)

assert log =~ "[error] event=connect_to_remote_server"

refute_receive _any
end
end

describe "multicast/4" do
test "evals everywhere" do
parent = self()
Expand Down
Loading