Skip to content

Commit db1571e

Browse files
authored
Add built in connection pool (#108)
First step towards a faster/ singe pool written specifically for DBConnection. This pool uses ETS tables to store connection state and moves ownership of the connection around to avoid tracking state internally. The pool is faster because interaction between the clients and the pool requires only a single message roundtrip to checkout, and a single cast to return. Thus beating or matching the efficiency of our existing pools in each case.
1 parent 3b34256 commit db1571e

File tree

10 files changed

+528
-16
lines changed

10 files changed

+528
-16
lines changed

integration_test/cases/idle_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ defmodule TestIdle do
2828
end]
2929
{:ok, agent} = A.start_link(stack)
3030

31-
opts = [agent: agent, parent: self(), idle_timeout: 50]
31+
opts = [agent: agent, parent: self(), idle_timeout: 50, idle_interval: 50]
3232
{:ok, pool} = P.start_link(opts)
3333
assert_receive {:hi, conn}
3434
assert_receive {:pong, ^conn}

integration_test/cases/queue_test.exs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ defmodule QueueTest do
4242
stack = [{:ok, :state}]
4343
{:ok, agent} = A.start_link(stack)
4444

45-
opts = [agent: agent, parent: self(), queue_timeout: 50]
45+
opts = [agent: agent, parent: self(), queue_timeout: 50, queue_target: 50,
46+
queue_interval: 50]
4647
{:ok, pool} = P.start_link(opts)
4748

4849
parent = self()
@@ -147,7 +148,8 @@ defmodule QueueTest do
147148
{:ok, agent} = A.start_link(stack)
148149

149150
opts = [agent: agent, parent: self(), backoff_start: 30_000,
150-
queue_timeout: 10, pool_timeout: 10]
151+
queue_timeout: 10, pool_timeout: 10, queue_target: 10,
152+
queue_interval: 10]
151153
{:ok, pool} = P.start_link(opts)
152154

153155
P.run(pool, fn(_) ->
@@ -165,7 +167,7 @@ defmodule QueueTest do
165167
{:ok, agent} = A.start_link(stack)
166168

167169
opts = [agent: agent, parent: self(), backoff_start: 30_000,
168-
queue_timeout: 10, pool_timeout: 10]
170+
queue_timeout: 10, pool_timeout: 10, queue_target: 10, queue_interval: 10]
169171
{:ok, pool} = P.start_link(opts)
170172

171173
P.run(pool, fn(_) ->
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Code.require_file "../tests.exs", __DIR__
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
ExUnit.start([capture_log: true, assert_receive_timeout: 500,
2+
exclude: [:pool_overflow, :enqueue_disconnected,
3+
:queue_timeout_exit]])
4+
5+
Code.require_file "../../test/test_support.exs", __DIR__
6+
7+
defmodule TestPool do
8+
use TestConnection, [pool: DBConnection.ConnectionPool, pool_size: 1]
9+
end
10+
11+
{:ok, _} = TestPool.ensure_all_started()

lib/db_connection/app.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ defmodule DBConnection.App do
99
supervisor(DBConnection.Task, []),
1010
supervisor(DBConnection.Sojourn.Supervisor, []),
1111
supervisor(DBConnection.Ownership.PoolSupervisor, []),
12+
supervisor(DBConnection.ConnectionPool.PoolSupervisor, []),
1213
worker(DBConnection.Watcher, [])
1314
]
1415
Supervisor.start_link(children, strategy: :one_for_all, name: __MODULE__)

lib/db_connection/connection.ex

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ defmodule DBConnection.Connection do
1717
use Connection
1818
require Logger
1919
alias DBConnection.Backoff
20+
alias DBConnection.ConnectionPool
2021

2122
@pool_timeout 5_000
2223
@timeout 15_000
@@ -83,6 +84,11 @@ defmodule DBConnection.Connection do
8384
end
8485
end
8586

87+
@doc false
88+
def ping({pid, ref}, state) do
89+
Connection.cast(pid, {:ping, ref, state})
90+
end
91+
8692
## Internal API
8793

8894
@doc false
@@ -96,19 +102,26 @@ defmodule DBConnection.Connection do
96102
Supervisor.Spec.worker(__MODULE__, [mod, opts, mode], child_opts)
97103
end
98104

105+
@doc false
106+
def child_spec(mod, opts, mode, info, child_opts) do
107+
Supervisor.Spec.worker(__MODULE__, [mod, opts, mode, info], child_opts)
108+
end
109+
99110
## Connection API
100111

101112
@doc false
102113
def init({mod, opts, mode, info}) do
103-
queue = if mode == :sojourn, do: :broker, else: :queue.new()
114+
queue = if mode in [:connection, :poolboy], do: :queue.new(), else: mode
115+
idle = if mode in [:connection, :poolboy], do: get_idle(opts), else: :passive
104116
broker = if mode == :sojourn, do: elem(info, 0)
105117
regulator = if mode == :sojourn, do: elem(info, 1)
106-
idle = if mode == :sojourn, do: :passive, else: get_idle(opts)
107118
after_timeout = if mode == :poolboy, do: :stop, else: :backoff
119+
pool = if mode == :connection_pool, do: elem(info, 0)
120+
tag = if mode == :connection_pool, do: elem(info, 1)
108121

109122
s = %{mod: mod, opts: opts, state: nil, client: :closed, broker: broker,
110-
regulator: regulator, lock: nil, queue: queue, timer: nil,
111-
backoff: Backoff.new(opts),
123+
regulator: regulator, lock: nil, pool: pool, tag: tag, queue: queue,
124+
timer: nil, backoff: Backoff.new(opts),
112125
after_connect: Keyword.get(opts, :after_connect),
113126
after_connect_timeout: Keyword.get(opts, :after_connect_timeout,
114127
@timeout), idle: idle,
@@ -206,7 +219,7 @@ defmodule DBConnection.Connection do
206219
@doc false
207220
def handle_call({:checkout, ref, queue?, timeout}, {pid, _} = from, s) do
208221
case s do
209-
%{queue: :broker} ->
222+
%{queue: mode} when is_atom(mode) ->
210223
exit(:bad_checkout)
211224
%{client: nil, idle: :passive, mod: mod, state: state} ->
212225
Connection.reply(from, {:ok, {self(), ref}, mod, state})
@@ -244,6 +257,16 @@ defmodule DBConnection.Connection do
244257
end
245258

246259
@doc false
260+
def handle_cast({:ping, ref, state}, %{client: {ref, :pool}} = s) do
261+
%{mod: mod} = s
262+
case apply(mod, :ping, [state]) do
263+
{:ok, state} ->
264+
pool_update(state, s)
265+
{:disconnect, err, state} ->
266+
{:disconnect, {:log, err}, %{s | state: state}}
267+
end
268+
end
269+
247270
def handle_cast({:checkin, ref, state}, %{client: {ref, _}} = s) do
248271
handle_next(state, s)
249272
end
@@ -259,7 +282,7 @@ defmodule DBConnection.Connection do
259282
{:stop, {err, stack}, %{s | state: state}}
260283
end
261284

262-
def handle_cast({:cancel, _}, %{queue: :broker}) do
285+
def handle_cast({:cancel, _}, %{queue: mode}) when is_atom(mode) do
263286
exit(:bad_cancel)
264287
end
265288
def handle_cast({:cancel, ref}, %{client: {ref, _}, state: state} = s) do
@@ -308,7 +331,9 @@ defmodule DBConnection.Connection do
308331
def handle_cast({:connected, ref}, %{client: {ref, :connect}} = s) do
309332
%{mod: mod, state: state, queue: queue, broker: broker} = s
310333
case apply(mod, :checkout, [state]) do
311-
{:ok, state} when queue == :broker ->
334+
{:ok, state} when queue == :connection_pool ->
335+
pool_update(state, s)
336+
{:ok, state} when queue == :sojourn ->
312337
info = {self(), mod, state}
313338
{:await, ^ref, _} = :sbroker.async_ask_r(broker, info, {self(), ref})
314339
{:noreply, %{s | client: {ref, :broker}, state: state}}
@@ -335,7 +360,8 @@ defmodule DBConnection.Connection do
335360
err = DBConnection.ConnectionError.exception(message)
336361
{:disconnect, {down_log(reason), err}, %{s | client: {ref, nil}}}
337362
end
338-
def handle_info({:DOWN, _, :process, _, _} = msg, %{queue: :broker} = s) do
363+
def handle_info({:DOWN, _, :process, _, _} = msg, %{queue: mode} = s)
364+
when is_atom(mode) do
339365
do_handle_info(msg, s)
340366
end
341367
def handle_info({:DOWN, ref, :process, _, _} = msg, %{queue: queue} = s) do
@@ -425,7 +451,7 @@ defmodule DBConnection.Connection do
425451
defp start_opts(:connection, opts) do
426452
Keyword.take(opts, [:debug, :name, :timeout, :spawn_opt])
427453
end
428-
defp start_opts(mode, opts) when mode in [:poolboy, :sojourn] do
454+
defp start_opts(mode, opts) when mode in [:poolboy, :sojourn, :connection_pool] do
429455
Keyword.take(opts, [:debug, :spawn_opt])
430456
end
431457

@@ -486,7 +512,10 @@ defmodule DBConnection.Connection do
486512
demonitor(client)
487513
handle_next(state, %{s | client: nil, backoff: backoff})
488514
end
489-
defp handle_next(state, %{queue: :broker} = s) do
515+
defp handle_next(state, %{queue: :connection_pool} = s) do
516+
pool_update(state, s)
517+
end
518+
defp handle_next(state, %{queue: :sojourn} = s) do
490519
%{client: client, timer: timer, mod: mod, broker: broker} = s
491520
demonitor(client)
492521
cancel_timer(timer)
@@ -626,7 +655,9 @@ defmodule DBConnection.Connection do
626655
end
627656
end
628657

629-
defp clear_queue(:broker), do: :broker
658+
defp clear_queue(queue) when is_atom(queue) do
659+
queue
660+
end
630661
defp clear_queue(queue) do
631662
clear =
632663
fn({{_, mon}, _, from}) ->
@@ -645,6 +676,11 @@ defmodule DBConnection.Connection do
645676
:ok
646677
end
647678

679+
defp pool_update(state, %{pool: pool, tag: tag, mod: mod} = s) do
680+
ref = ConnectionPool.update(pool, tag, mod, state)
681+
{:noreply, %{s | client: {ref, :pool}, state: state}, :hibernate}
682+
end
683+
648684
defp normal_status(mod, pdict, state) do
649685
try do
650686
mod.format_status(:normal, [pdict, state])

0 commit comments

Comments
 (0)