Skip to content

Commit e2f2a58

Browse files
committed
fix: move DB setup to happen after Connect.init
This change reduces the impact that a slow DB setup for one tenant has on other tenants that land on the same partition and try to connect at the same time.
1 parent eeba306 commit e2f2a58

File tree

8 files changed

+55
-109
lines changed

8 files changed

+55
-109
lines changed

lib/realtime/syn_handler.ex

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ defmodule Realtime.SynHandler do
1010
@behaviour :syn_event_handler
1111

1212
@impl true
13-
def on_registry_process_updated(Connect, tenant_id, _pid, %{conn: conn}, :normal) when is_pid(conn) do
13+
def on_registry_process_updated(Connect, tenant_id, pid, %{conn: conn}, :normal) when is_pid(conn) do
1414
# Update that a database connection is ready
15-
Endpoint.local_broadcast(Connect.syn_topic(tenant_id), "ready", %{conn: conn})
15+
Endpoint.local_broadcast(Connect.syn_topic(tenant_id), "ready", %{pid: pid, conn: conn})
1616
end
1717

1818
def on_registry_process_updated(PostgresCdcRls, tenant_id, _pid, meta, _reason) do
@@ -38,7 +38,7 @@ defmodule Realtime.SynHandler do
3838
end
3939

4040
topic = topic(mod)
41-
Endpoint.local_broadcast(topic <> ":" <> name, topic <> "_down", nil)
41+
Endpoint.local_broadcast(topic <> ":" <> name, topic <> "_down", %{pid: pid, reason: reason})
4242

4343
:ok
4444
end

lib/realtime/tenants/connect.ex

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ defmodule Realtime.Tenants.Connect do
1919
alias Realtime.Tenants.Connect.GetTenant
2020
alias Realtime.Tenants.Connect.Piper
2121
alias Realtime.Tenants.Connect.RegisterProcess
22-
alias Realtime.Tenants.Connect.StartCounters
2322
alias Realtime.Tenants.Migrations
2423
alias Realtime.Tenants.ReplicationConnection
2524
alias Realtime.UsersCounter
@@ -83,14 +82,13 @@ defmodule Realtime.Tenants.Connect do
8382
| {:error, :tenant_database_connection_initializing}
8483
def get_status(tenant_id) do
8584
case :syn.lookup(__MODULE__, tenant_id) do
86-
{_pid, %{conn: nil}} ->
87-
wait_for_connection(tenant_id)
85+
{pid, %{conn: nil}} ->
86+
wait_for_connection(pid, tenant_id)
8887

8988
{_, %{conn: conn}} ->
9089
{:ok, conn}
9190

9291
:undefined ->
93-
Logger.warning("Connection process starting up")
9492
{:error, :tenant_database_connection_initializing}
9593

9694
error ->
@@ -101,7 +99,7 @@ defmodule Realtime.Tenants.Connect do
10199

102100
def syn_topic(tenant_id), do: "connect:#{tenant_id}"
103101

104-
defp wait_for_connection(tenant_id) do
102+
defp wait_for_connection(pid, tenant_id) do
105103
RealtimeWeb.Endpoint.subscribe(syn_topic(tenant_id))
106104

107105
# We do a lookup after subscribing because we could've missed a message while subscribing
@@ -112,9 +110,21 @@ defmodule Realtime.Tenants.Connect do
112110
_ ->
113111
# Wait for up to 15 seconds for the ready event
114112
receive do
115-
%{event: "ready", payload: %{conn: conn}} -> {:ok, conn}
113+
%{event: "ready", payload: %{pid: ^pid, conn: conn}} ->
114+
{:ok, conn}
115+
116+
%{event: "connect_down", payload: %{pid: ^pid, reason: {:shutdown, :tenant_db_too_many_connections}}} ->
117+
{:error, :tenant_db_too_many_connections}
118+
119+
%{event: "connect_down", payload: %{pid: ^pid, reason: _reason}} ->
120+
metadata = [external_id: tenant_id, project: tenant_id]
121+
log_error("UnableToConnectToTenantDatabase", "Unable to connect to tenant database", metadata)
122+
{:error, :tenant_database_unavailable}
123+
124+
msg ->
125+
dbg(msg)
116126
after
117-
5_000 -> {:error, :initializing}
127+
15_000 -> {:error, :initializing}
118128
end
119129
end
120130
after
@@ -139,16 +149,6 @@ defmodule Realtime.Tenants.Connect do
139149
{:error, {:already_started, _}} ->
140150
get_status(tenant_id)
141151

142-
{:error, {:shutdown, :tenant_db_too_many_connections}} ->
143-
{:error, :tenant_db_too_many_connections}
144-
145-
{:error, {:shutdown, :tenant_not_found}} ->
146-
{:error, :tenant_not_found}
147-
148-
{:error, :shutdown} ->
149-
log_error("UnableToConnectToTenantDatabase", "Unable to connect to tenant database", metadata)
150-
{:error, :tenant_database_unavailable}
151-
152152
{:error, error} ->
153153
log_error("UnableToConnectToTenantDatabase", error, metadata)
154154
{:error, :tenant_database_unavailable}
@@ -209,30 +209,33 @@ defmodule Realtime.Tenants.Connect do
209209
def init(%{tenant_id: tenant_id} = state) do
210210
Logger.metadata(external_id: tenant_id, project: tenant_id)
211211

212+
{:ok, state, {:continue, :db_connect}}
213+
end
214+
215+
@impl true
216+
def handle_continue(:db_connect, state) do
212217
pipes = [
213218
GetTenant,
214219
CheckConnection,
215-
StartCounters,
216220
RegisterProcess
217221
]
218222

219223
case Piper.run(pipes, state) do
220224
{:ok, acc} ->
221-
{:ok, acc, {:continue, :run_migrations}}
225+
{:noreply, acc, {:continue, :run_migrations}}
222226

223227
{:error, :tenant_not_found} ->
224-
{:stop, {:shutdown, :tenant_not_found}}
228+
{:stop, {:shutdown, :tenant_not_found}, state}
225229

226230
{:error, :tenant_db_too_many_connections} ->
227-
{:stop, {:shutdown, :tenant_db_too_many_connections}}
231+
{:stop, {:shutdown, :tenant_db_too_many_connections}, state}
228232

229233
{:error, error} ->
230234
log_error("UnableToConnectToTenantDatabase", error)
231-
{:stop, :shutdown}
235+
{:stop, :shutdown, state}
232236
end
233237
end
234238

235-
@impl true
236239
def handle_continue(:run_migrations, state) do
237240
%{tenant: tenant, db_conn_pid: db_conn_pid} = state
238241
Logger.warning("Tenant #{tenant.external_id} is initializing: #{inspect(node())}")
@@ -375,6 +378,7 @@ defmodule Realtime.Tenants.Connect do
375378

376379
## Private functions
377380
defp call_external_node(tenant_id, opts) do
381+
Logger.warning("Connection process starting up")
378382
rpc_timeout = Keyword.get(opts, :rpc_timeout, @rpc_timeout_default)
379383

380384
with tenant <- Tenants.Cache.get_tenant_by_external_id(tenant_id),

lib/realtime/tenants/connect/check_connection.ex

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,14 @@ defmodule Realtime.Tenants.Connect.CheckConnection do
22
@moduledoc """
33
Check tenant database connection.
44
"""
5-
alias Realtime.Database
65

76
@behaviour Realtime.Tenants.Connect.Piper
87
@impl true
98
def run(acc) do
109
%{tenant: tenant} = acc
1110

12-
case Database.check_tenant_connection(tenant) do
11+
case Realtime.Database.check_tenant_connection(tenant) do
1312
{:ok, conn} ->
14-
Process.link(conn)
1513
db_conn_reference = Process.monitor(conn)
1614
{:ok, %{acc | db_conn_pid: conn, db_conn_reference: db_conn_reference}}
1715

lib/realtime/tenants/connect/start_counters.ex

Lines changed: 0 additions & 60 deletions
This file was deleted.

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.48.0",
7+
version: "2.48.1",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/integration/rt_channel_test.exs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -653,8 +653,8 @@ defmodule Realtime.Integration.RtChannelTest do
653653
:syn.update_registry(Connect, tenant.external_id, fn _pid, meta -> %{meta | conn: nil} end)
654654
payload = %{"event" => "TEST", "payload" => %{"msg" => 1}, "type" => "broadcast"}
655655
WebsocketClient.send_event(service_role_socket, topic, "broadcast", payload)
656-
# Waiting more than 5 seconds as this is the amount of time we will wait for the Connection to be ready
657-
refute_receive %Message{event: "broadcast", payload: ^payload, topic: ^topic}, 6000
656+
# Waiting more than 15 seconds as this is the amount of time we will wait for the Connection to be ready
657+
refute_receive %Message{event: "broadcast", payload: ^payload, topic: ^topic}, 16000
658658
end)
659659

660660
assert log =~ "UnableToHandleBroadcast"
@@ -831,7 +831,7 @@ defmodule Realtime.Integration.RtChannelTest do
831831

832832
refute_receive %Message{event: "presence_diff"}, 500
833833
# Waiting more than 15 seconds as this is the amount of time we will wait for the Connection to be ready
834-
refute_receive %Message{event: "phx_leave", topic: ^topic}, 6000
834+
refute_receive %Message{event: "phx_leave", topic: ^topic}, 16000
835835
end)
836836

837837
assert log =~ "UnableToHandlePresence"

test/realtime/syn_handler_test.exs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -168,32 +168,40 @@ defmodule Realtime.SynHandlerTest do
168168

169169
test "it handles :syn_conflict_resolution reason" do
170170
reason = :syn_conflict_resolution
171+
pid = self()
171172

172173
log =
173174
capture_log(fn ->
174-
assert SynHandler.on_process_unregistered(@mod, @name, self(), %{}, reason) == :ok
175+
assert SynHandler.on_process_unregistered(@mod, @name, pid, %{}, reason) == :ok
175176
end)
176177

177178
topic = "#{@topic}:#{@name}"
178179
event = "#{@topic}_down"
179180

180181
assert log =~ "#{@mod} terminated due to syn conflict resolution: #{inspect(@name)} #{inspect(self())}"
181-
assert_receive %Phoenix.Socket.Broadcast{topic: ^topic, event: ^event, payload: nil}
182+
assert_receive %Phoenix.Socket.Broadcast{topic: ^topic, event: ^event, payload: %{reason: ^reason, pid: ^pid}}
182183
end
183184

184185
test "it handles other reasons" do
185186
reason = :other_reason
187+
pid = self()
186188

187189
log =
188190
capture_log(fn ->
189-
assert SynHandler.on_process_unregistered(@mod, @name, self(), %{}, reason) == :ok
191+
assert SynHandler.on_process_unregistered(@mod, @name, pid, %{}, reason) == :ok
190192
end)
191193

192194
topic = "#{@topic}:#{@name}"
193195
event = "#{@topic}_down"
194196

195197
refute log =~ "#{@mod} terminated: #{inspect(@name)} #{node()}"
196-
assert_receive %Phoenix.Socket.Broadcast{topic: ^topic, event: ^event, payload: nil}, 500
198+
199+
assert_receive %Phoenix.Socket.Broadcast{
200+
topic: ^topic,
201+
event: ^event,
202+
payload: %{reason: ^reason, pid: ^pid}
203+
},
204+
500
197205
end
198206
end
199207
end

test/realtime/tenants/connect_test.exs

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -78,31 +78,27 @@ defmodule Realtime.Tenants.ConnectTest do
7878
assert_receive {:ok, ^pid}
7979
end
8080

81-
test "more than 5 seconds passed error out", %{tenant: tenant} do
81+
test "more than 15 seconds passed error out", %{tenant: tenant} do
8282
parent = self()
8383

8484
# Let's slow down Connect starting
8585
expect(Database, :check_tenant_connection, fn t ->
86-
:timer.sleep(5500)
86+
Process.sleep(15500)
8787
call_original(Database, :check_tenant_connection, [t])
8888
end)
8989

9090
connect = fn -> send(parent, Connect.lookup_or_start_connection(tenant.external_id)) end
9191

92-
# Start an early connect
93-
spawn(connect)
94-
:timer.sleep(100)
95-
96-
# Start others
9792
spawn(connect)
9893
spawn(connect)
9994

100-
{:error, :tenant_database_unavailable} = Connect.lookup_or_start_connection(tenant.external_id)
95+
{:error, :initializing} = Connect.lookup_or_start_connection(tenant.external_id)
96+
# The above call waited 15 seconds
97+
assert_receive {:error, :initializing}
98+
assert_receive {:error, :initializing}
10199

102-
# Only one will succeed the others timed out waiting
103-
assert_receive {:error, :tenant_database_unavailable}
104-
assert_receive {:error, :tenant_database_unavailable}
105-
assert_receive {:ok, _pid}, 7000
100+
# This one will succeed
101+
{:ok, _pid} = Connect.lookup_or_start_connection(tenant.external_id)
106102
end
107103
end
108104

0 commit comments

Comments
 (0)