Skip to content

Commit c739557

Browse files
committed
Add rabbitmq-streams reset_offset command
A user can set the stored offset for a stream/reference couple to 0. This way a consumer can keep the same name and re-attach to the beginning of a stream. References #14124
1 parent 4390bb6 commit c739557

7 files changed

+266
-55
lines changed

deps/rabbit/docs/rabbitmq-streams.8

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,7 @@ for each group:
382382
.It Cm list_stream_group_consumers Fl -stream Ar stream Fl -reference Ar reference Oo Fl -vhost Ar vhost Oc Op Ar consumerinfoitem ...
383383
.Pp
384384
Lists consumers of a stream consumer group in a vhost.
385+
.Bl -tag -width Ds
385386
.It Ar stream
386387
The stream the consumers are attached to.
387388
.It Ar reference
@@ -420,7 +421,14 @@ For example, this command displays the connection name and state
420421
for each consumer attached to the stream-1 stream and belonging to the stream-1 group:
421422
.sp
422423
.Dl rabbitmq-streams list_stream_group_consumers --stream stream-1 --reference stream-1 connection_name state
423-
.El
424+
.\" ------------------------------------------------------------------
425+
.It Cm reset_offset Fl -stream Ar stream Fl -reference Ar reference Oo Fl -vhost Ar vhost Oc
426+
.Pp
427+
Reset the stored offset for a consumer name on a stream.
428+
.Pp
429+
Example:
430+
.Sp
431+
.Dl rabbitmq-streams reset_offset --stream stream --reference app-1
424432
.\" ------------------------------------------------------------------
425433
.Sh SEE ALSO
426434
.\" ------------------------------------------------------------------
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
%% The contents of this file are subject to the Mozilla Public License
2+
%% Version 2.0 (the "License"); you may not use this file except in
3+
%% compliance with the License. You may obtain a copy of the License
4+
%% at https://www.mozilla.org/MPL/
5+
%%
6+
%% Software distributed under the License is distributed on an "AS IS"
7+
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
%% the License for the specific language governing rights and
9+
%% limitations under the License.
10+
%%
11+
%% The Original Code is RabbitMQ.
12+
%%
13+
%% The Initial Developer of the Original Code is GoPivotal, Inc.
14+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
15+
16+
-module('Elixir.RabbitMQ.CLI.Ctl.Commands.ResetOffsetCommand').
17+
18+
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
19+
-include_lib("rabbitmq_stream/src/rabbit_stream_utils.hrl").
20+
21+
-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
22+
23+
-export([formatter/0,
24+
scopes/0,
25+
switches/0,
26+
aliases/0,
27+
usage/0,
28+
usage_additional/0,
29+
usage_doc_guides/0,
30+
banner/2,
31+
validate/2,
32+
merge_defaults/2,
33+
run/2,
34+
output/2,
35+
description/0,
36+
help_section/0]).
37+
38+
formatter() ->
39+
'Elixir.RabbitMQ.CLI.Formatters.String'.
40+
41+
scopes() ->
42+
[streams].
43+
44+
switches() ->
45+
[{stream, string}, {reference, string}].
46+
47+
aliases() ->
48+
[].
49+
50+
description() ->
51+
<<"Reset the stored offset for a consumer name on a stream">>.
52+
53+
help_section() ->
54+
{plugin, stream}.
55+
56+
validate([], #{stream := _, reference := R}) when ?IS_INVALID_REF(R) ->
57+
{validation_failure, reference_too_long};
58+
validate([], #{stream := _, reference := _}) ->
59+
ok;
60+
validate(Args, _) when is_list(Args) andalso length(Args) > 0 ->
61+
{validation_failure, too_many_args};
62+
validate(_, _) ->
63+
{validation_failure, not_enough_args}.
64+
65+
merge_defaults(Args, Opts) ->
66+
{Args, maps:merge(#{vhost => <<"/">>}, Opts)}.
67+
68+
usage() ->
69+
<<"reset_offset --stream <stream> "
70+
"--reference <reference> [--vhost <vhost>]">>.
71+
72+
usage_additional() ->
73+
<<"">>.
74+
75+
usage_doc_guides() ->
76+
[?STREAMS_GUIDE_URL].
77+
78+
run(_,
79+
#{node := NodeName,
80+
vhost := VHost,
81+
stream := Stream,
82+
reference := Reference,
83+
timeout := Timeout}) ->
84+
rabbit_misc:rpc_call(NodeName,
85+
rabbit_stream_manager,
86+
reset_offset,
87+
[VHost, Stream, Reference],
88+
Timeout).
89+
90+
banner(_, _) ->
91+
<<"Resetting stored offset ...">>.
92+
93+
output(ok, _Opts) ->
94+
'Elixir.RabbitMQ.CLI.DefaultOutput':output({ok, <<"OK">>});
95+
output({validation_failure, reference_too_long}, _Opts) ->
96+
'Elixir.RabbitMQ.CLI.DefaultOutput':output({error,
97+
<<"The reference is too long">>});
98+
output({error, not_found}, _Opts) ->
99+
'Elixir.RabbitMQ.CLI.DefaultOutput':output({error,
100+
<<"The stream does not exist">>});
101+
output({error, not_available}, _Opts) ->
102+
'Elixir.RabbitMQ.CLI.DefaultOutput':output({error,
103+
<<"The stream is not available">>});
104+
output(R, _Opts) ->
105+
'Elixir.RabbitMQ.CLI.DefaultOutput':output(R).
106+

deps/rabbitmq_stream/src/rabbit_stream_manager.erl

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
-include_lib("rabbit_common/include/rabbit_framing.hrl").
2222
-include_lib("rabbit_common/include/rabbit.hrl").
2323
-include_lib("rabbit/include/amqqueue.hrl").
24+
-include_lib("rabbitmq_stream/src/rabbit_stream_utils.hrl").
2425

2526
%% API
2627
-export([create/4,
@@ -33,7 +34,8 @@
3334
topology/2,
3435
route/3,
3536
partitions/2,
36-
partition_index/3]).
37+
partition_index/3,
38+
reset_offset/3]).
3739

3840
-spec create(binary(), binary(), #{binary() => binary()}, binary()) ->
3941
{ok, map()} |
@@ -396,6 +398,21 @@ partition_index(VirtualHost, SuperStream, Stream) ->
396398
{error, stream_not_found}
397399
end.
398400

401+
-spec reset_offset(binary(), binary(), binary()) ->
402+
ok |
403+
{error, not_available | not_found | {validation_failed, term()}}.
404+
reset_offset(_, _, Ref) when ?IS_INVALID_REF(Ref) ->
405+
{error, {validation_failed,
406+
rabbit_misc:format("Reference is too long to store offset: ~p",
407+
[byte_size(Ref)])}};
408+
reset_offset(VH, S, Ref) ->
409+
case lookup_leader(VH, S) of
410+
{ok, P} ->
411+
osiris:write_tracking(P, Ref, {offset, 0});
412+
R ->
413+
R
414+
end.
415+
399416
stream_queue_arguments(Arguments) ->
400417
stream_queue_arguments([{<<"x-queue-type">>, longstr, <<"stream">>}],
401418
Arguments).

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
-behaviour(gen_statem).
2121

22+
-include("rabbit_stream_utils.hrl").
2223
-include("rabbit_stream_reader.hrl").
2324
-include("rabbit_stream_metrics.hrl").
2425

@@ -80,7 +81,6 @@
8081
peer_cert_validity]).
8182
-define(UNKNOWN_FIELD, unknown_field).
8283
-define(SILENT_CLOSE_DELAY, 3_000).
83-
-define(IS_INVALID_REF(Ref), is_binary(Ref) andalso byte_size(Ref) > 255).
8484
-define(SAC_MOD, rabbit_stream_sac_coordinator).
8585

8686
-import(rabbit_stream_utils, [check_write_permitted/2,
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
%% The contents of this file are subject to the Mozilla Public License
2+
%% at https://www.mozilla.org/en-US/MPL/2.0/
3+
%%
4+
%% Software distributed under the License is distributed on an "AS IS"
5+
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
6+
%% the License for the specific language governing rights and
7+
%% limitations under the License.
8+
%%
9+
%% The Original Code is RabbitMQ.
10+
%%
11+
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
12+
%% Copyright (c) 2025 Broadcom. All Rights Reserved.
13+
%% The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
14+
%%
15+
16+
-define(IS_INVALID_REF(Ref), is_binary(Ref) andalso byte_size(Ref) > 255).

deps/rabbitmq_stream/test/commands_SUITE.erl

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamTrackingCommand').
3636
-define(COMMAND_ACTIVATE_STREAM_CONSUMER,
3737
'Elixir.RabbitMQ.CLI.Ctl.Commands.ActivateStreamConsumerCommand').
38-
38+
-define(COMMAND_RESET_OFFSET,
39+
'Elixir.RabbitMQ.CLI.Ctl.Commands.ResetOffsetCommand').
3940

4041
all() ->
4142
[{group, list_connections},
@@ -45,6 +46,7 @@ all() ->
4546
{group, list_group_consumers},
4647
{group, activate_consumer},
4748
{group, list_stream_tracking},
49+
{group, reset_offset},
4850
{group, super_streams}].
4951

5052
groups() ->
@@ -67,6 +69,9 @@ groups() ->
6769
{list_stream_tracking, [],
6870
[list_stream_tracking_validate, list_stream_tracking_merge_defaults,
6971
list_stream_tracking_run]},
72+
{reset_offset, [],
73+
[reset_offset_validate, reset_offset_merge_defaults,
74+
reset_offset_run]},
7075
{super_streams, [],
7176
[add_super_stream_merge_defaults,
7277
add_super_stream_validate,
@@ -708,6 +713,65 @@ list_stream_tracking_run(Config) ->
708713
close(S, C),
709714
ok.
710715

716+
reset_offset_validate(_) ->
717+
Cmd = ?COMMAND_RESET_OFFSET,
718+
ValidOpts = #{vhost => <<"/">>,
719+
stream => <<"s1">>,
720+
reference => <<"foo">>},
721+
?assertMatch({validation_failure, not_enough_args},
722+
Cmd:validate([], #{})),
723+
?assertMatch({validation_failure, not_enough_args},
724+
Cmd:validate([], #{vhost => <<"test">>})),
725+
?assertMatch({validation_failure, too_many_args},
726+
Cmd:validate([<<"foo">>], ValidOpts)),
727+
?assertMatch({validation_failure, reference_too_long},
728+
Cmd:validate([], ValidOpts#{reference => gen_bin(256)})),
729+
?assertMatch(ok, Cmd:validate([], ValidOpts)),
730+
?assertMatch(ok, Cmd:validate([], ValidOpts#{reference => gen_bin(255)})).
731+
732+
reset_offset_merge_defaults(_Config) ->
733+
Cmd = ?COMMAND_RESET_OFFSET,
734+
Opts = #{vhost => <<"/">>,
735+
stream => <<"s1">>,
736+
reference => <<"foo">>},
737+
?assertEqual({[], Opts},
738+
Cmd:merge_defaults([], maps:without([vhost], Opts))),
739+
Merged = maps:merge(Opts, #{vhost => "vhost"}),
740+
?assertEqual({[], Merged},
741+
Cmd:merge_defaults([], Merged)).
742+
743+
reset_offset_run(Config) ->
744+
Cmd = ?COMMAND_RESET_OFFSET,
745+
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
746+
Opts =#{node => Node,
747+
timeout => 10000,
748+
vhost => <<"/">>},
749+
Args = [],
750+
751+
St = atom_to_binary(?FUNCTION_NAME, utf8),
752+
Ref = <<"foo">>,
753+
OptsGroup = maps:merge(#{stream => St, reference => Ref},
754+
Opts),
755+
756+
%% the stream does not exist yet
757+
?assertMatch({error, not_found},
758+
Cmd:run(Args, OptsGroup)),
759+
760+
Port = rabbit_stream_SUITE:get_stream_port(Config),
761+
{S, C} = start_stream_connection(Port),
762+
create_stream(S, St, C),
763+
764+
?assertEqual(ok, Cmd:run(Args, OptsGroup)),
765+
store_offset(S, St, Ref, 42, C),
766+
767+
check_stored_offset(S, St, Ref, 42, C),
768+
?assertMatch(ok, Cmd:run(Args, OptsGroup)),
769+
check_stored_offset(S, St, Ref, 0, C),
770+
771+
delete_stream_no_metadata_update(S, St, C),
772+
close(S, C),
773+
ok.
774+
711775
add_super_stream_merge_defaults(_Config) ->
712776
?assertMatch({[<<"super-stream">>],
713777
#{partitions := 3, vhost := <<"/">>}},
@@ -1024,6 +1088,10 @@ store_offset(S, Stream, Reference, Value, C) ->
10241088
{error, offset_not_stored}
10251089
end.
10261090

1091+
1092+
check_stored_offset(S, Stream, Reference, Expected, C) ->
1093+
check_stored_offset(S, Stream, Reference, Expected, C, 20).
1094+
10271095
check_stored_offset(_, _, _, _, _, 0) ->
10281096
error;
10291097
check_stored_offset(S, Stream, Reference, Expected, C, Attempt) ->
@@ -1061,3 +1129,5 @@ check_publisher_sequence(S, Stream, Reference, Expected, C, Attempt) ->
10611129
check_publisher_sequence(S, Stream, Reference, Expected, C, Attempt - 1)
10621130
end.
10631131

1132+
gen_bin(L) ->
1133+
list_to_binary(lists:duplicate(L, "a")).

0 commit comments

Comments
 (0)