Skip to content

Commit

Permalink
Add wildcard matching in resource counters (#128)
Browse files Browse the repository at this point in the history
* Add wildcard matching in resource counters

* Disallow wildcard names in 'r' objects. Add tests
If 'r' objects have a name with a '\\_' in the last element, things
go badly, so we disallow this. This led to key pattern checks in all
functions that effect registrations. Also ensure that `mreg/2' does
this.

* spawn_monitor/2 only exists in OTP 23

* Apply suggestions from code review

Co-authored-by: Tino Breddin <tolbrino@users.noreply.github.com>

Co-authored-by: Tino Breddin <tolbrino@users.noreply.github.com>
  • Loading branch information
uwiger and tolbrino authored Sep 18, 2020
1 parent 112dbcb commit 378bd27
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 42 deletions.
54 changes: 33 additions & 21 deletions src/gproc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ is_string(S) ->
%% @equiv reg(Key, default(Key), [])
%% @end
reg(Key) ->
?CATCH_GPROC_ERROR(reg1(Key), [Key]).
?CATCH_GPROC_ERROR(reg1(valid_key(Key)), [Key]).

reg1(Key) ->
reg1(Key, default(Key), [], reg).
Expand All @@ -632,11 +632,14 @@ reg1(Key) ->
%% @equiv reg_or_locate(Key, default(Key))
%% @end
reg_or_locate(Key) ->
?CATCH_GPROC_ERROR(reg_or_locate1(Key), [Key]).
?CATCH_GPROC_ERROR(reg_or_locate1(valid_key(Key)), [Key]).

reg_or_locate1(Key) ->
reg_or_locate1(Key, default(Key), self()).

valid_key(Key) ->
gproc_lib:valid_key(Key).

default({T,_,_}) when T==c -> 0;
default(_) -> undefined.

Expand Down Expand Up @@ -961,7 +964,7 @@ demonitor1(_, _) ->
%%
%%
reg(Key, Value) ->
?CATCH_GPROC_ERROR(reg1(Key, Value, [], reg), [Key, Value]).
?CATCH_GPROC_ERROR(reg1(valid_key(Key), Value, [], reg), [Key, Value]).

%% @spec reg(Key::key(), Value::value(), Attrs::attrs()) -> true
%%
Expand All @@ -984,10 +987,16 @@ reg(Key, Value) ->
%% reflects the sum of all counter objects with the same name in the given
%% scope. The initial value for an aggregated counter must be `undefined'.
%% * `r' - 'resource property', behaves like a property, but can be tracked
%% with a 'resource counter'.
%% with a 'resource counter'. Note that using an `rc' wildcard name
%% pattern (see below) for a resource property is not allowed.
%% * `rc' - 'resource counter', tracks the number of resource properties
%% with the same name. When the resource count reaches `0', any triggers
%% specified using an `on_zero' attribute may be executed (see below).
%% If `Name' is a tuple, the last element of the name can contain a
%% wildcard, using the symbol <code>'\\_'</code>. This will make the resource
%% counter keep track of any resources where all elements match except
%% the last position. For example, <code>{rc,l,{a,b,'\\_'}}</code> would keep
%% track of both `{r,l,{a,b,1}}' and `{r,l,{a,b,2}}'.
%%
%% On-zero triggers:
%%
Expand All @@ -1001,16 +1010,16 @@ reg(Key, Value) ->
%% `{Type, Context, Name}'
%% @end
reg(Key, Value, Attrs) ->
?CATCH_GPROC_ERROR(reg1(Key, Value, Attrs, reg), [Key, Value, Attrs]).
?CATCH_GPROC_ERROR(reg1(valid_key(Key), Value, Attrs, reg), [Key, Value, Attrs]).

%% @equiv ensure_reg(Key, default(Key), [])
ensure_reg(Key) ->
?CATCH_GPROC_ERROR(reg1(Key, ensure), [Key]).
?CATCH_GPROC_ERROR(reg1(valid_key(Key), ensure), [Key]).

%% @equiv ensure_reg(Key, Value, [])
-spec ensure_reg(key(), value()) -> new | updated.
ensure_reg(Key, Value) ->
?CATCH_GPROC_ERROR(reg1(Key, Value, ensure), [Key, Value]).
?CATCH_GPROC_ERROR(reg1(valid_key(Key), Value, ensure), [Key, Value]).

%% @spec ensure_reg(Key::key(), Value::value(), Attrs::attrs()) ->
%% new | updated
Expand All @@ -1025,7 +1034,7 @@ ensure_reg(Key, Value) ->
%% @end
-spec ensure_reg(key(), value(), attrs()) -> new | updated.
ensure_reg(Key, Value, Attrs) ->
?CATCH_GPROC_ERROR(reg1(Key, Value, Attrs, ensure), [Key, Value, Attrs]).
?CATCH_GPROC_ERROR(reg1(valid_key(Key), Value, Attrs, ensure), [Key, Value, Attrs]).

reg1(Key, Op) ->
reg1(Key, default(Key), [], Op).
Expand Down Expand Up @@ -1054,11 +1063,11 @@ reg1(_, _, _, _) ->

%% @equiv reg_other(Key, Pid, default(Key), [])
reg_other(Key, Pid) ->
?CATCH_GPROC_ERROR(reg_other1(Key, Pid, reg), [Key, Pid]).
?CATCH_GPROC_ERROR(reg_other1(valid_key(Key), Pid, reg), [Key, Pid]).

%% @equiv reg_other(Key, Pid, Value, [])
reg_other(Key, Pid, Value) ->
?CATCH_GPROC_ERROR(reg_other1(Key, Pid, Value, [], reg), [Key, Pid, Value]).
?CATCH_GPROC_ERROR(reg_other1(valid_key(Key), Pid, Value, [], reg), [Key, Pid, Value]).

%% @spec reg_other(Key, Pid, Value, Attrs) -> true
%% @doc Register name or property to another process.
Expand All @@ -1078,15 +1087,15 @@ reg_other(Key, Pid, Value) ->
%% * `rc' - resource counters
%% @end
reg_other(Key, Pid, Value, Attrs) ->
?CATCH_GPROC_ERROR(reg_other1(Key, Pid, Value, Attrs, reg),
?CATCH_GPROC_ERROR(reg_other1(valid_key(Key), Pid, Value, Attrs, reg),
[Key, Pid, Value, Attrs]).

ensure_reg_other(Key, Pid) ->
?CATCH_GPROC_ERROR(reg_other1(Key, Pid, ensure), [Key, Pid]).
?CATCH_GPROC_ERROR(reg_other1(valid_key(Key), Pid, ensure), [Key, Pid]).

%% @equiv ensure_reg_other(Key, Pid, Value, [])
ensure_reg_other(Key, Pid, Value) ->
?CATCH_GPROC_ERROR(reg_other1(Key, Pid, Value, [], ensure),
?CATCH_GPROC_ERROR(reg_other1(valid_key(Key), Pid, Value, [], ensure),
[Key, Pid, Value]).

%% @spec ensure_reg_other(Key::key(), Pid::pid(),
Expand All @@ -1099,7 +1108,7 @@ ensure_reg_other(Key, Pid, Value) ->
%% process instead of the current process. Also see {@link ensure_reg/3}.
%% @end
ensure_reg_other(Key, Pid, Value, Attrs) ->
?CATCH_GPROC_ERROR(reg_other1(Key, Pid, Value, Attrs, ensure),
?CATCH_GPROC_ERROR(reg_other1(valid_key(Key), Pid, Value, Attrs, ensure),
[Key, Pid, Value, Attrs]).

reg_other1(Key, Pid, Op) ->
Expand All @@ -1124,7 +1133,7 @@ reg_other1({T,l,_} = Key, Pid, Value, As, Op) when is_pid(Pid) ->
%% the current registration is returned instead.
%% @end
reg_or_locate(Key, Value) ->
?CATCH_GPROC_ERROR(reg_or_locate1(Key, Value, self()), [Key, Value]).
?CATCH_GPROC_ERROR(reg_or_locate1(valid_key(Key), Value, self()), [Key, Value]).

%% @spec reg_or_locate(Key::key(), Value, Fun::fun()) -> {pid(), NewValue}
%%
Expand All @@ -1139,7 +1148,7 @@ reg_or_locate(Key, Value) ->
%% process is set to the group_leader of the calling process.
%% @end
reg_or_locate({n,_,_} = Key, Value, F) when is_function(F, 0) ->
?CATCH_GPROC_ERROR(reg_or_locate1(Key, Value, F), [Key, Value, F]).
?CATCH_GPROC_ERROR(reg_or_locate1(valid_key(Key), Value, F), [Key, Value, F]).

reg_or_locate1({_,g,_} = Key, Value, P) ->
?CHK_DIST,
Expand All @@ -1157,7 +1166,7 @@ reg_or_locate1(_, _, _) ->
%% `reg_shared({a,l,A}) -> reg_shared({a,l,A}, undefined).'
%% @end
reg_shared(Key) ->
?CATCH_GPROC_ERROR(reg_shared1(Key), [Key]).
?CATCH_GPROC_ERROR(reg_shared1(valid_key(Key)), [Key]).

%% @private
reg_shared1({T,_,_} = Key) when T==a; T==p; T==c ->
Expand All @@ -1179,10 +1188,10 @@ reg_shared1({T,_,_} = Key) when T==a; T==p; T==c ->
%% @end
%%
reg_shared(Key, Value) ->
?CATCH_GPROC_ERROR(reg_shared1(Key, Value, []), [Key, Value]).
?CATCH_GPROC_ERROR(reg_shared1(valid_key(Key), Value, []), [Key, Value]).

reg_shared(Key, Value, Attrs) when is_list(Attrs) ->
?CATCH_GPROC_ERROR(reg_shared1(Key, Value, Attrs), [Key, Value, Attrs]).
?CATCH_GPROC_ERROR(reg_shared1(valid_key(Key), Value, Attrs), [Key, Value, Attrs]).

%% @private
reg_shared1({_,g,_} = Key, Value, As) ->
Expand Down Expand Up @@ -1214,7 +1223,7 @@ mreg(T, C, KVL) ->
mreg1(T, g, KVL) ->
?CHK_DIST,
gproc_dist:mreg(T, KVL);
mreg1(T, l, KVL) when T==a; T==n ->
mreg1(T, l, KVL) when T==p; T==n; T==a; T==r ->
if is_list(KVL) ->
call({mreg, T, l, KVL});
true ->
Expand Down Expand Up @@ -2416,7 +2425,10 @@ handle_call({mreg, T, l, L}, {Pid,_}, S) ->
{true,_} -> {reply, true, S};
false -> {reply, badarg, S}
catch
error:_ -> {reply, badarg, S}
throw:?GPROC_THROW(_) ->
{reply, badarg, S};
error:_ ->
{reply, badarg, S}
end;
handle_call({munreg, T, l, L}, {Pid,_}, S) ->
_ = gproc_lib:remove_many(T, l, L, Pid),
Expand Down
27 changes: 20 additions & 7 deletions src/gproc_dist.erl
Original file line number Diff line number Diff line change
Expand Up @@ -572,10 +572,10 @@ handle_leader_call({Unreg, {T,g,Name} = K, Pid}, _From, S, _E)
{reply, true, [{delete, [{K,Pid}, {Pid,K}]}], S}
end;
T == r ->
case ets:lookup(?TAB, {{rc,g,Name},rc}) of
[RC] ->
case lookup_resource_counters(Name) of
[_|_] = RCs ->
{reply, true, [{delete,[{K,Pid}, {Pid,K}]},
{insert, [RC]}], S};
{insert, RCs}], S};
[] ->
{reply, true, [{delete, [{K,Pid}, {Pid, K}]}], S}
end;
Expand Down Expand Up @@ -617,12 +617,15 @@ handle_leader_call({give_away, {T,g,_} = K, To, Pid}, _From, S, _E)
{reply, badarg, S}
end;
handle_leader_call({mreg, T, g, L, Pid}, _From, S, _E) ->
if T==p; T==n; T==r ->
if T==p; T==n; T==a; T==r ->
try gproc_lib:insert_many(T, g, L, Pid) of
{true,Objs} -> {reply, true, [{insert,Objs}], S};
false -> {reply, badarg, S}
catch
error:_ -> {reply, badarg, S}
throw:?GPROC_THROW(_) ->
{reply, badarg, S};
error:_ ->
{reply, badarg, S}
end;
true -> {reply, badarg, S}
end;
Expand Down Expand Up @@ -751,7 +754,7 @@ mk_broadcast_insert_vals(Objs) ->
[{{K,Pid},Pid,Value} | ets:lookup(?TAB,{{a,g,Name},a})]
++ ets:lookup(?TAB, {Pid,K});
C == r ->
[{{K,Pid},Pid,Value} | ets:lookup(?TAB,{{rc,g,Name},rc})]
[{{K,Pid},Pid,Value} | lookup_resource_counters(Name)]
++ ets:lookup(?TAB, {Pid, K});
C == n ->
[{{K,n},Pid,Value}| ets:lookup(?TAB, {Pid,K})];
Expand Down Expand Up @@ -1085,7 +1088,17 @@ decrement_resource_count({r,g,Rsrc}, Acc) ->
true ->
%% Call the lib function, which might trigger events
gproc_lib:decrement_resource_count(g, Rsrc),
ets:lookup(?TAB, Key) ++ Acc
lookup_resource_counters(Rsrc) ++ Acc
end.

lookup_resource_counters(K) ->
case is_tuple(K) of
true ->
ets:lookup(?TAB, {{rc,g,K}, rc})
++ ets:lookup(
?TAB, {{rc,g,setelement(size(K),K,'\\_')}, rc});
false ->
ets:lookup(?TAB, {{rc,g,K}, rc})
end.

pid_to_give_away_to(P) when is_pid(P) ->
Expand Down
6 changes: 4 additions & 2 deletions src/gproc_int.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
-define(CATCH_GPROC_ERROR(Expr, Args),
try Expr
catch
throw:{gproc_error, GprocError} ->
throw:?GPROC_THROW(GprocError) ->
erlang:error(GprocError, Args)
end).

-define(THROW_GPROC_ERROR(E), throw({gproc_error, E})).
-define(GPROC_THROW(E), {gproc_error, E}).

-define(THROW_GPROC_ERROR(E), throw(?GPROC_THROW(E))).

%% Used to wrap operations that may fail, but we ignore the exception.
%% Use instead of catch, to avoid building a stacktrace unnecessarily.
Expand Down
60 changes: 52 additions & 8 deletions src/gproc_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@
update_aggr_counter/3,
update_counter/3,
decrement_resource_count/2,
valid_opts/2]).
valid_opts/2,
valid_key/1]).

-export([dbg/1]).

Expand Down Expand Up @@ -79,7 +80,12 @@ insert_reg({T,_,Name} = K, Value, Pid, Scope, Event) when T==a; T==n; T==rc ->
false ->
maybe_waiters(K, Pid, Value, T, Event)
end,
maybe_scan(T, Pid, Scope, Name, K),
case Res of
true ->
maybe_scan(T, Pid, Scope, Name, K);
false ->
false
end,
Res;
insert_reg({p,Scope,_} = K, Value, shared, Scope, _E)
when Scope == g; Scope == l ->
Expand Down Expand Up @@ -161,6 +167,13 @@ get_attr(Attr, Pid, {_,_,_} = Key, Default) ->
{true,list()} | false.

insert_many(T, Scope, KVL, Pid) ->
try insert_many_(T, Scope, KVL, Pid)
catch
throw:?GPROC_THROW(_) ->
false
end.

insert_many_(T, Scope, KVL, Pid) ->
Objs = mk_reg_objs(T, Scope, Pid, KVL),
case ets:insert_new(?TAB, Objs) of
true ->
Expand Down Expand Up @@ -315,20 +328,34 @@ does_pid_monitor(Pid, Opts) ->

mk_reg_objs(T, Scope, Pid, L) when T==n; T==a; T==rc ->
lists:map(fun({K,V}) ->
{{{T,Scope,K},T}, Pid, V};
Key = {T, Scope, K},
_ = valid_key(Key),
{{Key,T}, Pid, V};
(_) ->
erlang:error(badarg)
?THROW_GPROC_ERROR(badarg)
end, L);
mk_reg_objs(p = T, Scope, Pid, L) ->
mk_reg_objs(T, Scope, Pid, L) when T==p; T==r ->
lists:map(fun({K,V}) ->
{{{T,Scope,K},Pid}, Pid, V};
Key = {T, Scope, K},
_ = valid_key(Key),
{{Key,Pid}, Pid, V};
(_) ->
erlang:error(badarg)
?THROW_GPROC_ERROR(badarg)
end, L).

mk_reg_rev_objs(T, Scope, Pid, L) ->
[{{Pid,{T,Scope,K}}, []} || {K,_} <- L].

valid_key({r,_,R} = Key) when is_tuple(R) ->
case element(tuple_size(R), R) of
'\\_' ->
%% Cannot allow this, since '\\_' is a wildcard for resources
?THROW_GPROC_ERROR(badarg);
_ ->
Key
end;
valid_key(Key) ->
Key.

ensure_monitor(shared, _) ->
ok;
Expand Down Expand Up @@ -557,6 +584,14 @@ decrement_resource_count(C, N) ->
update_resource_count(C, N, -1).

update_resource_count(C, N, Val) ->
update_resource_count_(C, N, Val),
case is_tuple(N) of
true -> update_resource_count_(
C, setelement(size(N), N, '\\_'), Val);
false -> ok
end.

update_resource_count_(C, N, Val) ->
try ets:update_counter(?TAB, {{rc,C,N},rc}, {3, Val}) of
0 ->
resource_count_zero(C, N);
Expand Down Expand Up @@ -615,9 +650,18 @@ scan_existing_counters(Ctxt, Name) ->
lists:sum(Cs).

scan_existing_resources(Ctxt, Name) ->
Head = {{{r,Ctxt,Name},'_'},'_','_'},
Head = {{{r,Ctxt,adjust_wild(Name)},'_'},'_','_'},
ets:select_count(?TAB, [{Head, [], [true]}]).

adjust_wild(N) when is_tuple(N) ->
Sz = size(N),
case element(Sz, N) of
'\\_' -> setelement(Sz, N, '_');
_ -> N
end;
adjust_wild(N) ->
N.

valid_opts(Type, Default) ->
Opts = get_app_env(Type, Default),
check_opts(Type, Opts).
Expand Down
Loading

0 comments on commit 378bd27

Please sign in to comment.