diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index b8135f65e4eb..820a86dba448 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -1168,110 +1168,110 @@ class simple_fetch_planner final : public fetch_planner::impl { plan.reserve_from_partition_count(octx.fetch_partition_count()); + const auto client_address = fmt::format( + "{}:{}", + octx.rctx.connection()->client_host(), + octx.rctx.connection()->client_port()); + /** * group fetch requests by shard */ - octx.for_each_fetch_partition([&resp_it, - &octx, - &plan, - &bytes_left_in_plan]( - const fetch_session_partition& fp) { - // if this is not an initial fetch we are allowed to skip - // partions that aleready have an error or we have enough data - if (!octx.initial_fetch) { - bool has_enough_data = !resp_it->empty() - && octx.over_min_bytes(); - - if (resp_it->has_error() || has_enough_data) { - ++resp_it; - return; - } - } - - // We audit successful messages only on the initial fetch - audit_on_success audit{octx.initial_fetch}; + octx.for_each_fetch_partition( + [&resp_it, &octx, &plan, &bytes_left_in_plan, &client_address]( + const fetch_session_partition& fp) { + // if this is not an initial fetch we are allowed to skip + // partions that aleready have an error or we have enough data + if (!octx.initial_fetch) { + bool has_enough_data = !resp_it->empty() + && octx.over_min_bytes(); + + if (resp_it->has_error() || has_enough_data) { + ++resp_it; + return; + } + } - /** - * if not authorized do not include into a plan - */ - if (!octx.rctx.authorized( - security::acl_operation::read, - fp.topic_partition.get_topic(), - audit)) { - resp_it->set(make_partition_response_error( - fp.topic_partition.get_partition(), - error_code::topic_authorization_failed)); - ++resp_it; - return; - } + // We audit successful messages only on the initial fetch + audit_on_success audit{octx.initial_fetch}; + + /** + * if not authorized do not include into a plan + */ + if (!octx.rctx.authorized( + security::acl_operation::read, + fp.topic_partition.get_topic(), + audit)) { + resp_it->set(make_partition_response_error( + fp.topic_partition.get_partition(), + error_code::topic_authorization_failed)); + ++resp_it; + return; + } - auto& tp = fp.topic_partition; + auto& tp = fp.topic_partition; - if (unlikely(octx.rctx.metadata_cache().is_disabled( - tp.as_tn_view(), tp.get_partition()))) { - resp_it->set(make_partition_response_error( - fp.topic_partition.get_partition(), - error_code::replica_not_available)); - ++resp_it; - return; - } + if (unlikely(octx.rctx.metadata_cache().is_disabled( + tp.as_tn_view(), tp.get_partition()))) { + resp_it->set(make_partition_response_error( + fp.topic_partition.get_partition(), + error_code::replica_not_available)); + ++resp_it; + return; + } - auto shard = octx.rctx.shards().shard_for(tp); - if (unlikely(!shard)) { - // there is given partition in topic metadata, return - // unknown_topic_or_partition error - - /** - * no shard is found on current node, but topic exists in - * cluster metadata, this mean that the partition was - * moved but consumer has not updated its metadata yet. we - * return not_leader_for_partition error to force metadata - * update. - */ - auto ec = octx.rctx.metadata_cache().contains(tp.to_ntp()) - ? error_code::not_leader_for_partition - : error_code::unknown_topic_or_partition; - resp_it->set(make_partition_response_error( - fp.topic_partition.get_partition(), ec)); - ++resp_it; - return; - } + auto shard = octx.rctx.shards().shard_for(tp); + if (unlikely(!shard)) { + // there is given partition in topic metadata, return + // unknown_topic_or_partition error + + /** + * no shard is found on current node, but topic exists in + * cluster metadata, this mean that the partition was + * moved but consumer has not updated its metadata yet. we + * return not_leader_for_partition error to force metadata + * update. + */ + auto ec = octx.rctx.metadata_cache().contains(tp.to_ntp()) + ? error_code::not_leader_for_partition + : error_code::unknown_topic_or_partition; + resp_it->set(make_partition_response_error( + fp.topic_partition.get_partition(), ec)); + ++resp_it; + return; + } - auto fetch_md = octx.rctx.get_fetch_metadata_cache().get(tp); - auto max_bytes = std::min(bytes_left_in_plan, size_t(fp.max_bytes)); - /** - * If offset is greater, assume that fetch will read max_bytes - */ - if (fetch_md && fetch_md->high_watermark > fp.fetch_offset) { - bytes_left_in_plan -= max_bytes; - } + auto fetch_md = octx.rctx.get_fetch_metadata_cache().get(tp); + auto max_bytes = std::min( + bytes_left_in_plan, size_t(fp.max_bytes)); + /** + * If offset is greater, assume that fetch will read max_bytes + */ + if (fetch_md && fetch_md->high_watermark > fp.fetch_offset) { + bytes_left_in_plan -= max_bytes; + } - const auto client_address = fmt::format( - "{}:{}", - octx.rctx.connection()->client_host(), - octx.rctx.connection()->client_port()); - - fetch_config config{ - .start_offset = fp.fetch_offset, - .max_offset = model::model_limits::max(), - .max_bytes = max_bytes, - .timeout = octx.deadline.value_or(model::no_timeout), - .current_leader_epoch = fp.current_leader_epoch, - .isolation_level = octx.request.data.isolation_level, - .strict_max_bytes = octx.response_size > 0, - .skip_read = bytes_left_in_plan == 0 && max_bytes == 0, - .read_from_follower = octx.request.has_rack_id(), - .consumer_rack_id = octx.request.has_rack_id() - ? std::make_optional( - octx.request.data.rack_id) - : std::nullopt, - .abort_source = octx.rctx.abort_source(), - .client_address = model::client_address_t{client_address}, - }; - - plan.fetches_per_shard[*shard].push_back({tp, config}, &(*resp_it)); - ++resp_it; - }); + fetch_config config{ + .start_offset = fp.fetch_offset, + .max_offset = model::model_limits::max(), + .max_bytes = max_bytes, + .timeout = octx.deadline.value_or(model::no_timeout), + .current_leader_epoch = fp.current_leader_epoch, + .isolation_level = octx.request.data.isolation_level, + .strict_max_bytes = octx.response_size > 0, + .skip_read = bytes_left_in_plan == 0 && max_bytes == 0, + .read_from_follower = octx.request.has_rack_id(), + .consumer_rack_id = octx.request.has_rack_id() + ? std::make_optional( + octx.request.data.rack_id) + : std::nullopt, + .abort_source = octx.rctx.abort_source(), + .client_address = model::client_address_t{client_address}, + }; + + plan.fetches_per_shard[*shard].push_back( + {tp, std::move(config)}, &(*resp_it)); + ++resp_it; + }); return plan; } };