Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ janitor_children_timeout = Env.get_integer("JANITOR_CHILDREN_TIMEOUT", :timer.se
janitor_schedule_timer = Env.get_integer("JANITOR_SCHEDULE_TIMER_IN_MS", :timer.hours(4))
platform = if System.get_env("AWS_EXECUTION_ENV") == "AWS_ECS_FARGATE", do: :aws, else: :fly
broadcast_pool_size = Env.get_integer("BROADCAST_POOL_SIZE", 10)
pubsub_adapter = System.get_env("PUBSUB_ADAPTER", "pg2") |> String.to_atom()
websocket_max_heap_size = div(Env.get_integer("WEBSOCKET_MAX_HEAP_SIZE", 50_000_000), :erlang.system_info(:wordsize))

no_channel_timeout_in_ms =
Expand Down Expand Up @@ -124,6 +125,7 @@ config :realtime,
max_gen_rpc_clients: max_gen_rpc_clients,
no_channel_timeout_in_ms: no_channel_timeout_in_ms,
platform: platform,
pubsub_adapter: pubsub_adapter,
broadcast_pool_size: broadcast_pool_size

if config_env() != :test && run_janitor? do
Expand Down
10 changes: 9 additions & 1 deletion lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ defmodule Realtime.Application do
RealtimeWeb.Telemetry,
{Cluster.Supervisor, [topologies, [name: Realtime.ClusterSupervisor]]},
{Phoenix.PubSub,
name: Realtime.PubSub, pool_size: 10, adapter: Realtime.GenRpcPubSub, broadcast_pool_size: broadcast_pool_size},
name: Realtime.PubSub, pool_size: 10, adapter: pubsub_adapter(), broadcast_pool_size: broadcast_pool_size},
{Cachex, name: Realtime.RateCounter},
Realtime.Tenants.Cache,
Realtime.RateCounter.DynamicSupervisor,
Expand Down Expand Up @@ -154,4 +154,12 @@ defmodule Realtime.Application do
OpentelemetryPhoenix.setup(adapter: :cowboy2)
OpentelemetryEcto.setup([:realtime, :repo], db_statement: :enabled)
end

defp pubsub_adapter do
if Application.fetch_env!(:realtime, :pubsub_adapter) == :gen_rpc do
Realtime.GenRpcPubSub
else
Phoenix.PubSub.PG2
end
end
end
22 changes: 20 additions & 2 deletions lib/realtime_web/tenant_broadcaster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ defmodule RealtimeWeb.TenantBroadcaster do
def pubsub_broadcast(tenant_id, topic, message, dispatcher) do
collect_payload_size(tenant_id, message)

PubSub.broadcast(Realtime.PubSub, topic, message, dispatcher)
if pubsub_adapter() == :gen_rpc do
PubSub.broadcast(Realtime.PubSub, topic, message, dispatcher)
else
Realtime.GenRpc.multicast(PubSub, :local_broadcast, [Realtime.PubSub, topic, message, dispatcher], key: topic)
end

:ok
end
Expand All @@ -25,7 +29,17 @@ defmodule RealtimeWeb.TenantBroadcaster do
def pubsub_broadcast_from(tenant_id, from, topic, message, dispatcher) do
collect_payload_size(tenant_id, message)

PubSub.broadcast_from(Realtime.PubSub, from, topic, message, dispatcher)
if pubsub_adapter() == :gen_rpc do
PubSub.broadcast_from(Realtime.PubSub, from, topic, message, dispatcher)
else
Realtime.GenRpc.multicast(
PubSub,
:local_broadcast_from,
[Realtime.PubSub, from, topic, message, dispatcher],
key: topic
)
end

:ok
end

Expand All @@ -39,4 +53,8 @@ defmodule RealtimeWeb.TenantBroadcaster do
defp collect_payload_size(tenant_id, payload) do
:telemetry.execute(@payload_size_event, %{size: :erlang.external_size(payload)}, %{tenant: tenant_id})
end

defp pubsub_adapter do
Application.fetch_env!(:realtime, :pubsub_adapter)
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.51.2",
version: "2.51.3",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
140 changes: 76 additions & 64 deletions test/realtime_web/tenant_broadcaster_test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
defmodule RealtimeWeb.TenantBroadcasterTest do
# Usage of Clustered
# Usage of Clustered and changing Application env
use Realtime.DataCase, async: false

alias Phoenix.Socket.Broadcast
Expand Down Expand Up @@ -47,95 +47,107 @@ defmodule RealtimeWeb.TenantBroadcasterTest do
pid: self()
)

original = Application.fetch_env!(:realtime, :pubsub_adapter)
on_exit(fn -> Application.put_env(:realtime, :pubsub_adapter, original) end)
Application.put_env(:realtime, :pubsub_adapter, context.pubsub_adapter)

:ok
end

describe "pubsub_broadcast/4" do
test "pubsub_broadcast", %{node: node} do
message = %Broadcast{topic: @topic, event: "an event", payload: %{"a" => "b"}}
TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub)
for pubsub_adapter <- [:gen_rpc, :pg2] do
describe "pubsub_broadcast/4 #{pubsub_adapter}" do
@describetag pubsub_adapter: pubsub_adapter

assert_receive ^message
test "pubsub_broadcast", %{node: node} do
message = %Broadcast{topic: @topic, event: "an event", payload: %{"a" => "b"}}
TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub)

# Remote node received the broadcast
assert_receive {:relay, ^node, ^message}
assert_receive ^message

assert_receive {
:telemetry,
[:realtime, :tenants, :payload, :size],
%{size: 114},
%{tenant: "realtime-dev"}
}
end
# Remote node received the broadcast
assert_receive {:relay, ^node, ^message}

test "pubsub_broadcast list payload", %{node: node} do
message = %Broadcast{topic: @topic, event: "an event", payload: ["a", %{"b" => "c"}, 1, 23]}
TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub)
assert_receive {
:telemetry,
[:realtime, :tenants, :payload, :size],
%{size: 114},
%{tenant: "realtime-dev"}
}
end

assert_receive ^message
test "pubsub_broadcast list payload", %{node: node} do
message = %Broadcast{topic: @topic, event: "an event", payload: ["a", %{"b" => "c"}, 1, 23]}
TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub)

# Remote node received the broadcast
assert_receive {:relay, ^node, ^message}
assert_receive ^message

assert_receive {
:telemetry,
[:realtime, :tenants, :payload, :size],
%{size: 130},
%{tenant: "realtime-dev"}
}
end
# Remote node received the broadcast
assert_receive {:relay, ^node, ^message}

test "pubsub_broadcast string payload", %{node: node} do
message = %Broadcast{topic: @topic, event: "an event", payload: "some text payload"}
TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub)
assert_receive {
:telemetry,
[:realtime, :tenants, :payload, :size],
%{size: 130},
%{tenant: "realtime-dev"}
}
end

assert_receive ^message
test "pubsub_broadcast string payload", %{node: node} do
message = %Broadcast{topic: @topic, event: "an event", payload: "some text payload"}
TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub)

# Remote node received the broadcast
assert_receive {:relay, ^node, ^message}
assert_receive ^message

assert_receive {
:telemetry,
[:realtime, :tenants, :payload, :size],
%{size: 119},
%{tenant: "realtime-dev"}
}
# Remote node received the broadcast
assert_receive {:relay, ^node, ^message}

assert_receive {
:telemetry,
[:realtime, :tenants, :payload, :size],
%{size: 119},
%{tenant: "realtime-dev"}
}
end
end
end

describe "pubsub_broadcast_from/5" do
test "pubsub_broadcast_from", %{node: node} do
parent = self()
for pubsub_adapter <- [:gen_rpc, :pg2] do
describe "pubsub_broadcast_from/5 #{pubsub_adapter}" do
@describetag pubsub_adapter: pubsub_adapter

test "pubsub_broadcast_from", %{node: node} do
parent = self()

spawn_link(fn ->
Endpoint.subscribe(@topic)
send(parent, :ready)
spawn_link(fn ->
Endpoint.subscribe(@topic)
send(parent, :ready)

receive do
msg -> send(parent, {:other_process, msg})
end
end)
receive do
msg -> send(parent, {:other_process, msg})
end
end)

assert_receive :ready
assert_receive :ready

message = %Broadcast{topic: @topic, event: "an event", payload: %{"a" => "b"}}
message = %Broadcast{topic: @topic, event: "an event", payload: %{"a" => "b"}}

TenantBroadcaster.pubsub_broadcast_from("realtime-dev", self(), @topic, message, Phoenix.PubSub)
TenantBroadcaster.pubsub_broadcast_from("realtime-dev", self(), @topic, message, Phoenix.PubSub)

assert_receive {:other_process, ^message}
assert_receive {:other_process, ^message}

# Remote node received the broadcast
assert_receive {:relay, ^node, ^message}
# Remote node received the broadcast
assert_receive {:relay, ^node, ^message}

assert_receive {
:telemetry,
[:realtime, :tenants, :payload, :size],
%{size: 114},
%{tenant: "realtime-dev"}
}
assert_receive {
:telemetry,
[:realtime, :tenants, :payload, :size],
%{size: 114},
%{tenant: "realtime-dev"}
}

# This process does not receive the message
refute_receive _any
# This process does not receive the message
refute_receive _any
end
end
end

Expand Down
Loading