Skip to content

Commit 98ba205

Browse files
committed
fix: move DB setup to happen after Connect.init
This change reduces the impact of a slow DB setup on other tenants that land on the same partition and try to connect at the same time.
1 parent eeba306 commit 98ba205

File tree

8 files changed

+52
-109
lines changed

8 files changed

+52
-109
lines changed

lib/realtime/syn_handler.ex

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ defmodule Realtime.SynHandler do
1010
@behaviour :syn_event_handler
1111

1212
@impl true
13-
def on_registry_process_updated(Connect, tenant_id, _pid, %{conn: conn}, :normal) when is_pid(conn) do
13+
def on_registry_process_updated(Connect, tenant_id, pid, %{conn: conn}, :normal) when is_pid(conn) do
1414
# Update that a database connection is ready
15-
Endpoint.local_broadcast(Connect.syn_topic(tenant_id), "ready", %{conn: conn})
15+
Endpoint.local_broadcast(Connect.syn_topic(tenant_id), "ready", %{pid: pid, conn: conn})
1616
end
1717

1818
def on_registry_process_updated(PostgresCdcRls, tenant_id, _pid, meta, _reason) do
@@ -38,7 +38,7 @@ defmodule Realtime.SynHandler do
3838
end
3939

4040
topic = topic(mod)
41-
Endpoint.local_broadcast(topic <> ":" <> name, topic <> "_down", nil)
41+
Endpoint.local_broadcast(topic <> ":" <> name, topic <> "_down", %{pid: pid, reason: reason})
4242

4343
:ok
4444
end

lib/realtime/tenants/connect.ex

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ defmodule Realtime.Tenants.Connect do
1919
alias Realtime.Tenants.Connect.GetTenant
2020
alias Realtime.Tenants.Connect.Piper
2121
alias Realtime.Tenants.Connect.RegisterProcess
22-
alias Realtime.Tenants.Connect.StartCounters
2322
alias Realtime.Tenants.Migrations
2423
alias Realtime.Tenants.ReplicationConnection
2524
alias Realtime.UsersCounter
@@ -83,14 +82,13 @@ defmodule Realtime.Tenants.Connect do
8382
| {:error, :tenant_database_connection_initializing}
8483
def get_status(tenant_id) do
8584
case :syn.lookup(__MODULE__, tenant_id) do
86-
{_pid, %{conn: nil}} ->
87-
wait_for_connection(tenant_id)
85+
{pid, %{conn: nil}} ->
86+
wait_for_connection(pid, tenant_id)
8887

8988
{_, %{conn: conn}} ->
9089
{:ok, conn}
9190

9291
:undefined ->
93-
Logger.warning("Connection process starting up")
9492
{:error, :tenant_database_connection_initializing}
9593

9694
error ->
@@ -101,7 +99,7 @@ defmodule Realtime.Tenants.Connect do
10199

102100
def syn_topic(tenant_id), do: "connect:#{tenant_id}"
103101

104-
defp wait_for_connection(tenant_id) do
102+
defp wait_for_connection(pid, tenant_id) do
105103
RealtimeWeb.Endpoint.subscribe(syn_topic(tenant_id))
106104

107105
# We do a lookup after subscribing because we could've missed a message while subscribing
@@ -112,9 +110,18 @@ defmodule Realtime.Tenants.Connect do
112110
_ ->
113111
# Wait for up to 5 seconds for the ready event
114112
receive do
115-
%{event: "ready", payload: %{conn: conn}} -> {:ok, conn}
113+
%{event: "ready", payload: %{pid: ^pid, conn: conn}} ->
114+
{:ok, conn}
115+
116+
%{event: "connect_down", payload: %{pid: ^pid, reason: {:shutdown, :tenant_db_too_many_connections}}} ->
117+
{:error, :tenant_db_too_many_connections}
118+
119+
%{event: "connect_down", payload: %{pid: ^pid, reason: _reason}} ->
120+
metadata = [external_id: tenant_id, project: tenant_id]
121+
log_error("UnableToConnectToTenantDatabase", "Unable to connect to tenant database", metadata)
122+
{:error, :tenant_database_unavailable}
116123
after
117-
5_000 -> {:error, :initializing}
124+
15_000 -> {:error, :initializing}
118125
end
119126
end
120127
after
@@ -139,16 +146,6 @@ defmodule Realtime.Tenants.Connect do
139146
{:error, {:already_started, _}} ->
140147
get_status(tenant_id)
141148

142-
{:error, {:shutdown, :tenant_db_too_many_connections}} ->
143-
{:error, :tenant_db_too_many_connections}
144-
145-
{:error, {:shutdown, :tenant_not_found}} ->
146-
{:error, :tenant_not_found}
147-
148-
{:error, :shutdown} ->
149-
log_error("UnableToConnectToTenantDatabase", "Unable to connect to tenant database", metadata)
150-
{:error, :tenant_database_unavailable}
151-
152149
{:error, error} ->
153150
log_error("UnableToConnectToTenantDatabase", error, metadata)
154151
{:error, :tenant_database_unavailable}
@@ -209,30 +206,33 @@ defmodule Realtime.Tenants.Connect do
209206
def init(%{tenant_id: tenant_id} = state) do
210207
Logger.metadata(external_id: tenant_id, project: tenant_id)
211208

209+
{:ok, state, {:continue, :db_connect}}
210+
end
211+
212+
@impl true
213+
def handle_continue(:db_connect, state) do
212214
pipes = [
213215
GetTenant,
214216
CheckConnection,
215-
StartCounters,
216217
RegisterProcess
217218
]
218219

219220
case Piper.run(pipes, state) do
220221
{:ok, acc} ->
221-
{:ok, acc, {:continue, :run_migrations}}
222+
{:noreply, acc, {:continue, :run_migrations}}
222223

223224
{:error, :tenant_not_found} ->
224-
{:stop, {:shutdown, :tenant_not_found}}
225+
{:stop, {:shutdown, :tenant_not_found}, state}
225226

226227
{:error, :tenant_db_too_many_connections} ->
227-
{:stop, {:shutdown, :tenant_db_too_many_connections}}
228+
{:stop, {:shutdown, :tenant_db_too_many_connections}, state}
228229

229230
{:error, error} ->
230231
log_error("UnableToConnectToTenantDatabase", error)
231-
{:stop, :shutdown}
232+
{:stop, :shutdown, state}
232233
end
233234
end
234235

235-
@impl true
236236
def handle_continue(:run_migrations, state) do
237237
%{tenant: tenant, db_conn_pid: db_conn_pid} = state
238238
Logger.warning("Tenant #{tenant.external_id} is initializing: #{inspect(node())}")
@@ -375,6 +375,7 @@ defmodule Realtime.Tenants.Connect do
375375

376376
## Private functions
377377
defp call_external_node(tenant_id, opts) do
378+
Logger.warning("Connection process starting up")
378379
rpc_timeout = Keyword.get(opts, :rpc_timeout, @rpc_timeout_default)
379380

380381
with tenant <- Tenants.Cache.get_tenant_by_external_id(tenant_id),

lib/realtime/tenants/connect/check_connection.ex

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,14 @@ defmodule Realtime.Tenants.Connect.CheckConnection do
22
@moduledoc """
33
Check tenant database connection.
44
"""
5-
alias Realtime.Database
65

76
@behaviour Realtime.Tenants.Connect.Piper
87
@impl true
98
def run(acc) do
109
%{tenant: tenant} = acc
1110

12-
case Database.check_tenant_connection(tenant) do
11+
case Realtime.Database.check_tenant_connection(tenant) do
1312
{:ok, conn} ->
14-
Process.link(conn)
1513
db_conn_reference = Process.monitor(conn)
1614
{:ok, %{acc | db_conn_pid: conn, db_conn_reference: db_conn_reference}}
1715

lib/realtime/tenants/connect/start_counters.ex

Lines changed: 0 additions & 60 deletions
This file was deleted.

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.48.0",
7+
version: "2.48.1",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/integration/rt_channel_test.exs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -653,8 +653,8 @@ defmodule Realtime.Integration.RtChannelTest do
653653
:syn.update_registry(Connect, tenant.external_id, fn _pid, meta -> %{meta | conn: nil} end)
654654
payload = %{"event" => "TEST", "payload" => %{"msg" => 1}, "type" => "broadcast"}
655655
WebsocketClient.send_event(service_role_socket, topic, "broadcast", payload)
656-
# Waiting more than 5 seconds as this is the amount of time we will wait for the Connection to be ready
657-
refute_receive %Message{event: "broadcast", payload: ^payload, topic: ^topic}, 6000
656+
# Waiting more than 15 seconds as this is the amount of time we will wait for the Connection to be ready
657+
refute_receive %Message{event: "broadcast", payload: ^payload, topic: ^topic}, 16000
658658
end)
659659

660660
assert log =~ "UnableToHandleBroadcast"
@@ -831,7 +831,7 @@ defmodule Realtime.Integration.RtChannelTest do
831831

832832
refute_receive %Message{event: "presence_diff"}, 500
833833
# Waiting more than 15 seconds as this is the amount of time we will wait for the Connection to be ready
834-
refute_receive %Message{event: "phx_leave", topic: ^topic}, 6000
834+
refute_receive %Message{event: "phx_leave", topic: ^topic}, 16000
835835
end)
836836

837837
assert log =~ "UnableToHandlePresence"

test/realtime/syn_handler_test.exs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -168,32 +168,40 @@ defmodule Realtime.SynHandlerTest do
168168

169169
test "it handles :syn_conflict_resolution reason" do
170170
reason = :syn_conflict_resolution
171+
pid = self()
171172

172173
log =
173174
capture_log(fn ->
174-
assert SynHandler.on_process_unregistered(@mod, @name, self(), %{}, reason) == :ok
175+
assert SynHandler.on_process_unregistered(@mod, @name, pid, %{}, reason) == :ok
175176
end)
176177

177178
topic = "#{@topic}:#{@name}"
178179
event = "#{@topic}_down"
179180

180181
assert log =~ "#{@mod} terminated due to syn conflict resolution: #{inspect(@name)} #{inspect(self())}"
181-
assert_receive %Phoenix.Socket.Broadcast{topic: ^topic, event: ^event, payload: nil}
182+
assert_receive %Phoenix.Socket.Broadcast{topic: ^topic, event: ^event, payload: %{reason: ^reason, pid: ^pid}}
182183
end
183184

184185
test "it handles other reasons" do
185186
reason = :other_reason
187+
pid = self()
186188

187189
log =
188190
capture_log(fn ->
189-
assert SynHandler.on_process_unregistered(@mod, @name, self(), %{}, reason) == :ok
191+
assert SynHandler.on_process_unregistered(@mod, @name, pid, %{}, reason) == :ok
190192
end)
191193

192194
topic = "#{@topic}:#{@name}"
193195
event = "#{@topic}_down"
194196

195197
refute log =~ "#{@mod} terminated: #{inspect(@name)} #{node()}"
196-
assert_receive %Phoenix.Socket.Broadcast{topic: ^topic, event: ^event, payload: nil}, 500
198+
199+
assert_receive %Phoenix.Socket.Broadcast{
200+
topic: ^topic,
201+
event: ^event,
202+
payload: %{reason: ^reason, pid: ^pid}
203+
},
204+
500
197205
end
198206
end
199207
end

test/realtime/tenants/connect_test.exs

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -78,31 +78,27 @@ defmodule Realtime.Tenants.ConnectTest do
7878
assert_receive {:ok, ^pid}
7979
end
8080

81-
test "more than 5 seconds passed error out", %{tenant: tenant} do
81+
test "more than 15 seconds passed error out", %{tenant: tenant} do
8282
parent = self()
8383

8484
# Let's slow down Connect starting
8585
expect(Database, :check_tenant_connection, fn t ->
86-
:timer.sleep(5500)
86+
Process.sleep(15500)
8787
call_original(Database, :check_tenant_connection, [t])
8888
end)
8989

9090
connect = fn -> send(parent, Connect.lookup_or_start_connection(tenant.external_id)) end
9191

92-
# Start an early connect
93-
spawn(connect)
94-
:timer.sleep(100)
95-
96-
# Start others
9792
spawn(connect)
9893
spawn(connect)
9994

100-
{:error, :tenant_database_unavailable} = Connect.lookup_or_start_connection(tenant.external_id)
95+
{:error, :initializing} = Connect.lookup_or_start_connection(tenant.external_id)
96+
# The above call waited 15 seconds
97+
assert_receive {:error, :initializing}
98+
assert_receive {:error, :initializing}
10199

102-
# Only one will succeed the others timed out waiting
103-
assert_receive {:error, :tenant_database_unavailable}
104-
assert_receive {:error, :tenant_database_unavailable}
105-
assert_receive {:ok, _pid}, 7000
100+
# This one will succeed
101+
{:ok, _pid} = Connect.lookup_or_start_connection(tenant.external_id)
106102
end
107103
end
108104

0 commit comments

Comments
 (0)