Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions lib/realtime/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@ defmodule Realtime.Tenants do

@doc """
Checks if a tenant is healthy. A tenant is healthy if:
- Tenant has no db connection and zero client connetions
- Tenant has no db connection and zero client connections
- Tenant has a db connection and >0 client connections
A tenant is not healthy if a tenant has client connections and no database connection.
The response includes `replication_connected` to indicate if the replication connection
for broadcast changes is active. This is informational and does not affect the healthy status.
"""

@spec health_check(binary) ::
Expand All @@ -42,15 +45,17 @@ defmodule Realtime.Tenants do
| String.t()
| %{
connected_cluster: pos_integer,
db_connected: false,
db_connected: boolean,
replication_connected: boolean,
healthy: false,
region: String.t(),
node: String.t()
}}
| {:ok,
%{
connected_cluster: non_neg_integer,
db_connected: true,
db_connected: boolean,
replication_connected: boolean,
healthy: true,
region: String.t(),
node: String.t()
Expand All @@ -66,6 +71,7 @@ defmodule Realtime.Tenants do
%{
healthy: false,
db_connected: false,
replication_connected: false,
connected_cluster: connected_cluster,
region: region,
node: node
Expand All @@ -76,11 +82,13 @@ defmodule Realtime.Tenants do

{:ok, _health_conn} ->
connected_cluster = UsersCounter.tenant_users(external_id)
replication_connected = replication_connected?(external_id)

{:ok,
%{
healthy: true,
db_connected: true,
replication_connected: replication_connected,
connected_cluster: connected_cluster,
region: region,
node: node
Expand All @@ -94,13 +102,21 @@ defmodule Realtime.Tenants do
%{
healthy: result? == :ok || result? == :noop,
db_connected: false,
replication_connected: false,
connected_cluster: connected_cluster,
region: region,
node: node
}}
end
end

defp replication_connected?(external_id) do
case Connect.replication_status(external_id) do
{:ok, _pid} -> true
{:error, :not_connected} -> false
end
end

@doc """
All the keys that we use to create counters and RateLimiters for tenants.
"""
Expand Down
25 changes: 21 additions & 4 deletions lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,17 @@ defmodule Realtime.Tenants.Connect do
end
end

@doc """
Returns the replication connection status from :syn metadata without RPC calls.
"""
@spec replication_status(binary()) :: {:ok, pid()} | {:error, :not_connected}
def replication_status(tenant_id) do
case :syn.lookup(__MODULE__, tenant_id) do
{_, %{replication_conn: pid}} when is_pid(pid) -> {:ok, pid}
_ -> {:error, :not_connected}
end
end

@doc """
Shutdown the tenant Connection and linked processes
"""
Expand All @@ -225,7 +236,7 @@ defmodule Realtime.Tenants.Connect do

check_connect_region_interval = Keyword.get(opts, :check_connect_region_interval, rebalance_check_interval_in_ms())

name = {__MODULE__, tenant_id, %{conn: nil, region: region}}
name = {__MODULE__, tenant_id, %{conn: nil, region: region, replication_conn: nil}}

state = %__MODULE__{
tenant_id: tenant_id,
Expand Down Expand Up @@ -369,10 +380,11 @@ defmodule Realtime.Tenants.Connect do
# Handle replication connection termination
def handle_info(
{:DOWN, replication_connection_reference, _, _, _},
%{replication_connection_reference: replication_connection_reference} = state
%{replication_connection_reference: replication_connection_reference, tenant_id: tenant_id} = state
) do
%{backoff: backoff} = state
log_warning("ReplicationConnectionDown", "Replication connection has been terminated")
update_syn_replication_conn(tenant_id, nil)
{timeout, backoff} = Backoff.backoff(backoff)
Process.send_after(self(), :recover_replication_connection, timeout)
state = %{state | replication_connection_pid: nil, replication_connection_reference: nil, backoff: backoff}
Expand Down Expand Up @@ -467,10 +479,15 @@ defmodule Realtime.Tenants.Connect do

defp rebalance_check_interval_in_ms(), do: Application.fetch_env!(:realtime, :rebalance_check_interval_in_ms)

defp update_syn_replication_conn(tenant_id, pid) do
:syn.update_registry(__MODULE__, tenant_id, fn _pid, meta -> %{meta | replication_conn: pid} end)
end

defp start_replication_connection(state) do
%{tenant: tenant} = state
%{tenant: tenant, tenant_id: tenant_id} = state

with {:ok, replication_connection_pid} <- ReplicationConnection.start(tenant, self()) do
with {:ok, replication_connection_pid} <- ReplicationConnection.start(tenant, self()),
{:ok, _} <- update_syn_replication_conn(tenant_id, replication_connection_pid) do
replication_connection_reference = Process.monitor(replication_connection_pid)

state = %{
Expand Down
11 changes: 9 additions & 2 deletions lib/realtime_web/open_api_schemas.ex
Original file line number Diff line number Diff line change
Expand Up @@ -313,18 +313,25 @@ defmodule RealtimeWeb.OpenApiSchemas do
type: :boolean,
description: "Indicates if Realtime has an active connection to the tenant database"
},
replication_connected: %Schema{
type: :boolean,
description: "Indicates if Realtime has an active replication connection for broadcast changes"
},
connected_cluster: %Schema{
type: :integer,
description: "The count of currently connected clients for a tenant on the Realtime cluster"
}
},
required: [
:external_id,
:jwt_secret
:healthy,
:db_connected,
:replication_connected,
:connected_cluster
],
example: %{
healthy: true,
db_connected: true,
replication_connected: true,
connected_cluster: 10
}
})
Expand Down
36 changes: 31 additions & 5 deletions test/realtime/tenants/connect_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ defmodule Realtime.Tenants.ConnectTest do

for _ <- 1..50, do: spawn(connect)

assert_receive({:ok, pid}, 1100)
assert_receive({:ok, pid}, 2000)

for _ <- 1..49, do: assert_receive({:ok, ^pid})

Expand Down Expand Up @@ -200,11 +200,9 @@ defmodule Realtime.Tenants.ConnectTest do
log =
capture_log(fn ->
assert {:ok, db_conn} = Connect.lookup_or_start_connection(external_id, check_connect_region_interval: 100)

expect(Rebalancer, :check, 1, fn _, _, ^external_id -> {:error, :wrong_region} end)
reject(&Rebalancer.check/3)

assert_process_down(db_conn, 500, {:shutdown, :rebalancing})
assert_process_down(db_conn, 1000, {:shutdown, :rebalancing})
end)

assert log =~ "Rebalancing Tenant database connection"
Expand Down Expand Up @@ -459,11 +457,16 @@ defmodule Realtime.Tenants.ConnectTest do
replication_connection_before = ReplicationConnection.whereis(tenant.external_id)
assert Process.alive?(replication_connection_before)

assert {:ok, replication_conn_pid_before} = Connect.replication_status(tenant.external_id)

assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)

replication_connection_after = ReplicationConnection.whereis(tenant.external_id)
assert Process.alive?(replication_connection_after)
assert replication_connection_before == replication_connection_after

assert {:ok, replication_conn_pid_after} = Connect.replication_status(tenant.external_id)
assert replication_conn_pid_before == replication_conn_pid_after
end

test "on replication connection postgres pid being stopped, Connect module recovers it", %{tenant: tenant} do
Expand All @@ -476,6 +479,8 @@ defmodule Realtime.Tenants.ConnectTest do
assert Process.alive?(replication_connection_pid)
pid = Connect.whereis(tenant.external_id)

assert {:ok, replication_conn_before} = Connect.replication_status(tenant.external_id)

Postgrex.query!(
db_conn,
"SELECT pg_terminate_backend(pid) from pg_stat_activity where application_name='realtime_replication_connection'",
Expand All @@ -484,11 +489,17 @@ defmodule Realtime.Tenants.ConnectTest do

assert_receive {:DOWN, _, :process, ^replication_connection_pid, _}

Process.sleep(100)
assert {:error, :not_connected} = Connect.replication_status(tenant.external_id)

new_replication_connection_pid = assert_pid(fn -> ReplicationConnection.whereis(tenant.external_id) end)

assert replication_connection_pid != new_replication_connection_pid
assert Process.alive?(new_replication_connection_pid)
assert Process.alive?(pid)

assert {:ok, replication_conn_after} = Connect.replication_status(tenant.external_id)
assert replication_conn_before != replication_conn_after
end

test "on replication connection exit, Connect module recovers it", %{tenant: tenant} do
Expand All @@ -499,14 +510,23 @@ defmodule Realtime.Tenants.ConnectTest do
Process.monitor(replication_connection_pid)
assert Process.alive?(replication_connection_pid)
pid = Connect.whereis(tenant.external_id)

assert {:ok, replication_conn_before} = Connect.replication_status(tenant.external_id)

Process.exit(replication_connection_pid, :kill)
assert_receive {:DOWN, _, :process, ^replication_connection_pid, _}

Process.sleep(100)
assert {:error, :not_connected} = Connect.replication_status(tenant.external_id)

new_replication_connection_pid = assert_pid(fn -> ReplicationConnection.whereis(tenant.external_id) end)

assert replication_connection_pid != new_replication_connection_pid
assert Process.alive?(new_replication_connection_pid)
assert Process.alive?(pid)

assert {:ok, replication_conn_after} = Connect.replication_status(tenant.external_id)
assert replication_conn_before != replication_conn_after
end

test "handles replication connection timeout by logging and shutting down", %{tenant: tenant} do
Expand All @@ -526,8 +546,8 @@ defmodule Realtime.Tenants.ConnectTest do

test "handles max_wal_senders by logging the correct operational code", %{tenant: tenant} do
opts = tenant |> Database.from_tenant("realtime_test", :stop) |> Database.opts()
parent = self()

# This creates a loop of errors that occupies all WAL senders and lets us test the error handling
pids =
for i <- 0..4 do
replication_slot_opts =
Expand All @@ -543,13 +563,16 @@ defmodule Realtime.Tenants.ConnectTest do

spawn(fn ->
{:ok, pid} = PostgresReplication.start_link(replication_slot_opts)
send(parent, {:replication_ready, i})

receive do
:stop -> Process.exit(pid, :kill)
end
end)
end

for i <- 0..4, do: assert_receive({:replication_ready, ^i}, 5000)

on_exit(fn ->
Enum.each(pids, &send(&1, :stop))
Process.sleep(2000)
Expand Down Expand Up @@ -629,6 +652,9 @@ defmodule Realtime.Tenants.ConnectTest do
assert Process.alive?(connect_pid)
assert Process.alive?(replication_connection_pid)

assert {_, %{conn: ^db_conn}} = :syn.lookup(Connect, tenant.external_id)
assert {:ok, _replication_conn_pid} = Connect.replication_status(tenant.external_id)

Connect.shutdown(tenant.external_id)
assert_process_down(connect_pid)
assert_process_down(replication_connection_pid)
Expand Down
42 changes: 37 additions & 5 deletions test/realtime_web/controllers/tenant_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -409,13 +409,14 @@ defmodule RealtimeWeb.TenantControllerTest do
assert %{
"healthy" => true,
"db_connected" => false,
"replication_connected" => false,
"connected_cluster" => 0,
"region" => "us-east-1",
"node" => "#{node()}"
} == data
end

test "unhealthy tenant with 1 client connections", %{
test "unhealthy tenant with 1 client connections and no db connection", %{
conn: conn,
tenant: %Tenant{external_id: ext_id}
} do
Expand All @@ -429,19 +430,23 @@ defmodule RealtimeWeb.TenantControllerTest do
assert %{
"healthy" => false,
"db_connected" => false,
"replication_connected" => false,
"connected_cluster" => 1,
"region" => "us-east-1",
"node" => "#{node()}"
} == data
end

test "healthy tenant with 1 client connection", %{conn: conn, tenant: %Tenant{external_id: ext_id}} do
test "healthy tenant with db connection but no replication connection", %{
conn: conn,
tenant: %Tenant{external_id: ext_id}
} do
{:ok, db_conn} = Connect.lookup_or_start_connection(ext_id)
# Fake adding a connected client here
UsersCounter.add(self(), ext_id)

# Fake a db connection
:syn.register(Realtime.Tenants.Connect, ext_id, self(), %{conn: nil})
# Fake a db connection without replication (replication_conn: nil)
:syn.register(Realtime.Tenants.Connect, ext_id, self(), %{conn: nil, region: "us-east-1", replication_conn: nil})

:syn.update_registry(Realtime.Tenants.Connect, ext_id, fn _pid, meta ->
%{meta | conn: db_conn}
Expand All @@ -453,6 +458,32 @@ defmodule RealtimeWeb.TenantControllerTest do
assert %{
"healthy" => true,
"db_connected" => true,
"replication_connected" => false,
"connected_cluster" => 1,
"region" => "us-east-1",
"node" => "#{node()}"
} == data
end

test "healthy tenant with db and replication connection", %{conn: conn, tenant: %Tenant{external_id: ext_id}} do
{:ok, db_conn} = Connect.lookup_or_start_connection(ext_id)
# Fake adding a connected client here
UsersCounter.add(self(), ext_id)

# Fake a db connection with replication_conn in syn metadata
:syn.register(Realtime.Tenants.Connect, ext_id, self(), %{conn: nil, region: "us-east-1", replication_conn: nil})

:syn.update_registry(Realtime.Tenants.Connect, ext_id, fn _pid, meta ->
%{meta | conn: db_conn, replication_conn: self()}
end)

conn = get(conn, ~p"/api/tenants/#{ext_id}/health")
data = json_response(conn, 200)["data"]

assert %{
"healthy" => true,
"db_connected" => true,
"replication_connected" => true,
"connected_cluster" => 1,
"region" => "us-east-1",
"node" => "#{node()}"
Expand All @@ -477,7 +508,8 @@ defmodule RealtimeWeb.TenantControllerTest do

assert {:ok, %{rows: []}} = Postgrex.query(db_conn, "SELECT * FROM realtime.messages", [])

assert %{"healthy" => true, "db_connected" => false, "connected_cluster" => 0} = data
assert %{"healthy" => true, "db_connected" => false, "replication_connected" => false, "connected_cluster" => 0} =
data
end

test "sets appropriate observability metadata", %{conn: conn, tenant: tenant} do
Expand Down