Skip to content

Commit 5d6f791

Browse files
committed
Do not send new fetch request before last response is received
1 parent cf47e7c commit 5d6f791

File tree

3 files changed

+14
-7
lines changed

3 files changed

+14
-7
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
PROJECT = brod
22
PROJECT_DESCRIPTION = Kafka client library in Erlang
3-
PROJECT_VERSION = 2.2.11
3+
PROJECT_VERSION = 2.2.12
44

55
DEPS = supervisor3 kafka_protocol
66

src/brod.app.src

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
%% -*- mode:erlang -*-
22
{application,brod,
33
[{description,"Apache Kafka Erlang client library"},
4-
{vsn,"2.2.11"},
4+
{vsn,"2.2.12"},
55
{registered,[]},
66
{applications,[kernel,stdlib,ssl,kafka_protocol,supervisor3]},
77
{env,[]},

src/brod_consumer.erl

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
, max_bytes_orig :: bytes()
6666
, sleep_timeout :: integer()
6767
, prefetch_count :: integer()
68-
, last_corr_id :: corr_id()
68+
, last_corr_id :: ?undef | corr_id()
6969
, subscriber :: ?undef | pid()
7070
, subscriber_mref :: ?undef | reference()
7171
, pending_acks = [] :: [offset()]
@@ -293,16 +293,18 @@ do_debug(Pid, Debug) ->
293293
ok.
294294

295295
handle_fetch_response(_Response, _CorrId,
296-
#state{subscriber = ?undef} = State) ->
296+
#state{subscriber = ?undef} = State0) ->
297297
%% discard fetch response when there is no (dead?) subscriber
298+
State = State0#state{last_corr_id = ?undef},
298299
{noreply, State};
299300
handle_fetch_response(_Response, CorrId1,
300301
#state{ last_corr_id = CorrId2
301302
} = State) when CorrId1 =/= CorrId2 ->
302303
{noreply, State};
303304
handle_fetch_response(#kpro_FetchResponse{ fetchResponseTopic_L = [TopicData]
304-
}, CorrId, State) ->
305-
CorrId = State#state.last_corr_id, %% assert
305+
}, CorrId, State0) ->
306+
CorrId = State0#state.last_corr_id, %% assert
307+
State = State0#state{last_corr_id = ?undef},
306308
#kpro_FetchResponseTopic{ topicName = Topic
307309
, fetchResponsePartition_L = [PartitionResponse]
308310
} = TopicData,
@@ -478,6 +480,9 @@ maybe_send_fetch_request(#state{socket_pid = ?undef} = State) ->
478480
maybe_send_fetch_request(#state{is_suspended = true} = State) ->
479481
%% waiting for subscriber to re-subscribe
480482
State;
483+
maybe_send_fetch_request(#state{last_corr_id = I} = State) when is_integer(I) ->
484+
%% Waiting for the last request
485+
State;
481486
maybe_send_fetch_request(#state{ pending_acks = PendingAcks
482487
, prefetch_count = PrefetchCount
483488
} = State) ->
@@ -575,13 +580,15 @@ fetch_valid_offset(SocketPid, BeginOffset, Topic, Partition) ->
575580

576581
%% @private Reset fetch buffer, use the last unacked offset as the next begin
577582
%% offset to fetch data from.
583+
%% Discard onwire fetch responses by setting last_corr_id to undefined.
578584
%% @end
579585
-spec reset_buffer(#state{}) -> #state{}.
580586
reset_buffer(#state{pending_acks = []} = State) ->
581-
State;
587+
State#state{last_corr_id = ?undef};
582588
reset_buffer(#state{pending_acks = [Offset | _]} = State) ->
583589
State#state{ begin_offset = Offset
584590
, pending_acks = []
591+
, last_corr_id = ?undef
585592
}.
586593

587594
%% @private Catch noproc exit exception when making gen_server:call.

0 commit comments

Comments
 (0)