3636 | {running , proplists :proplist ()}
3737 | {terminated , term ()}.
3838-type blocked_status () :: running | flow | blocked .
39+ -type shovel_status () :: blocked_status () | ignore .
3940
4041-type name () :: binary () | {rabbit_types :vhost (), binary ()}.
4142-type type () :: static | dynamic .
42- -type status_tuple () :: {name (), type (), info (), calendar :datetime ()}.
43+ -type metrics () :: #{remaining := rabbit_types :option (non_neg_integer ()) | unlimited ,
44+ remaining_unacked := rabbit_types :option (non_neg_integer ()),
45+ pending := rabbit_types :option (non_neg_integer ()),
46+ forwarded := rabbit_types :option (non_neg_integer ())
47+ } | #{}.
48+ -type status_tuple () :: {name (), type (), info (), metrics (), calendar :datetime ()}.
4349
44- -export_type ([info / 0 , blocked_status / 0 ]).
50+ -export_type ([info / 0 , blocked_status / 0 , shovel_status / 0 , metrics / 0 ]).
4551
4652-record (state , {timer }).
4753-record (entry , {name :: name (),
4854 type :: type (),
4955 info :: info (),
5056 blocked_status = running :: blocked_status (),
5157 blocked_at :: integer () | undefined ,
58+ metrics = #{} :: metrics (),
59+
5260 timestamp :: calendar :datetime ()}).
5361
5462start_link () ->
@@ -58,7 +66,7 @@ start_link() ->
5866report (Name , Type , Info ) ->
5967 gen_server :cast (? SERVER , {report , Name , Type , Info , calendar :local_time ()}).
6068
61- -spec report_blocked_status (name (), blocked_status ()) -> ok .
69+ -spec report_blocked_status (name (), { blocked_status (), metrics ()} | blocked_status ()) -> ok .
6270report_blocked_status (Name , Status ) ->
6371 gen_server :cast (? SERVER , {report_blocked_status , Name , Status , erlang :monotonic_time ()}).
6472
@@ -112,6 +120,7 @@ handle_call(status, _From, State) ->
112120 {reply , [{Entry # entry .name ,
113121 Entry # entry .type ,
114122 blocked_status_to_info (Entry ),
123+ Entry # entry .metrics ,
115124 Entry # entry .timestamp }
116125 || Entry <- Entries ], State };
117126
@@ -120,6 +129,7 @@ handle_call({lookup, Name}, _From, State) ->
120129 [Entry ] -> [{name , Name },
121130 {type , Entry # entry .type },
122131 {info , blocked_status_to_info (Entry )},
132+ {metrics , Entry # entry .metrics },
123133 {timestamp , Entry # entry .timestamp }];
124134 [] -> not_found
125135 end ,
@@ -141,6 +151,18 @@ handle_cast({report, Name, Type, Info, Timestamp}, State) ->
141151 split_name (Name ) ++ split_status (Info )),
142152 {noreply , State };
143153
154+ handle_cast ({report_blocked_status , Name , {Status , Metrics }, Timestamp }, State ) ->
155+ case Status of
156+ flow ->
157+ true = ets :update_element (? ETS_NAME , Name , [{# entry .blocked_status , flow },
158+ {# entry .metrics , Metrics },
159+ {# entry .blocked_at , Timestamp }]);
160+ _ ->
161+ true = ets :update_element (? ETS_NAME , Name , [{# entry .blocked_status , Status },
162+ {# entry .metrics , Metrics }])
163+ end ,
164+ {noreply , State };
165+ % % used in tests
144166handle_cast ({report_blocked_status , Name , Status , Timestamp }, State ) ->
145167 case Status of
146168 flow ->
@@ -178,22 +200,22 @@ code_change(_OldVsn, State, _Extra) ->
178200inject_node_info (Node , Shovels ) ->
179201 lists :map (
180202 % % starting
181- fun ({Name , Type , State , Timestamp }) when is_atom (State ) ->
203+ fun ({Name , Type , State , Metrics , Timestamp }) when is_atom (State ) ->
182204 Opts = [{node , Node }],
183- {Name , Type , {State , Opts }, Timestamp };
205+ {Name , Type , {State , Opts }, Metrics , Timestamp };
184206 % % terminated
185- ({Name , Type , {terminated , Reason }, Timestamp }) ->
186- {Name , Type , {terminated , Reason }, Timestamp };
207+ ({Name , Type , {terminated , Reason }, Metrics , Timestamp }) ->
208+ {Name , Type , {terminated , Reason }, Metrics , Timestamp };
187209 % % running
188- ({Name , Type , {State , Opts }, Timestamp }) ->
210+ ({Name , Type , {State , Opts }, Metrics , Timestamp }) ->
189211 Opts1 = Opts ++ [{node , Node }],
190- {Name , Type , {State , Opts1 }, Timestamp }
212+ {Name , Type , {State , Opts1 }, Metrics , Timestamp }
191213 end , Shovels ).
192214
193215-spec find_matching_shovel (rabbit_types :vhost (), binary (), [status_tuple ()]) -> status_tuple () | undefined .
194216find_matching_shovel (VHost , Name , Shovels ) ->
195217 case lists :filter (
196- fun ({{V , S }, _Kind , _Status , _ }) ->
218+ fun ({{V , S }, _Kind , _Status , _Metrics , _ }) ->
197219 VHost =:= V andalso Name =:= S
198220 end , Shovels ) of
199221 [] -> undefined ;
0 commit comments