Skip to content

Commit c1fd7c3

Browse files
committed
Propagate connection state in offset lag calculation test
This should fix some flakes.
1 parent 0d84c8e commit c1fd7c3

File tree

2 files changed

+82
-74
lines changed

2 files changed

+82
-74
lines changed

deps/rabbitmq_ct_helpers/src/stream_test_utils.erl

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,17 @@ connect(Config, Node) ->
2121
connect(StreamPort).
2222

2323
connect(StreamPort) ->
24+
do_connect(StreamPort, #{}).
25+
26+
connect_pp(StreamPort, PeerProperties) ->
27+
do_connect(StreamPort, PeerProperties).
28+
29+
do_connect(StreamPort, PeerProperties) ->
2430
{ok, Sock} = gen_tcp:connect("localhost", StreamPort, [{active, false}, {mode, binary}]),
2531

2632
C0 = rabbit_stream_core:init(0),
27-
PeerPropertiesFrame = rabbit_stream_core:frame({request, 1, {peer_properties, #{}}}),
33+
PeerPropertiesFrame = rabbit_stream_core:frame({request, 1, {peer_properties,
34+
PeerProperties}}),
2835
ok = gen_tcp:send(Sock, PeerPropertiesFrame),
2936
{{response, 1, {peer_properties, _, _}}, C1} = receive_stream_commands(Sock, C0),
3037

@@ -78,8 +85,12 @@ delete_publisher(Sock, C0, PublisherId) ->
7885
subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit) ->
7986
subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit, #{}).
8087

88+
8189
subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit, Props) ->
82-
Cmd = {subscribe, SubscriptionId, Stream, _OffsetSpec = first,
90+
subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit, Props, first).
91+
92+
subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit, Props, OffsetSpec) ->
93+
Cmd = {subscribe, SubscriptionId, Stream, OffsetSpec,
8394
InitialCredit, Props},
8495
SubscribeFrame = rabbit_stream_core:frame({request, 1, Cmd}),
8596
ok = gen_tcp:send(Sock, SubscribeFrame),

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 69 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -819,89 +819,86 @@ store_offset_requires_read_access(Config) ->
819819

820820
offset_lag_calculation(Config) ->
821821
FunctionName = atom_to_binary(?FUNCTION_NAME, utf8),
822-
T = gen_tcp,
823-
Port = get_port(T, Config),
824-
Opts = get_opts(T),
825-
{ok, S} = T:connect("localhost", Port, Opts),
826-
C = rabbit_stream_core:init(0),
822+
Port = get_port(gen_tcp, Config),
827823
ConnectionName = FunctionName,
828-
test_peer_properties(T, S, #{<<"connection_name">> => ConnectionName}, C),
829-
test_authenticate(T, S, C),
824+
{ok, S, C0} = stream_test_utils:connect_pp(Port,
825+
#{<<"connection_name">> => ConnectionName}),
830826

831-
Stream = FunctionName,
832-
test_create_stream(T, S, Stream, C),
827+
St = FunctionName,
828+
{ok, C1} = stream_test_utils:create_stream(S, C0, St),
833829

834830
SubId = 1,
835831
TheFuture = os:system_time(millisecond) + 60 * 60 * 1_000,
836-
lists:foreach(fun(OffsetSpec) ->
837-
test_subscribe(T, S, SubId, Stream,
838-
OffsetSpec, 10, #{},
839-
?RESPONSE_CODE_OK, C),
840-
ConsumerInfo = consumer_offset_info(Config, ConnectionName),
841-
?assertEqual({0, 0}, ConsumerInfo),
842-
test_unsubscribe(T, S, SubId, C)
843-
end, [first, last, next, 0, 1_000, {timestamp, TheFuture}]),
844-
845-
846-
PublisherId = 1,
847-
test_declare_publisher(T, S, PublisherId, Stream, C),
832+
C2 = lists:foldl(
833+
fun(OffsetSpec, C00) ->
834+
{ok, C01} = stream_test_utils:subscribe(S, C00, St, SubId,
835+
10, #{}, OffsetSpec),
836+
ConsumerInfo = consumer_offset_info(Config, ConnectionName),
837+
?assertEqual({0, 0}, ConsumerInfo),
838+
{ok, C02} = stream_test_utils:unsubscribe(S, C01, SubId),
839+
C02
840+
end, C1, [first, last, next, 0, 1_000, {timestamp, TheFuture}]),
841+
842+
PubId = 1,
843+
{ok, C3} = stream_test_utils:declare_publisher(S, C2, St, PubId),
848844
MessageCount = 10,
849845
Body = <<"hello">>,
850-
lists:foreach(fun(_) ->
851-
test_publish_confirm(T, S, PublisherId, Body, C)
852-
end, lists:seq(1, MessageCount - 1)),
846+
{ok, C4} = stream_test_utils:publish(S, C3, PubId, 1,
847+
lists:duplicate(MessageCount - 1, Body)),
853848
%% to make sure to have 2 chunks
854849
timer:sleep(200),
855-
test_publish_confirm(T, S, PublisherId, Body, C),
856-
test_delete_publisher(T, S, PublisherId, C),
850+
{ok, C5} = stream_test_utils:publish(S, C4, PubId, 1, [Body]),
851+
{ok, C6} = stream_test_utils:delete_publisher(S, C5, PubId),
857852

858853
NextOffset = MessageCount,
859-
lists:foreach(fun({OffsetSpec, ReceiveDeliver, CheckFun}) ->
860-
test_subscribe(T, S, SubId, Stream,
861-
OffsetSpec, 1, #{},
862-
?RESPONSE_CODE_OK, C),
863-
case ReceiveDeliver of
864-
true ->
865-
{{deliver, SubId, _}, _} = receive_commands(T, S, C);
866-
_ ->
867-
ok
868-
end,
869-
{Offset, Lag} = consumer_offset_info(Config, ConnectionName),
870-
CheckFun(Offset, Lag),
871-
test_unsubscribe(T, S, SubId, C)
872-
end, [{first, true,
873-
fun(Offset, Lag) ->
874-
?assert(Offset >= 0, "first, at least one chunk consumed"),
875-
?assert(Lag > 0, "first, not all messages consumed")
876-
end},
877-
{last, true,
878-
fun(Offset, _Lag) ->
879-
?assert(Offset > 0, "offset expected for last")
880-
end},
881-
{next, false,
882-
fun(Offset, Lag) ->
883-
?assertEqual(NextOffset, Offset, "next, offset should be at the end of the stream"),
884-
?assert(Lag =:= 0, "next, offset lag should be 0")
885-
end},
886-
{0, true,
887-
fun(Offset, Lag) ->
888-
?assert(Offset >= 0, "offset spec = 0, at least one chunk consumed"),
889-
?assert(Lag > 0, "offset spec = 0, not all messages consumed")
890-
end},
891-
{1_000, false,
892-
fun(Offset, Lag) ->
893-
?assertEqual(NextOffset, Offset, "offset spec = 1000, offset should be at the end of the stream"),
894-
?assert(Lag =:= 0, "offset spec = 1000, offset lag should be 0")
895-
end},
896-
{{timestamp, TheFuture}, false,
897-
fun(Offset, Lag) ->
898-
?assertEqual(NextOffset, Offset, "offset spec in future, offset should be at the end of the stream"),
899-
?assert(Lag =:= 0, "offset spec in future , offset lag should be 0")
900-
end}]),
901-
902-
test_delete_stream(T, S, Stream, C, false),
903-
test_close(T, S, C),
904-
854+
C7 = lists:foldl(
855+
fun({OffsetSpec, ReceiveDeliver, CheckFun}, C00) ->
856+
{ok, C01} = stream_test_utils:subscribe(S, C00, St, SubId,
857+
1, #{}, OffsetSpec),
858+
859+
C03 = case ReceiveDeliver of
860+
true ->
861+
{{deliver, SubId, _}, C02} = receive_commands(S, C01),
862+
C02;
863+
_ ->
864+
C01
865+
end,
866+
{Offset, Lag} = consumer_offset_info(Config, ConnectionName),
867+
CheckFun(Offset, Lag),
868+
{ok, C04} = stream_test_utils:unsubscribe(S, C03, SubId),
869+
C04
870+
end, C6, [{first, true,
871+
fun(Offset, Lag) ->
872+
?assert(Offset >= 0, "first, at least one chunk consumed"),
873+
?assert(Lag > 0, "first, not all messages consumed")
874+
end},
875+
{last, true,
876+
fun(Offset, _Lag) ->
877+
?assert(Offset > 0, "offset expected for last")
878+
end},
879+
{next, false,
880+
fun(Offset, Lag) ->
881+
?assertEqual(NextOffset, Offset, "next, offset should be at the end of the stream"),
882+
?assert(Lag =:= 0, "next, offset lag should be 0")
883+
end},
884+
{0, true,
885+
fun(Offset, Lag) ->
886+
?assert(Offset >= 0, "offset spec = 0, at least one chunk consumed"),
887+
?assert(Lag > 0, "offset spec = 0, not all messages consumed")
888+
end},
889+
{1_000, false,
890+
fun(Offset, Lag) ->
891+
?assertEqual(NextOffset, Offset, "offset spec = 1000, offset should be at the end of the stream"),
892+
?assert(Lag =:= 0, "offset spec = 1000, offset lag should be 0")
893+
end},
894+
{{timestamp, TheFuture}, false,
895+
fun(Offset, Lag) ->
896+
?assertEqual(NextOffset, Offset, "offset spec in future, offset should be at the end of the stream"),
897+
?assert(Lag =:= 0, "offset spec in future , offset lag should be 0")
898+
end}]),
899+
900+
{ok, C8} = stream_test_utils:delete_stream(S, C7, St),
901+
{ok, _} = stream_test_utils:close(S, C8),
905902
ok.
906903

907904
authentication_error_should_close_with_delay(Config) ->

0 commit comments

Comments
 (0)