Skip to content

Commit f5f08af

Browse files
MikhailKalashnikovzmstone
authored andcommitted
Use erlang:throw instead of erlang:error in do_commit_offsets_ (kafka4beam#151)
* Use erlang:throw instead of erlang:error in brod_group_coordinator:do_commit_offsets_ if all error codes are the same * do_commit_offsets refactoring
1 parent d7638a4 commit f5f08af

File tree

1 file changed

+26
-13
lines changed

1 file changed

+26
-13
lines changed

src/brod_group_coordinator.erl

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -682,22 +682,35 @@ do_commit_offsets_(#state{ groupId = GroupId
682682
},
683683
Rsp = send_sync(SockPid, Req),
684684
#kpro_OffsetCommitResponse{oCRspTopic_L = Topics} = Rsp,
685-
lists:foreach(
686-
fun(#kpro_OCRspTopic{topicName = Topic, oCRspPartition_L = Partitions}) ->
687-
lists:foreach(
688-
fun(#kpro_OCRspPartition{partition = Partition, errorCode = EC}) ->
689-
kpro_ErrorCode:is_error(EC) andalso
690-
begin
691-
log(State, error,
692-
"failed to commit offset for topic=~s, partition=~p\n"
693-
"~p:~s", [Topic, Partition, EC, kpro_ErrorCode:desc(EC)]),
694-
erlang:error(EC)
695-
end
696-
end, Partitions)
697-
end, Topics),
685+
ok = assert_commit_response(Topics),
698686
NewState = State#state{acked_offsets = []},
699687
{ok, NewState}.
700688

689+
%% @private Check commit response. If no error returns ok,
690+
%% if all error codes are the same, raise throw, otherwise error.
691+
-spec assert_commit_response([kpro_OffsetCommitResponse()]) -> ok.
692+
assert_commit_response(Topics) ->
693+
ErrorSet = collect_commit_response_error_codes(Topics),
694+
case gb_sets:to_list(ErrorSet) of
695+
[] -> ok;
696+
[EC] -> ?ESCALATE_EC(EC);
697+
_ -> erlang:error({commit_offset_failed, Topics})
698+
end.
699+
700+
-spec collect_commit_response_error_codes([kpro_OffsetCommitResponse()]) ->
701+
gb_sets:set().
702+
collect_commit_response_error_codes(Topics) ->
703+
lists:foldl(
704+
fun(#kpro_OCRspTopic{oCRspPartition_L = Partitions}, Acc1) ->
705+
lists:foldl(
706+
fun(#kpro_OCRspPartition{errorCode = EC}, Acc2) ->
707+
case kpro_ErrorCode:is_error(EC) of
708+
true -> gb_sets:add_element(EC, Acc2);
709+
false -> Acc2
710+
end
711+
end, Acc1, Partitions)
712+
end, gb_sets:new(), Topics).
713+
701714
-spec assign_partitions(#state{}) -> [kpro_GroupAssignment()].
702715
assign_partitions(State) when ?IS_LEADER(State) ->
703716
#state{ client = Client

0 commit comments

Comments
 (0)