diff --git a/VERSION b/VERSION index 1b507d18..2818446a 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.1.39 +1.1.40 diff --git a/lib/supavisor/client_handler.ex b/lib/supavisor/client_handler.ex index d816c3bb..4bf51ed4 100644 --- a/lib/supavisor/client_handler.ex +++ b/lib/supavisor/client_handler.ex @@ -30,6 +30,10 @@ defmodule Supavisor.ClientHandler do :gen_statem.cast(pid, {:client_cast, bin, status}) end + @spec client_call(pid, iodata(), atom()) :: :ok | {:error, term()} + def client_call(pid, bin, status), + do: :gen_statem.call(pid, {:client_call, bin, status}, 30_000) + @impl true def init(_), do: :ignore @@ -548,6 +552,12 @@ defmodule Supavisor.ClientHandler do end end + # emulate handle_call + def handle_event({:call, from}, {:client_call, bin, _}, _, data) do + Logger.debug("ClientHandler: --> --> bin call #{inspect(byte_size(bin))} bytes") + {:keep_state_and_data, {:reply, from, HH.sock_send(data.sock, bin)}} + end + def handle_event(type, content, state, data) do msg = [ {"type", type}, diff --git a/lib/supavisor/db_handler.ex b/lib/supavisor/db_handler.ex index 8cc776c9..52dbebb1 100644 --- a/lib/supavisor/db_handler.ex +++ b/lib/supavisor/db_handler.ex @@ -19,6 +19,7 @@ defmodule Supavisor.DbHandler do @reconnect_timeout 2_500 @sock_closed [:tcp_closed, :ssl_closed] @proto [:tcp, :ssl] + @async_send_limit 1_000 def start_link(config) do :gen_statem.start_link(__MODULE__, config, hibernate_after: 5_000) @@ -334,14 +335,15 @@ defmodule Supavisor.DbHandler do Logger.debug("DbHandler: Got write replica message #{inspect(bin)}") HH.setopts(data.sock, active: :once) # check if the response ends with "ready for query" - ready = - if String.ends_with?(bin, Server.ready_for_query()) do - :ready_for_query - else - :continue - end + ready = check_ready(bin) + sent = data.sent || 0 + + send_via = + if ready == :ready_for_query || sent < @async_send_limit, + do: :client_cast, + else: :client_call - :ok = Client.client_cast(data.caller, bin, ready) + :ok = apply(Client, send_via, [data.caller, bin, ready]) case ready do :ready_for_query -> @@ -349,11 +351,11 @@ defmodule Supavisor.DbHandler do HH.setopts(data.sock, active: true) - {:next_state, :idle, %{data | stats: stats, caller: handler_caller(data)}, + {:next_state, :idle, %{data | stats: stats, caller: handler_caller(data), sent: false}, {:next_event, :internal, :check_anon_buffer}} :continue -> - :keep_state_and_data + {:keep_state, %{data | sent: sent + 1}} end end @@ -539,4 +541,11 @@ defmodule Supavisor.DbHandler do @spec handler_caller(map()) :: pid() | nil defp handler_caller(%{mode: :session} = data), do: data.caller defp handler_caller(_), do: nil + + @spec check_ready(binary()) :: :ready_for_query | :continue + def check_ready(bin) do + if String.ends_with?(bin, Server.ready_for_query()), + do: :ready_for_query, + else: :continue + end end diff --git a/test/supavisor/db_handler_test.exs b/test/supavisor/db_handler_test.exs index f62d85f4..947380a6 100644 --- a/test/supavisor/db_handler_test.exs +++ b/test/supavisor/db_handler_test.exs @@ -181,7 +181,8 @@ defmodule Supavisor.DbHandlerTest do caller: caller_pid, sock: {:gen_tcp, nil}, stats: %{}, - mode: :session + mode: :session, + sent: false } state = :some_state @@ -213,7 +214,8 @@ defmodule Supavisor.DbHandlerTest do caller: caller_pid, sock: {:gen_tcp, nil}, stats: %{}, - mode: :transaction + mode: :transaction, + sent: false } state = :some_state