Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,24 @@ defmodule Realtime.Tenants.Connect do
| {:error, :tenant_database_unavailable}
| {:error, :initializing}
| {:error, :tenant_database_connection_initializing}
| {:error, :tenant_db_too_many_connections}
| {:error, :rpc_error, term()}
def lookup_or_start_connection(tenant_id, opts \\ []) when is_binary(tenant_id) do
case get_status(tenant_id) do
{:ok, conn} ->
{:ok, conn}

{:error, :tenant_database_unavailable} ->
call_external_node(tenant_id, opts)
{:error, :tenant_database_unavailable}

{:error, :tenant_database_connection_initializing} ->
call_external_node(tenant_id, opts)

{:error, :initializing} ->
{:error, :tenant_database_unavailable}

{:error, :tenant_db_too_many_connections} ->
{:error, :tenant_db_too_many_connections}
end
end

Expand All @@ -80,6 +84,7 @@ defmodule Realtime.Tenants.Connect do
| {:error, :tenant_database_unavailable}
| {:error, :initializing}
| {:error, :tenant_database_connection_initializing}
| {:error, :tenant_db_too_many_connections}
def get_status(tenant_id) do
case :syn.lookup(__MODULE__, tenant_id) do
{pid, %{conn: nil}} ->
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.50.1",
version: "2.50.2",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
100 changes: 76 additions & 24 deletions test/realtime/tenants/connect_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,54 @@ defmodule Realtime.Tenants.ConnectTest do
# This one will succeed
{:ok, _pid} = Connect.lookup_or_start_connection(tenant.external_id)
end

test "too many db connections", %{tenant: tenant} do
extension = %{
"type" => "postgres_cdc_rls",
"settings" => %{
"db_host" => "127.0.0.1",
"db_name" => "postgres",
"db_user" => "supabase_admin",
"db_password" => "postgres",
"poll_interval" => 100,
"poll_max_changes" => 100,
"poll_max_record_bytes" => 1_048_576,
"region" => "us-east-1",
"ssl_enforced" => false,
"db_pool" => 100,
"subcriber_pool_size" => 100,
"subs_pool_size" => 100
}
}

{:ok, tenant} = update_extension(tenant, extension)

parent = self()

# Let's slow down Connect starting
expect(Database, :check_tenant_connection, fn t ->
:timer.sleep(1000)
call_original(Database, :check_tenant_connection, [t])
end)

connect = fn -> send(parent, Connect.lookup_or_start_connection(tenant.external_id)) end

# Start an early connect
spawn(connect)
:timer.sleep(100)

# Start others
spawn(connect)
spawn(connect)

# This one should block and wait for the first Connect
{:error, :tenant_db_too_many_connections} = Connect.lookup_or_start_connection(tenant.external_id)

assert_receive {:error, :tenant_db_too_many_connections}
assert_receive {:error, :tenant_db_too_many_connections}
assert_receive {:error, :tenant_db_too_many_connections}
refute_receive _any
end
end

describe "region rebalancing" do
Expand Down Expand Up @@ -263,6 +311,34 @@ defmodule Realtime.Tenants.ConnectTest do
assert {:error, :tenant_suspended} = Connect.lookup_or_start_connection(tenant.external_id)
end

test "tenant not able to connect if database has not enough connections", %{
tenant: tenant
} do
extension = %{
"type" => "postgres_cdc_rls",
"settings" => %{
"db_host" => "127.0.0.1",
"db_name" => "postgres",
"db_user" => "supabase_admin",
"db_password" => "postgres",
"poll_interval" => 100,
"poll_max_changes" => 100,
"poll_max_record_bytes" => 1_048_576,
"region" => "us-east-1",
"ssl_enforced" => false,
"db_pool" => 100,
"subcriber_pool_size" => 100,
"subs_pool_size" => 100
}
}

{:ok, tenant} = update_extension(tenant, extension)

assert capture_log(fn ->
assert {:error, :tenant_db_too_many_connections} = Connect.lookup_or_start_connection(tenant.external_id)
end) =~ ~r/Only \d+ available connections\. At least \d+ connections are required/
end

test "handles tenant suspension and unsuspension in a reactive way", %{tenant: tenant} do
assert {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
assert Connect.ready?(tenant.external_id)
Expand Down Expand Up @@ -459,30 +535,6 @@ defmodule Realtime.Tenants.ConnectTest do
test "if tenant does not exist, does nothing" do
assert :ok = Connect.shutdown("none")
end

test "tenant not able to connect if database has not enough connections", %{tenant: tenant} do
extension = %{
"type" => "postgres_cdc_rls",
"settings" => %{
"db_host" => "127.0.0.1",
"db_name" => "postgres",
"db_user" => "supabase_admin",
"db_password" => "postgres",
"poll_interval" => 100,
"poll_max_changes" => 100,
"poll_max_record_bytes" => 1_048_576,
"region" => "us-east-1",
"ssl_enforced" => false,
"db_pool" => 100,
"subcriber_pool_size" => 100,
"subs_pool_size" => 100
}
}

{:ok, tenant} = update_extension(tenant, extension)

assert {:error, :tenant_db_too_many_connections} = Connect.lookup_or_start_connection(tenant.external_id)
end
end

describe "registers into local registry" do
Expand Down
Loading