Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 49 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ Poolboy is a **lightweight**, **generic** pooling library for Erlang with a
focus on **simplicity**, **performance**, and **rock-solid** disaster recovery.

## Usage

The most basic use case is to check out a worker, make a call and manually
return it to the pool when done
```erl-sh
1> Worker = poolboy:checkout(PoolName).
<0.9001.0>
Expand All @@ -17,7 +18,15 @@ ok
3> poolboy:checkin(PoolName, Worker).
ok
```

Alternatively you can use a transaction which will return the worker to the
pool when the call is finished.
```erl-sh
poolboy:transaction(
PoolName,
fun(Worker) -> gen_server:call(Worker, Request) end,
TransactionTimeout
)
```
## Example

This is an example application showcasing database connection pools using
Expand Down Expand Up @@ -149,14 +158,47 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
```

## Options
## Pool Options

- `name`: the pool name
- `worker_module`: the module that represents the workers
- `size`: maximum pool size
- `max_overflow`: maximum number of workers created if pool is empty
- `name`: the pool name - optional
- `worker_module`: the module that represents the workers - mandatory
- `size`: maximum pool size - optional
- `max_overflow`: maximum number of workers created if pool is empty - optional
- `strategy`: `lifo` or `fifo`, determines whether checked in workers should be
placed first or last in the line of available workers. Default is `lifo`.
- `overflow_ttl`: time in milliseconds you want to wait before removing overflow
workers. Useful when it's expensive to start workers. Default is 0.

## Pool Status
Returns : {Status, Workers, Overflow, InUse}
- `Status`: ready | full | overflow
The ready atom indicates there are workers that are not checked out
ready. The full atom indicates all workers including overflow are
checked out. The overflow atom is used to describe the condition
when all permanent workers are in use but there is overflow capacity
available.
- `Workers`: Number of workers ready for use.
- `Overflow`: Number of overflow workers started, should never exceed number
specified by MaxOverflow when starting pool
- `InUse`: Number of workers currently busy/checked out

## Full Pool Status
Returns a propslist of counters relating to a specified pool. Useful
for graphing the state of your pools
- `size`: The defined size of the permanent worker pool
- `max_overflow`: The maximum number of overflow workers allowed
- `total_worker_count`: The total supervised workers. This includes
any workers waiting to be culled and not available to the
general pool
- `ready_worker_count`: The count of workers available workers to be
used including overflow workers. Workers in this count may or may
not be checked out.
- `checked_out_worker_count`: The count of workers that are currently
checked out
- `overflow_worker_count`: The count of active overflow workers
- `waiting_request_count`: The backlog of requests waiting to checkout
a worker


## Authors

Expand Down
114 changes: 102 additions & 12 deletions src/poolboy.erl
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
%% Poolboy - A hunky Erlang worker pool factory

-module(poolboy).
-behaviour(gen_server).

-export([checkout/1, checkout/2, checkout/3, checkin/2, transaction/2,
transaction/3, child_spec/2, child_spec/3, start/1, start/2,
start_link/1, start_link/2, stop/1, status/1]).
start_link/1, start_link/2, stop/1, status/1, full_status/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-export_type([pool/0]).
Expand All @@ -32,11 +31,13 @@
supervisor :: pid(),
workers :: [pid()],
waiting :: pid_queue(),
workers_to_reap :: ets:tid(),
monitors :: ets:tid(),
size = 5 :: non_neg_integer(),
overflow = 0 :: non_neg_integer(),
max_overflow = 10 :: non_neg_integer(),
strategy = lifo :: lifo | fifo
strategy = lifo :: lifo | fifo,
overflow_ttl = 0 :: non_neg_integer()
}).

-spec checkout(Pool :: pool()) -> pid().
Expand Down Expand Up @@ -122,11 +123,17 @@ stop(Pool) ->
status(Pool) ->
gen_server:call(Pool, status).

-spec full_status(Pool :: pool()) -> proplists:proplist().
full_status(Pool) ->
gen_server:call(Pool, full_status).

init({PoolArgs, WorkerArgs}) ->
process_flag(trap_exit, true),
Waiting = queue:new(),
Monitors = ets:new(monitors, [private]),
init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors}).
WorkersToReap = ets:new(workers_to_reap, [private]),
init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors,
workers_to_reap = WorkersToReap}).

init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) ->
{ok, Sup} = poolboy_sup:start_link(Mod, WorkerArgs),
Expand All @@ -139,6 +146,8 @@ init([{strategy, lifo} | Rest], WorkerArgs, State) ->
init(Rest, WorkerArgs, State#state{strategy = lifo});
init([{strategy, fifo} | Rest], WorkerArgs, State) ->
init(Rest, WorkerArgs, State#state{strategy = fifo});
init([{overflow_ttl, OverflowTtl} | Rest], WorkerArgs, State) when is_integer(OverflowTtl) ->
init(Rest, WorkerArgs, State#state{overflow_ttl = OverflowTtl});
init([_ | Rest], WorkerArgs, State) ->
init(Rest, WorkerArgs, State);
init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) ->
Expand Down Expand Up @@ -184,6 +193,11 @@ handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) ->
overflow = Overflow,
max_overflow = MaxOverflow} = State,
case Workers of
[Pid | Left] when State#state.overflow_ttl > 0 ->
MRef = erlang:monitor(process, FromPid),
true = ets:insert(Monitors, {Pid, CRef, MRef}),
ok = cancel_worker_reap(State, Pid),
{reply, Pid, State#state{workers = Left}};
[Pid | Left] ->
MRef = erlang:monitor(process, FromPid),
true = ets:insert(Monitors, {Pid, CRef, MRef}),
Expand All @@ -204,8 +218,32 @@ handle_call(status, _From, State) ->
#state{workers = Workers,
monitors = Monitors,
overflow = Overflow} = State,
CheckedOutWorkers = ets:info(Monitors, size),
StateName = state_name(State),
{reply, {StateName, length(Workers), Overflow, ets:info(Monitors, size)}, State};
{reply, {StateName, length(Workers), Overflow, CheckedOutWorkers}, State};
handle_call(full_status, _From, State) ->
#state{workers = Workers,
size = Size,
monitors = Monitors,
overflow = Overflow,
max_overflow = MaxOverflow,
supervisor = Sup,
waiting = Waiting } = State,
CheckedOutWorkers = ets:info(Monitors, size),
{reply,
[
{size, Size}, % The permanent worker size
{max_overflow, MaxOverflow}, % The overflow size
% The maximum amount of worker is size + overflow_size

{total_worker_count, length(supervisor:which_children(Sup))}, % The total of all workers
{ready_worker_count, length(Workers)}, % Number of workers ready to use
{overflow_worker_count, Overflow}, % Number of overflow workers
{checked_out_worker_count, CheckedOutWorkers}, % Number of workers currently checked out
{waiting_request_count, queue:len(Waiting)} % Number of waiting requests
],
State
};
handle_call(get_avail_workers, _From, State) ->
Workers = State#state.workers,
{reply, Workers, State};
Expand Down Expand Up @@ -235,23 +273,29 @@ handle_info({'DOWN', MRef, _, _, _}, State) ->
end;
handle_info({'EXIT', Pid, _Reason}, State) ->
#state{supervisor = Sup,
monitors = Monitors} = State,
monitors = Monitors,
workers = Workers} = State,
ok = cancel_worker_reap(State, Pid),
case ets:lookup(Monitors, Pid) of
[{Pid, _, MRef}] ->
true = erlang:demonitor(MRef),
true = ets:delete(Monitors, Pid),
NewState = handle_worker_exit(Pid, State),
{noreply, NewState};
[] ->
case lists:member(Pid, State#state.workers) of
case lists:member(Pid, Workers) of
true ->
W = lists:filter(fun (P) -> P =/= Pid end, State#state.workers),
W = lists:filter(fun (P) -> P =/= Pid end, Workers),
{noreply, State#state{workers = [new_worker(Sup) | W]}};
false ->
{noreply, State}
end
end;

handle_info({reap_worker, Pid}, State)->
#state{workers_to_reap = WorkersToReap} = State,
true = ets:delete(WorkersToReap, Pid),
NewState = purge_worker(Pid, State),
{noreply, NewState};
handle_info(_Info, State) ->
{noreply, State}.

Expand Down Expand Up @@ -285,6 +329,35 @@ dismiss_worker(Sup, Pid) ->
true = unlink(Pid),
supervisor:terminate_child(Sup, Pid).

cancel_worker_reap(State, Pid) ->
case ets:lookup(State#state.workers_to_reap, Pid) of
[{Pid, Tref}] ->
erlang:cancel_timer(Tref),
true = ets:delete(State#state.workers_to_reap, Pid),
ok;
[] ->
ok
end,
receive
{reap_worker, Pid} ->
ok
after 0 ->
ok
end.

purge_worker(Pid, State) ->
#state{supervisor = Sup,
workers = Workers,
overflow = Overflow} = State,
case Overflow > 0 of
true ->
W = lists:filter(fun (P) -> P =/= Pid end, Workers),
ok = dismiss_worker(Sup, Pid),
State#state{workers = W, overflow = Overflow -1};
false ->
State
end.

prepopulate(N, _Sup) when N < 1 ->
[];
prepopulate(N, Sup) ->
Expand All @@ -300,12 +373,23 @@ handle_checkin(Pid, State) ->
waiting = Waiting,
monitors = Monitors,
overflow = Overflow,
strategy = Strategy} = State,
strategy = Strategy,
overflow_ttl = OverflowTtl,
workers_to_reap = WorkersToReap} = State,
case queue:out(Waiting) of
{{value, {From, CRef, MRef}}, Left} ->
true = ets:insert(Monitors, {Pid, CRef, MRef}),
gen_server:reply(From, Pid),
State#state{waiting = Left};
{empty, Empty} when Overflow > 0, OverflowTtl > 0 ->
Tref =
erlang:send_after(OverflowTtl, self(), {reap_worker, Pid}),
true = ets:insert(WorkersToReap, {Pid, Tref}),
Workers = case Strategy of
lifo -> [Pid | State#state.workers];
fifo -> State#state.workers ++ [Pid]
end,
State#state{workers = Workers, waiting = Empty};
{empty, Empty} when Overflow > 0 ->
ok = dismiss_worker(Sup, Pid),
State#state{waiting = Empty, overflow = Overflow - 1};
Expand Down Expand Up @@ -343,7 +427,13 @@ state_name(State = #state{overflow = Overflow}) when Overflow < 1 ->
true -> overflow;
false -> ready
end;
state_name(#state{overflow = MaxOverflow, max_overflow = MaxOverflow}) ->
full;
state_name(State = #state{overflow = Overflow}) when Overflow > 0 ->
#state{max_overflow = MaxOverflow, workers = Workers, overflow = Overflow} = State,
NumberOfWorkers = length(Workers),
case MaxOverflow == Overflow of
true when NumberOfWorkers > 0 -> ready;
true -> full;
false -> overflow
end;
state_name(_State) ->
overflow.
Loading