Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/extensions/postgres_cdc_rls/replication_poller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do
topic = "realtime:postgres:" <> tenant_id

RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher)
RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher, :postgres_changes)
end

{:ok, rows_count}
Expand Down
7 changes: 4 additions & 3 deletions lib/realtime/monitoring/prom_ex/plugins/tenant.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,21 @@ defmodule Realtime.PromEx.Plugins.Tenant do
event_name: [:realtime, :tenants, :payload, :size],
measurement: :size,
description: "Tenant payload size",
tags: [:tenant],
tags: [:tenant, :message_type],
unit: :byte,
reporter_options: [
buckets: [100, 250, 500, 1000, 2000, 3000, 5000, 10_000, 25_000]
buckets: [250, 500, 1000, 3000, 5000, 10_000, 25_000, 100_000, 500_000, 1_000_000, 3_000_000]
]
),
distribution(
[:realtime, :payload, :size],
event_name: [:realtime, :tenants, :payload, :size],
measurement: :size,
description: "Payload size",
tags: [:message_type],
unit: :byte,
reporter_options: [
buckets: [100, 250, 500, 1000, 2000, 3000, 5000, 10_000, 25_000]
buckets: [250, 500, 1000, 3000, 5000, 10_000, 25_000, 100_000, 500_000, 1_000_000, 3_000_000]
]
)
]
Expand Down
9 changes: 8 additions & 1 deletion lib/realtime/tenants/batch_broadcast.ex
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,14 @@ defmodule Realtime.Tenants.BatchBroadcast do
broadcast = %Phoenix.Socket.Broadcast{topic: message.topic, event: @event_type, payload: payload}

GenCounter.add(events_per_second_rate.id)
TenantBroadcaster.pubsub_broadcast(tenant.external_id, tenant_topic, broadcast, RealtimeChannel.MessageDispatcher)

TenantBroadcaster.pubsub_broadcast(
tenant.external_id,
tenant_topic,
broadcast,
RealtimeChannel.MessageDispatcher,
:broadcast
)
end

defp permissions_for_message(_, nil, _), do: nil
Expand Down
11 changes: 9 additions & 2 deletions lib/realtime_web/channels/realtime_channel/broadcast_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,21 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
broadcast = %Phoenix.Socket.Broadcast{topic: tenant_topic, event: @event_type, payload: payload}

if self_broadcast do
TenantBroadcaster.pubsub_broadcast(tenant_id, tenant_topic, broadcast, RealtimeChannel.MessageDispatcher)
TenantBroadcaster.pubsub_broadcast(
tenant_id,
tenant_topic,
broadcast,
RealtimeChannel.MessageDispatcher,
:broadcast
)
else
TenantBroadcaster.pubsub_broadcast_from(
tenant_id,
self(),
tenant_topic,
broadcast,
RealtimeChannel.MessageDispatcher
RealtimeChannel.MessageDispatcher,
:broadcast
)
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do

%{assigns: %{presence_key: presence_key, tenant_topic: tenant_topic}} = socket
payload = Map.get(payload, "payload", %{})
RealtimeWeb.TenantBroadcaster.collect_payload_size(socket.assigns.tenant, payload, :presence)

with :ok <- limit_presence_event(socket),
{:ok, _} <- Presence.track(self(), tenant_topic, presence_key, payload) do
Expand Down
32 changes: 19 additions & 13 deletions lib/realtime_web/tenant_broadcaster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ defmodule RealtimeWeb.TenantBroadcaster do

alias Phoenix.PubSub

@spec pubsub_broadcast(tenant_id :: String.t(), PubSub.topic(), PubSub.message(), PubSub.dispatcher()) :: :ok
def pubsub_broadcast(tenant_id, topic, message, dispatcher) do
collect_payload_size(tenant_id, message)
@type message_type :: :broadcast | :presence | :postgres_changes

@spec pubsub_broadcast(tenant_id :: String.t(), PubSub.topic(), PubSub.message(), PubSub.dispatcher(), message_type) ::
:ok
def pubsub_broadcast(tenant_id, topic, message, dispatcher, message_type) do
collect_payload_size(tenant_id, message, message_type)

if pubsub_adapter() == :gen_rpc do
PubSub.broadcast(Realtime.PubSub, topic, message, dispatcher)
Expand All @@ -23,11 +26,12 @@ defmodule RealtimeWeb.TenantBroadcaster do
from :: pid,
PubSub.topic(),
PubSub.message(),
PubSub.dispatcher()
PubSub.dispatcher(),
message_type
) ::
:ok
def pubsub_broadcast_from(tenant_id, from, topic, message, dispatcher) do
collect_payload_size(tenant_id, message)
def pubsub_broadcast_from(tenant_id, from, topic, message, dispatcher, message_type) do
collect_payload_size(tenant_id, message, message_type)

if pubsub_adapter() == :gen_rpc do
PubSub.broadcast_from(Realtime.PubSub, from, topic, message, dispatcher)
Expand All @@ -45,16 +49,18 @@ defmodule RealtimeWeb.TenantBroadcaster do

@payload_size_event [:realtime, :tenants, :payload, :size]

defp collect_payload_size(tenant_id, payload) when is_struct(payload) do
@spec collect_payload_size(tenant_id :: String.t(), payload :: term, message_type :: message_type) :: :ok
def collect_payload_size(tenant_id, payload, message_type) when is_struct(payload) do
# Extracting from struct so the __struct__ bit is not calculated as part of the payload
collect_payload_size(tenant_id, Map.from_struct(payload))
collect_payload_size(tenant_id, Map.from_struct(payload), message_type)
end

defp collect_payload_size(tenant_id, payload) do
:telemetry.execute(@payload_size_event, %{size: :erlang.external_size(payload)}, %{tenant: tenant_id})
def collect_payload_size(tenant_id, payload, message_type) do
:telemetry.execute(@payload_size_event, %{size: :erlang.external_size(payload)}, %{
tenant: tenant_id,
message_type: message_type
})
end

defp pubsub_adapter do
Application.fetch_env!(:realtime, :pubsub_adapter)
end
defp pubsub_adapter, do: Application.fetch_env!(:realtime, :pubsub_adapter)
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.51.15",
version: "2.52.0",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
18 changes: 18 additions & 0 deletions test/realtime/extensions/cdc_rls/cdc_rls_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,15 @@ defmodule Realtime.Extensions.CdcRlsTest do

RateCounter.stop(tenant.external_id)

on_exit(fn -> :telemetry.detach(__MODULE__) end)

:telemetry.attach(
__MODULE__,
[:realtime, :tenants, :payload, :size],
&__MODULE__.handle_telemetry/4,
pid: self()
)

%{tenant: tenant, conn: conn}
end

Expand Down Expand Up @@ -317,6 +326,13 @@ defmodule Realtime.Extensions.CdcRlsTest do

assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket}} = RateCounter.get(rate)
assert 1 in bucket

assert_receive {
:telemetry,
[:realtime, :tenants, :payload, :size],
%{size: 341},
%{tenant: "dev_tenant", message_type: :postgres_changes}
}
end

@aux_mod (quote do
Expand Down Expand Up @@ -414,4 +430,6 @@ defmodule Realtime.Extensions.CdcRlsTest do
:erpc.call(node, PostgresCdcRls, :handle_stop, [tenant.external_id, 10_000])
end
end

def handle_telemetry(event, measures, metadata, pid: pid), do: send(pid, {:telemetry, event, measures, metadata})
end
12 changes: 6 additions & 6 deletions test/realtime/monitoring/prom_ex/plugins/tenant_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -262,36 +262,36 @@ defmodule Realtime.PromEx.Plugins.TenantTest do
external_id = context.tenant.external_id

pattern =
~r/realtime_tenants_payload_size_count{tenant="#{external_id}"}\s(?<number>\d+)/
~r/realtime_tenants_payload_size_count{message_type=\"presence\",tenant="#{external_id}"}\s(?<number>\d+)/

metric_value = metric_value(pattern)

message = %{topic: "a topic", event: "an event", payload: ["a", %{"b" => "c"}, 1, 23]}
RealtimeWeb.TenantBroadcaster.pubsub_broadcast(external_id, "a topic", message, Phoenix.PubSub)
RealtimeWeb.TenantBroadcaster.pubsub_broadcast(external_id, "a topic", message, Phoenix.PubSub, :presence)

Process.sleep(200)
assert metric_value(pattern) == metric_value + 1

bucket_pattern =
~r/realtime_tenants_payload_size_bucket{tenant="#{external_id}",le="100"}\s(?<number>\d+)/
~r/realtime_tenants_payload_size_bucket{message_type=\"presence\",tenant="#{external_id}",le="250"}\s(?<number>\d+)/

assert metric_value(bucket_pattern) > 0
end

test "global metric payload size", context do
external_id = context.tenant.external_id

pattern = ~r/realtime_payload_size_count\s(?<number>\d+)/
pattern = ~r/realtime_payload_size_count{message_type=\"broadcast\"}\s(?<number>\d+)/

metric_value = metric_value(pattern)

message = %{topic: "a topic", event: "an event", payload: ["a", %{"b" => "c"}, 1, 23]}
RealtimeWeb.TenantBroadcaster.pubsub_broadcast(external_id, "a topic", message, Phoenix.PubSub)
RealtimeWeb.TenantBroadcaster.pubsub_broadcast(external_id, "a topic", message, Phoenix.PubSub, :broadcast)

Process.sleep(200)
assert metric_value(pattern) == metric_value + 1

bucket_pattern = ~r/realtime_payload_size_bucket{le="100"}\s(?<number>\d+)/
bucket_pattern = ~r/realtime_payload_size_bucket{message_type=\"broadcast\",le="250"}\s(?<number>\d+)/

assert metric_value(bucket_pattern) > 0
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,25 +100,41 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
end

describe "handle/3" do
setup do
on_exit(fn -> :telemetry.detach(__MODULE__) end)

:telemetry.attach(
__MODULE__,
[:realtime, :tenants, :payload, :size],
&__MODULE__.handle_telemetry/4,
pid: self()
)
end

test "with true policy and is private, user can track their presence and changes", %{
tenant: tenant,
topic: topic,
db_conn: db_conn
} do
external_id = tenant.external_id
key = random_string()
policies = %Policies{presence: %PresencePolicies{read: true, write: true}}

socket =
socket_fixture(tenant, topic, key, policies: policies)

PresenceHandler.handle(%{"event" => "track"}, db_conn, socket)
PresenceHandler.handle(%{"event" => "track", "payload" => %{"A" => "b", "c" => "b"}}, db_conn, socket)
topic = socket.assigns.tenant_topic

assert_receive %Broadcast{topic: ^topic, event: "presence_diff", payload: %{joins: joins, leaves: %{}}}
assert Map.has_key?(joins, key)

assert_receive {:telemetry, [:realtime, :tenants, :payload, :size], %{size: 30},
%{tenant: ^external_id, message_type: :presence}}
end

test "when tracking already existing user, metadata updated", %{tenant: tenant, topic: topic, db_conn: db_conn} do
external_id = tenant.external_id
key = random_string()
policies = %Policies{presence: %PresencePolicies{read: true, write: true}}
socket = socket_fixture(tenant, topic, key, policies: policies)
Expand All @@ -134,10 +150,18 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do

assert_receive %Broadcast{topic: ^topic, event: "presence_diff", payload: %{joins: joins, leaves: %{}}}
assert Map.has_key?(joins, key)

assert_receive {:telemetry, [:realtime, :tenants, :payload, :size], %{size: 6},
%{tenant: ^external_id, message_type: :presence}}

assert_receive {:telemetry, [:realtime, :tenants, :payload, :size], %{size: 55},
%{tenant: ^external_id, message_type: :presence}}

refute_receive :_
end

test "with false policy and is public, user can track their presence and changes", %{tenant: tenant, topic: topic} do
external_id = tenant.external_id
key = random_string()
policies = %Policies{presence: %PresencePolicies{read: false, write: false}}
socket = socket_fixture(tenant, topic, key, policies: policies, private?: false)
Expand All @@ -147,6 +171,9 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
topic = socket.assigns.tenant_topic
assert_receive %Broadcast{topic: ^topic, event: "presence_diff", payload: %{joins: joins, leaves: %{}}}
assert Map.has_key?(joins, key)

assert_receive {:telemetry, [:realtime, :tenants, :payload, :size], %{size: 6},
%{tenant: ^external_id, message_type: :presence}}
end

test "user can untrack when they want", %{tenant: tenant, topic: topic, db_conn: db_conn} do
Expand Down Expand Up @@ -518,4 +545,6 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
}
}
end

def handle_telemetry(event, measures, metadata, pid: pid), do: send(pid, {:telemetry, event, measures, metadata})
end
Loading
Loading