Skip to content

Commit

Permalink
fix: Add migrations to be less blocking (#1218)
Browse files Browse the repository at this point in the history
* Partition the Migrations Supervisor
* Use maybe_migrate
* Fix maybe_migrate to avoid transactions and potentially reduce transaction from Migrations
* Move code for migrations and partition creation from init to handle_continue so the process does not get locked
* bring back the 30 second ttl
* fix dialyzer
* remove timer sleep from get_status
* move start replication to handle_continue
  • Loading branch information
filipecabaco authored Nov 12, 2024
1 parent 1169953 commit 61907d3
Show file tree
Hide file tree
Showing 12 changed files with 102 additions and 125 deletions.
6 changes: 4 additions & 2 deletions lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ defmodule Realtime.Application do
{Registry, keys: :duplicate, name: Realtime.Registry},
{Registry, keys: :unique, name: Realtime.Registry.Unique},
{Task.Supervisor, name: Realtime.TaskSupervisor},
{PartitionSupervisor,
child_spec: DynamicSupervisor,
strategy: :one_for_one,
name: Realtime.Tenants.Migrations.DynamicSupervisor},
{PartitionSupervisor,
child_spec: DynamicSupervisor,
strategy: :one_for_one,
Expand All @@ -76,8 +80,6 @@ defmodule Realtime.Application do
strategy: :one_for_one,
name: Realtime.Tenants.Listen.DynamicSupervisor,
max_restarts: 5},
{DynamicSupervisor,
name: Realtime.Tenants.Migrations.DynamicSupervisor, strategy: :one_for_one},
{PartitionSupervisor,
child_spec: DynamicSupervisor,
strategy: :one_for_one,
Expand Down
2 changes: 1 addition & 1 deletion lib/realtime/tenants/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Realtime.Tenants.Cache do
require Cachex.Spec

alias Realtime.Tenants
@expiration :timer.hours(6)
@expiration :timer.seconds(30)
def child_spec(_) do
%{
id: __MODULE__,
Expand Down
49 changes: 36 additions & 13 deletions lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,22 @@ defmodule Realtime.Tenants.Connect do
import Realtime.Helpers, only: [log_error: 2]

alias Realtime.Api.Tenant
alias Realtime.BroadcastChanges.Handler
alias Realtime.Rpc
alias Realtime.Tenants
alias Realtime.Tenants.Migrations
alias Realtime.UsersCounter
alias Realtime.Tenants.Connect.Piper
alias Realtime.Tenants.Connect.CheckConnection
alias Realtime.Tenants.Connect.StartReplication
alias Realtime.Tenants.Connect.Migrations
alias Realtime.Tenants.Connect.GetTenant
alias Realtime.Tenants.Connect.Piper
alias Realtime.Tenants.Connect.RegisterProcess
alias Realtime.Tenants.Connect.StartCounters
alias Realtime.Tenants.Connect.CreatePartitions
alias Realtime.Tenants.Migrations
alias Realtime.UsersCounter

@pipes [
GetTenant,
CheckConnection,
Migrations,
StartCounters,
StartReplication,
RegisterProcess,
CreatePartitions
RegisterProcess
]
@rpc_timeout_default 30_000
@check_connected_user_interval_default 50_000
Expand Down Expand Up @@ -87,7 +82,6 @@ defmodule Realtime.Tenants.Connect do

:undefined ->
Logger.warning("Connection process starting up")
:timer.sleep(100)
{:error, :tenant_database_connection_initializing}

error ->
Expand Down Expand Up @@ -149,8 +143,7 @@ defmodule Realtime.Tenants.Connect do
Logger.metadata(external_id: tenant_id, project: tenant_id)

with {:ok, acc} <- Piper.run(@pipes, state) do
acc = Map.delete(acc, :tenant)
{:ok, acc, {:continue, :setup_connected_user_events}}
{:ok, acc, {:continue, :run_migrations}}
else
{:error, :tenant_not_found} ->
log_error("TenantNotFound", "Tenant not found")
Expand All @@ -162,6 +155,16 @@ defmodule Realtime.Tenants.Connect do
end
end

def handle_continue(:run_migrations, state) do
%{tenant: tenant, db_conn_pid: db_conn_pid} = state
:ok = Migrations.maybe_run_migrations(db_conn_pid, tenant)
:ok = Migrations.create_partitions(db_conn_pid)
{:ok, broadcast_changes_pid} = start_replication(tenant)

{:noreply, %{state | broadcast_changes_pid: broadcast_changes_pid},
{:continue, :setup_connected_user_events}}
end

@impl true
def handle_continue(:setup_connected_user_events, state) do
%{
Expand Down Expand Up @@ -272,4 +275,24 @@ defmodule Realtime.Tenants.Connect do

defp tenant_suspended?(%Tenant{suspend: true}), do: {:error, :tenant_suspended}
defp tenant_suspended?(_), do: :ok

defp start_replication(%{notify_private_alpha: false}), do: {:ok, nil}

defp start_replication(tenant) do
opts = %Handler{tenant_id: tenant.external_id}
supervisor_spec = Handler.supervisor_spec(tenant)

child_spec = %{
id: Handler,
start: {Handler, :start_link, [opts]},
restart: :transient,
type: :worker
}

case DynamicSupervisor.start_child(supervisor_spec, child_spec) do
{:ok, pid} -> {:ok, pid}
{:error, {:already_started, pid}} -> {:ok, pid}
error -> {:error, error}
end
end
end
34 changes: 0 additions & 34 deletions lib/realtime/tenants/connect/create_partitions.ex

This file was deleted.

19 changes: 0 additions & 19 deletions lib/realtime/tenants/connect/migrations.ex

This file was deleted.

33 changes: 0 additions & 33 deletions lib/realtime/tenants/connect/start_replication.ex

This file was deleted.

53 changes: 45 additions & 8 deletions lib/realtime/tenants/migrations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,14 @@ defmodule Realtime.Tenants.Migrations do

defstruct [:tenant_external_id, :settings]
@spec run_migrations(map()) :: :ok | {:error, any()}
def run_migrations(%__MODULE__{} = attrs) do
case DynamicSupervisor.start_child(__MODULE__.DynamicSupervisor, {__MODULE__, attrs}) do
def run_migrations(%__MODULE__{tenant_external_id: tenant_external_id} = attrs) do
supervisor =
{:via, PartitionSupervisor,
{Realtime.Tenants.Migrations.DynamicSupervisor, tenant_external_id}}

spec = {__MODULE__, attrs}

case DynamicSupervisor.start_child(supervisor, spec) do
:ignore -> :ok
error -> error
end
Expand Down Expand Up @@ -196,19 +202,50 @@ defmodule Realtime.Tenants.Migrations do
If not all migrations have been run, it will run the missing migrations.
"""
@spec maybe_run_migrations(pid(), Tenant.t()) :: {:ok, any()} | {:error, any()}
@spec maybe_run_migrations(pid(), Tenant.t()) :: :ok
def maybe_run_migrations(db_conn, tenant) do
query =
"select * from pg_catalog.pg_tables where schemaname = 'realtime' and tablename = 'schema_migrations';"

%{extensions: [%{settings: settings} | _]} = tenant
%{num_rows: num_rows} = Postgrex.query!(db_conn, query, [])

Database.transaction(db_conn, fn transaction_conn ->
%{num_rows: num_rows} = Postgrex.query!(transaction_conn, query, [])
if num_rows < @expected_migration_count do
run_migrations(%__MODULE__{tenant_external_id: tenant.external_id, settings: settings})
end

if num_rows < @expected_migration_count do
run_migrations(%__MODULE__{tenant_external_id: tenant.external_id, settings: settings})
end
:ok
end

@doc """
Create partitions against tenant db connection
"""
@spec create_partitions(pid()) :: :ok
def create_partitions(db_conn_pid) do
today = Date.utc_today()
yesterday = Date.add(today, -1)
tomorrow = Date.add(today, 1)

dates = [yesterday, today, tomorrow]

Enum.each(dates, fn date ->
partition_name = "messages_#{date |> Date.to_iso8601() |> String.replace("-", "_")}"
start_timestamp = Date.to_string(date)
end_timestamp = Date.to_string(Date.add(date, 1))

Database.transaction(db_conn_pid, fn conn ->
Postgrex.query(
conn,
"""
CREATE TABLE IF NOT EXISTS realtime.#{partition_name}
PARTITION OF realtime.messages
FOR VALUES FROM ('#{start_timestamp}') TO ('#{end_timestamp}');
""",
[]
)
end)
end)

:ok
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.33.42",
version: "2.33.43",
elixir: "~> 1.16.0",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
2 changes: 1 addition & 1 deletion test/realtime/repo_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ defmodule Realtime.RepoTest do

describe "insert/3" do
setup %{db_conn: db_conn} do
Realtime.Tenants.Connect.CreatePartitions.run(%{db_conn_pid: db_conn})
Realtime.Tenants.Migrations.create_partitions(db_conn)
end

test "inserts a new entry with a given changeset and returns struct", %{db_conn: db_conn} do
Expand Down
2 changes: 1 addition & 1 deletion test/realtime/tenants/authorization_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ defmodule Realtime.Tenants.AuthorizationTest do
role: claims.role
})

Realtime.Tenants.Connect.CreatePartitions.run(%{db_conn_pid: db_conn})
Realtime.Tenants.Migrations.create_partitions(db_conn)

on_exit(fn -> Process.exit(db_conn, :normal) end)

Expand Down
23 changes: 12 additions & 11 deletions test/realtime/tenants/connect_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ defmodule Realtime.Tenants.ConnectTest do

test "on connect, tracks tenant as active", %{tenant: tenant} do
assert {:ok, _} = Connect.lookup_or_start_connection(tenant.external_id)
:timer.sleep(200)
:timer.sleep(500)

assert Enum.find(Tenants.list_active_tenants(), &(elem(&1, 0) == tenant.external_id))
end
Expand Down Expand Up @@ -79,11 +79,11 @@ defmodule Realtime.Tenants.ConnectTest do

Sandbox.allow(Repo, self(), db_conn)
# Not enough time has passed, connection still alive
:timer.sleep(100)
:timer.sleep(500)
assert {_, %{conn: _}} = :syn.lookup(Connect, tenant_id)

# Enough time has passed, connection stopped
:timer.sleep(1000)
:timer.sleep(5000)
assert :undefined = :syn.lookup(Connect, tenant_id)
refute Process.alive?(db_conn)
end
Expand Down Expand Up @@ -113,9 +113,9 @@ defmodule Realtime.Tenants.ConnectTest do
Connect.lookup_or_start_connection(tenant_id, check_connected_user_interval: 10)

assert {_pid, %{conn: _conn_pid}} = :syn.lookup(Connect, tenant_id)
:timer.sleep(300)
:timer.sleep(1000)
:syn.leave(:users, tenant_id, self())
:timer.sleep(300)
:timer.sleep(1000)
assert :undefined = :syn.lookup(Connect, tenant_id)
refute Process.alive?(db_conn)
end
Expand All @@ -132,7 +132,7 @@ defmodule Realtime.Tenants.ConnectTest do
assert {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
Sandbox.allow(Repo, self(), db_conn)

:timer.sleep(100)
:timer.sleep(500)
Realtime.Tenants.suspend_tenant_by_external_id(tenant.external_id)
:timer.sleep(500)

Expand Down Expand Up @@ -181,11 +181,12 @@ defmodule Realtime.Tenants.ConnectTest do
end

test "on migrations failure, stop the process", %{tenant: tenant} do
with_mock Realtime.Tenants.Migrations, [], run_migrations: fn _ -> raise("error") end do
assert {:error, :tenant_database_unavailable} =
Connect.lookup_or_start_connection(tenant.external_id)

assert_called(Realtime.Tenants.Migrations.run_migrations(:_))
with_mock Realtime.Tenants.Migrations, [],
maybe_run_migrations: fn _, _ -> raise("error") end do
assert {:ok, pid} = Connect.lookup_or_start_connection(tenant.external_id)
Process.alive?(pid)
Process.sleep(1000)
assert_called(Realtime.Tenants.Migrations.maybe_run_migrations(:_, :_))
end
end

Expand Down
2 changes: 1 addition & 1 deletion test/support/generators.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ defmodule Generators do

def message_fixture(tenant, override \\ %{}) do
{:ok, db_conn} = Database.connect(tenant, "realtime_test", 1)
Realtime.Tenants.Connect.CreatePartitions.run(%{db_conn_pid: db_conn})
Realtime.Tenants.Migrations.create_partitions(db_conn)

create_attrs = %{
"topic" => random_string(),
Expand Down

0 comments on commit 61907d3

Please sign in to comment.