@@ -19,7 +19,6 @@ defmodule Realtime.Tenants.Connect do
1919  alias  Realtime.Tenants.Connect.GetTenant 
2020  alias  Realtime.Tenants.Connect.Piper 
2121  alias  Realtime.Tenants.Connect.RegisterProcess 
22-   alias  Realtime.Tenants.Connect.StartCounters 
2322  alias  Realtime.Tenants.Migrations 
2423  alias  Realtime.Tenants.ReplicationConnection 
2524  alias  Realtime.UsersCounter 
@@ -83,14 +82,13 @@ defmodule Realtime.Tenants.Connect do
8382          |  { :error ,  :tenant_database_connection_initializing } 
8483  def  get_status ( tenant_id )  do 
8584    case  :syn . lookup ( __MODULE__ ,  tenant_id )  do 
86-       { _pid ,  % { conn:  nil } }  -> 
87-         wait_for_connection ( tenant_id ) 
85+       { pid ,  % { conn:  nil } }  -> 
86+         wait_for_connection ( pid ,   tenant_id ) 
8887
8988      { _ ,  % { conn:  conn } }  -> 
9089        { :ok ,  conn } 
9190
9291      :undefined  -> 
93-         Logger . warning ( "Connection process starting up" ) 
9492        { :error ,  :tenant_database_connection_initializing } 
9593
9694      error  -> 
@@ -101,7 +99,7 @@ defmodule Realtime.Tenants.Connect do
10199
102100  def  syn_topic ( tenant_id ) ,  do:  "connect:#{ tenant_id }  
103101
104-   defp  wait_for_connection ( tenant_id )  do 
102+   defp  wait_for_connection ( pid ,   tenant_id )  do 
105103    RealtimeWeb.Endpoint . subscribe ( syn_topic ( tenant_id ) ) 
106104
107105    # We do a lookup after subscribing because we could've missed a message while subscribing 
@@ -112,9 +110,18 @@ defmodule Realtime.Tenants.Connect do
112110      _  -> 
113111        # Wait for up to 5 seconds for the ready event 
114112        receive  do 
115-           % { event:  "ready" ,  payload:  % { conn:  conn } }  ->  { :ok ,  conn } 
113+           % { event:  "ready" ,  payload:  % { pid:  ^ pid ,  conn:  conn } }  -> 
114+             { :ok ,  conn } 
115+ 
116+           % { event:  "connect_down" ,  payload:  % { pid:  ^ pid ,  reason:  { :shutdown ,  :tenant_db_too_many_connections } } }  -> 
117+             { :error ,  :tenant_db_too_many_connections } 
118+ 
119+           % { event:  "connect_down" ,  payload:  % { pid:  ^ pid ,  reason:  _reason } }  -> 
120+             metadata  =  [ external_id:  tenant_id ,  project:  tenant_id ] 
121+             log_error ( "UnableToConnectToTenantDatabase" ,  "Unable to connect to tenant database" ,  metadata ) 
122+             { :error ,  :tenant_database_unavailable } 
116123        after 
117-           5_000  ->  { :error ,  :initializing } 
124+           15_000  ->  { :error ,  :initializing } 
118125        end 
119126    end 
120127  after 
@@ -139,16 +146,6 @@ defmodule Realtime.Tenants.Connect do
139146      { :error ,  { :already_started ,  _ } }  -> 
140147        get_status ( tenant_id ) 
141148
142-       { :error ,  { :shutdown ,  :tenant_db_too_many_connections } }  -> 
143-         { :error ,  :tenant_db_too_many_connections } 
144- 
145-       { :error ,  { :shutdown ,  :tenant_not_found } }  -> 
146-         { :error ,  :tenant_not_found } 
147- 
148-       { :error ,  :shutdown }  -> 
149-         log_error ( "UnableToConnectToTenantDatabase" ,  "Unable to connect to tenant database" ,  metadata ) 
150-         { :error ,  :tenant_database_unavailable } 
151- 
152149      { :error ,  error }  -> 
153150        log_error ( "UnableToConnectToTenantDatabase" ,  error ,  metadata ) 
154151        { :error ,  :tenant_database_unavailable } 
@@ -209,30 +206,33 @@ defmodule Realtime.Tenants.Connect do
209206  def  init ( % { tenant_id:  tenant_id }  =  state )  do 
210207    Logger . metadata ( external_id:  tenant_id ,  project:  tenant_id ) 
211208
209+     { :ok ,  state ,  { :continue ,  :db_connect } } 
210+   end 
211+ 
212+   @ impl  true 
213+   def  handle_continue ( :db_connect ,  state )  do 
212214    pipes  =  [ 
213215      GetTenant , 
214216      CheckConnection , 
215-       StartCounters , 
216217      RegisterProcess 
217218    ] 
218219
219220    case  Piper . run ( pipes ,  state )  do 
220221      { :ok ,  acc }  -> 
221-         { :ok  ,  acc ,  { :continue ,  :run_migrations } } 
222+         { :noreply  ,  acc ,  { :continue ,  :run_migrations } } 
222223
223224      { :error ,  :tenant_not_found }  -> 
224-         { :stop ,  { :shutdown ,  :tenant_not_found } } 
225+         { :stop ,  { :shutdown ,  :tenant_not_found } ,   state } 
225226
226227      { :error ,  :tenant_db_too_many_connections }  -> 
227-         { :stop ,  { :shutdown ,  :tenant_db_too_many_connections } } 
228+         { :stop ,  { :shutdown ,  :tenant_db_too_many_connections } ,   state } 
228229
229230      { :error ,  error }  -> 
230231        log_error ( "UnableToConnectToTenantDatabase" ,  error ) 
231-         { :stop ,  :shutdown } 
232+         { :stop ,  :shutdown ,   state } 
232233    end 
233234  end 
234235
235-   @ impl  true 
236236  def  handle_continue ( :run_migrations ,  state )  do 
237237    % { tenant:  tenant ,  db_conn_pid:  db_conn_pid }  =  state 
238238    Logger . warning ( "Tenant #{ tenant . external_id } #{ inspect ( node ( ) ) }  ) 
@@ -375,6 +375,7 @@ defmodule Realtime.Tenants.Connect do
375375
376376  ## Private functions 
377377  defp  call_external_node ( tenant_id ,  opts )  do 
378+     Logger . warning ( "Connection process starting up" ) 
378379    rpc_timeout  =  Keyword . get ( opts ,  :rpc_timeout ,  @ rpc_timeout_default ) 
379380
380381    with  tenant  <-  Tenants.Cache . get_tenant_by_external_id ( tenant_id ) , 
0 commit comments