Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions lib/realtime/syn_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ defmodule Realtime.SynHandler do
@behaviour :syn_event_handler

@impl true
def on_registry_process_updated(Connect, tenant_id, _pid, %{conn: conn}, :normal) when is_pid(conn) do
def on_registry_process_updated(Connect, tenant_id, pid, %{conn: conn}, :normal) when is_pid(conn) do
# Update that a database connection is ready
Endpoint.local_broadcast(Connect.syn_topic(tenant_id), "ready", %{conn: conn})
Endpoint.local_broadcast(Connect.syn_topic(tenant_id), "ready", %{pid: pid, conn: conn})
end

def on_registry_process_updated(PostgresCdcRls, tenant_id, _pid, meta, _reason) do
Expand All @@ -38,7 +38,7 @@ defmodule Realtime.SynHandler do
end

topic = topic(mod)
Endpoint.local_broadcast(topic <> ":" <> name, topic <> "_down", nil)
Endpoint.local_broadcast(topic <> ":" <> name, topic <> "_down", %{pid: pid, reason: reason})

:ok
end
Expand Down
47 changes: 24 additions & 23 deletions lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ defmodule Realtime.Tenants.Connect do
alias Realtime.Tenants.Connect.GetTenant
alias Realtime.Tenants.Connect.Piper
alias Realtime.Tenants.Connect.RegisterProcess
alias Realtime.Tenants.Connect.StartCounters
alias Realtime.Tenants.Migrations
alias Realtime.Tenants.ReplicationConnection
alias Realtime.UsersCounter
Expand Down Expand Up @@ -83,14 +82,13 @@ defmodule Realtime.Tenants.Connect do
| {:error, :tenant_database_connection_initializing}
def get_status(tenant_id) do
case :syn.lookup(__MODULE__, tenant_id) do
{_pid, %{conn: nil}} ->
wait_for_connection(tenant_id)
{pid, %{conn: nil}} ->
wait_for_connection(pid, tenant_id)

{_, %{conn: conn}} ->
{:ok, conn}

:undefined ->
Logger.warning("Connection process starting up")
{:error, :tenant_database_connection_initializing}

error ->
Expand All @@ -101,7 +99,7 @@ defmodule Realtime.Tenants.Connect do

def syn_topic(tenant_id), do: "connect:#{tenant_id}"

defp wait_for_connection(tenant_id) do
defp wait_for_connection(pid, tenant_id) do
RealtimeWeb.Endpoint.subscribe(syn_topic(tenant_id))

# We do a lookup after subscribing because we could've missed a message while subscribing
Expand All @@ -112,9 +110,18 @@ defmodule Realtime.Tenants.Connect do
_ ->
# Wait for up to 5 seconds for the ready event
receive do
%{event: "ready", payload: %{conn: conn}} -> {:ok, conn}
%{event: "ready", payload: %{pid: ^pid, conn: conn}} ->
{:ok, conn}

%{event: "connect_down", payload: %{pid: ^pid, reason: {:shutdown, :tenant_db_too_many_connections}}} ->
{:error, :tenant_db_too_many_connections}

%{event: "connect_down", payload: %{pid: ^pid, reason: _reason}} ->
metadata = [external_id: tenant_id, project: tenant_id]
log_error("UnableToConnectToTenantDatabase", "Unable to connect to tenant database", metadata)
{:error, :tenant_database_unavailable}
after
5_000 -> {:error, :initializing}
15_000 -> {:error, :initializing}
end
end
after
Expand All @@ -139,16 +146,6 @@ defmodule Realtime.Tenants.Connect do
{:error, {:already_started, _}} ->
get_status(tenant_id)

{:error, {:shutdown, :tenant_db_too_many_connections}} ->
{:error, :tenant_db_too_many_connections}

{:error, {:shutdown, :tenant_not_found}} ->
{:error, :tenant_not_found}

{:error, :shutdown} ->
log_error("UnableToConnectToTenantDatabase", "Unable to connect to tenant database", metadata)
{:error, :tenant_database_unavailable}

{:error, error} ->
log_error("UnableToConnectToTenantDatabase", error, metadata)
{:error, :tenant_database_unavailable}
Expand Down Expand Up @@ -209,30 +206,33 @@ defmodule Realtime.Tenants.Connect do
def init(%{tenant_id: tenant_id} = state) do
Logger.metadata(external_id: tenant_id, project: tenant_id)

{:ok, state, {:continue, :db_connect}}
end

@impl true
def handle_continue(:db_connect, state) do
pipes = [
GetTenant,
CheckConnection,
StartCounters,
RegisterProcess
]

case Piper.run(pipes, state) do
{:ok, acc} ->
{:ok, acc, {:continue, :run_migrations}}
{:noreply, acc, {:continue, :run_migrations}}

{:error, :tenant_not_found} ->
{:stop, {:shutdown, :tenant_not_found}}
{:stop, {:shutdown, :tenant_not_found}, state}

{:error, :tenant_db_too_many_connections} ->
{:stop, {:shutdown, :tenant_db_too_many_connections}}
{:stop, {:shutdown, :tenant_db_too_many_connections}, state}

{:error, error} ->
log_error("UnableToConnectToTenantDatabase", error)
{:stop, :shutdown}
{:stop, :shutdown, state}
end
end

@impl true
def handle_continue(:run_migrations, state) do
%{tenant: tenant, db_conn_pid: db_conn_pid} = state
Logger.warning("Tenant #{tenant.external_id} is initializing: #{inspect(node())}")
Expand Down Expand Up @@ -375,6 +375,7 @@ defmodule Realtime.Tenants.Connect do

## Private functions
defp call_external_node(tenant_id, opts) do
Logger.warning("Connection process starting up")
rpc_timeout = Keyword.get(opts, :rpc_timeout, @rpc_timeout_default)

with tenant <- Tenants.Cache.get_tenant_by_external_id(tenant_id),
Expand Down
4 changes: 1 addition & 3 deletions lib/realtime/tenants/connect/check_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@ defmodule Realtime.Tenants.Connect.CheckConnection do
@moduledoc """
Check tenant database connection.
"""
alias Realtime.Database

@behaviour Realtime.Tenants.Connect.Piper
@impl true
def run(acc) do
%{tenant: tenant} = acc

case Database.check_tenant_connection(tenant) do
case Realtime.Database.check_tenant_connection(tenant) do
{:ok, conn} ->
Process.link(conn)
Copy link
Member Author

@edgurgel edgurgel Sep 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to link here, as Database.check_tenant_connection calls Postgrex.start_link, which already links the process.

db_conn_reference = Process.monitor(conn)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, this Process.monitor is also not doing much, given that the two processes are linked. Most likely Connect won't have time to react to the DOWN message, as the linked process will crash Connect.

{:ok, %{acc | db_conn_pid: conn, db_conn_reference: db_conn_reference}}

Expand Down
60 changes: 0 additions & 60 deletions lib/realtime/tenants/connect/start_counters.ex

This file was deleted.

2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.48.0",
version: "2.48.1",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
6 changes: 3 additions & 3 deletions test/integration/rt_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -653,8 +653,8 @@ defmodule Realtime.Integration.RtChannelTest do
:syn.update_registry(Connect, tenant.external_id, fn _pid, meta -> %{meta | conn: nil} end)
payload = %{"event" => "TEST", "payload" => %{"msg" => 1}, "type" => "broadcast"}
WebsocketClient.send_event(service_role_socket, topic, "broadcast", payload)
# Waiting more than 5 seconds as this is the amount of time we will wait for the Connection to be ready
refute_receive %Message{event: "broadcast", payload: ^payload, topic: ^topic}, 6000
# Waiting more than 15 seconds as this is the amount of time we will wait for the Connection to be ready
refute_receive %Message{event: "broadcast", payload: ^payload, topic: ^topic}, 16000
end)

assert log =~ "UnableToHandleBroadcast"
Expand Down Expand Up @@ -831,7 +831,7 @@ defmodule Realtime.Integration.RtChannelTest do

refute_receive %Message{event: "presence_diff"}, 500
# Waiting more than 15 seconds as this is the amount of time we will wait for the Connection to be ready
refute_receive %Message{event: "phx_leave", topic: ^topic}, 6000
refute_receive %Message{event: "phx_leave", topic: ^topic}, 16000
end)

assert log =~ "UnableToHandlePresence"
Expand Down
16 changes: 12 additions & 4 deletions test/realtime/syn_handler_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -168,32 +168,40 @@ defmodule Realtime.SynHandlerTest do

test "it handles :syn_conflict_resolution reason" do
reason = :syn_conflict_resolution
pid = self()

log =
capture_log(fn ->
assert SynHandler.on_process_unregistered(@mod, @name, self(), %{}, reason) == :ok
assert SynHandler.on_process_unregistered(@mod, @name, pid, %{}, reason) == :ok
end)

topic = "#{@topic}:#{@name}"
event = "#{@topic}_down"

assert log =~ "#{@mod} terminated due to syn conflict resolution: #{inspect(@name)} #{inspect(self())}"
assert_receive %Phoenix.Socket.Broadcast{topic: ^topic, event: ^event, payload: nil}
assert_receive %Phoenix.Socket.Broadcast{topic: ^topic, event: ^event, payload: %{reason: ^reason, pid: ^pid}}
end

test "it handles other reasons" do
reason = :other_reason
pid = self()

log =
capture_log(fn ->
assert SynHandler.on_process_unregistered(@mod, @name, self(), %{}, reason) == :ok
assert SynHandler.on_process_unregistered(@mod, @name, pid, %{}, reason) == :ok
end)

topic = "#{@topic}:#{@name}"
event = "#{@topic}_down"

refute log =~ "#{@mod} terminated: #{inspect(@name)} #{node()}"
assert_receive %Phoenix.Socket.Broadcast{topic: ^topic, event: ^event, payload: nil}, 500

assert_receive %Phoenix.Socket.Broadcast{
topic: ^topic,
event: ^event,
payload: %{reason: ^reason, pid: ^pid}
},
500
end
end
end
20 changes: 8 additions & 12 deletions test/realtime/tenants/connect_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -78,31 +78,27 @@ defmodule Realtime.Tenants.ConnectTest do
assert_receive {:ok, ^pid}
end

test "more than 5 seconds passed error out", %{tenant: tenant} do
test "more than 15 seconds passed error out", %{tenant: tenant} do
parent = self()

# Let's slow down Connect starting
expect(Database, :check_tenant_connection, fn t ->
:timer.sleep(5500)
Process.sleep(15500)
call_original(Database, :check_tenant_connection, [t])
end)

connect = fn -> send(parent, Connect.lookup_or_start_connection(tenant.external_id)) end

# Start an early connect
spawn(connect)
:timer.sleep(100)

# Start others
spawn(connect)
spawn(connect)

{:error, :tenant_database_unavailable} = Connect.lookup_or_start_connection(tenant.external_id)
{:error, :initializing} = Connect.lookup_or_start_connection(tenant.external_id)
# The above call waited 15 seconds
assert_receive {:error, :initializing}
assert_receive {:error, :initializing}

# Only one will succeed the others timed out waiting
assert_receive {:error, :tenant_database_unavailable}
assert_receive {:error, :tenant_database_unavailable}
assert_receive {:ok, _pid}, 7000
# This one will succeed
{:ok, _pid} = Connect.lookup_or_start_connection(tenant.external_id)
end
end

Expand Down
Loading