Commit aec2b67

Merge pull request redpanda-data#17967 from redpanda-data/stephan/fetch-plan-dont-format-for-each-partition

fetch: Avoid formatting client ip and port for each partition
dotnwat authored Apr 20, 2024
2 parents 9d31ac0 + ab894d4 commit aec2b67
Showing 1 changed file with 95 additions and 95 deletions.
src/v/kafka/server/handlers/fetch.cc
@@ -1168,110 +1168,110 @@ class simple_fetch_planner final : public fetch_planner::impl {

     plan.reserve_from_partition_count(octx.fetch_partition_count());
 
+    const auto client_address = fmt::format(
+      "{}:{}",
+      octx.rctx.connection()->client_host(),
+      octx.rctx.connection()->client_port());
+
     /**
      * group fetch requests by shard
      */
-    octx.for_each_fetch_partition([&resp_it,
-                                   &octx,
-                                   &plan,
-                                   &bytes_left_in_plan](
-                                    const fetch_session_partition& fp) {
-        // if this is not an initial fetch we are allowed to skip
-        // partitions that already have an error or we have enough data
-        if (!octx.initial_fetch) {
-            bool has_enough_data = !resp_it->empty()
-                                   && octx.over_min_bytes();
-
-            if (resp_it->has_error() || has_enough_data) {
-                ++resp_it;
-                return;
-            }
-        }
-
-        // We audit successful messages only on the initial fetch
-        audit_on_success audit{octx.initial_fetch};
+    octx.for_each_fetch_partition(
+      [&resp_it, &octx, &plan, &bytes_left_in_plan, &client_address](
+        const fetch_session_partition& fp) {
+          // if this is not an initial fetch we are allowed to skip
+          // partitions that already have an error or we have enough data
+          if (!octx.initial_fetch) {
+              bool has_enough_data = !resp_it->empty()
+                                     && octx.over_min_bytes();
+
+              if (resp_it->has_error() || has_enough_data) {
+                  ++resp_it;
+                  return;
+              }
+          }

-        /**
-         * if not authorized do not include it in the plan
-         */
-        if (!octx.rctx.authorized(
-              security::acl_operation::read,
-              fp.topic_partition.get_topic(),
-              audit)) {
-            resp_it->set(make_partition_response_error(
-              fp.topic_partition.get_partition(),
-              error_code::topic_authorization_failed));
-            ++resp_it;
-            return;
-        }
+          // We audit successful messages only on the initial fetch
+          audit_on_success audit{octx.initial_fetch};
+
+          /**
+           * if not authorized do not include it in the plan
+           */
+          if (!octx.rctx.authorized(
+                security::acl_operation::read,
+                fp.topic_partition.get_topic(),
+                audit)) {
+              resp_it->set(make_partition_response_error(
+                fp.topic_partition.get_partition(),
+                error_code::topic_authorization_failed));
+              ++resp_it;
+              return;
+          }

-        auto& tp = fp.topic_partition;
+          auto& tp = fp.topic_partition;

-        if (unlikely(octx.rctx.metadata_cache().is_disabled(
-              tp.as_tn_view(), tp.get_partition()))) {
-            resp_it->set(make_partition_response_error(
-              fp.topic_partition.get_partition(),
-              error_code::replica_not_available));
-            ++resp_it;
-            return;
-        }
+          if (unlikely(octx.rctx.metadata_cache().is_disabled(
+                tp.as_tn_view(), tp.get_partition()))) {
+              resp_it->set(make_partition_response_error(
+                fp.topic_partition.get_partition(),
+                error_code::replica_not_available));
+              ++resp_it;
+              return;
+          }

-        auto shard = octx.rctx.shards().shard_for(tp);
-        if (unlikely(!shard)) {
-            // there is no such partition in topic metadata, return
-            // unknown_topic_or_partition error
-
-            /**
-             * no shard is found on the current node, but the topic
-             * exists in cluster metadata; this means that the partition
-             * was moved but the consumer has not updated its metadata
-             * yet. we return not_leader_for_partition error to force a
-             * metadata update.
-             */
-            auto ec = octx.rctx.metadata_cache().contains(tp.to_ntp())
-                        ? error_code::not_leader_for_partition
-                        : error_code::unknown_topic_or_partition;
-            resp_it->set(make_partition_response_error(
-              fp.topic_partition.get_partition(), ec));
-            ++resp_it;
-            return;
-        }
+          auto shard = octx.rctx.shards().shard_for(tp);
+          if (unlikely(!shard)) {
+              // there is no such partition in topic metadata, return
+              // unknown_topic_or_partition error
+
+              /**
+               * no shard is found on the current node, but the topic
+               * exists in cluster metadata; this means that the partition
+               * was moved but the consumer has not updated its metadata
+               * yet. we return not_leader_for_partition error to force a
+               * metadata update.
+               */
+              auto ec = octx.rctx.metadata_cache().contains(tp.to_ntp())
+                          ? error_code::not_leader_for_partition
+                          : error_code::unknown_topic_or_partition;
+              resp_it->set(make_partition_response_error(
+                fp.topic_partition.get_partition(), ec));
+              ++resp_it;
+              return;
+          }

-        auto fetch_md = octx.rctx.get_fetch_metadata_cache().get(tp);
-        auto max_bytes = std::min(bytes_left_in_plan, size_t(fp.max_bytes));
-        /**
-         * If the high watermark is above the fetch offset, assume that
-         * the fetch will read max_bytes
-         */
-        if (fetch_md && fetch_md->high_watermark > fp.fetch_offset) {
-            bytes_left_in_plan -= max_bytes;
-        }
+          auto fetch_md = octx.rctx.get_fetch_metadata_cache().get(tp);
+          auto max_bytes = std::min(
+            bytes_left_in_plan, size_t(fp.max_bytes));
+          /**
+           * If the high watermark is above the fetch offset, assume that
+           * the fetch will read max_bytes
+           */
+          if (fetch_md && fetch_md->high_watermark > fp.fetch_offset) {
+              bytes_left_in_plan -= max_bytes;
+          }

-        const auto client_address = fmt::format(
-          "{}:{}",
-          octx.rctx.connection()->client_host(),
-          octx.rctx.connection()->client_port());

-        fetch_config config{
-          .start_offset = fp.fetch_offset,
-          .max_offset = model::model_limits<model::offset>::max(),
-          .max_bytes = max_bytes,
-          .timeout = octx.deadline.value_or(model::no_timeout),
-          .current_leader_epoch = fp.current_leader_epoch,
-          .isolation_level = octx.request.data.isolation_level,
-          .strict_max_bytes = octx.response_size > 0,
-          .skip_read = bytes_left_in_plan == 0 && max_bytes == 0,
-          .read_from_follower = octx.request.has_rack_id(),
-          .consumer_rack_id = octx.request.has_rack_id()
-                                ? std::make_optional(
-                                  octx.request.data.rack_id)
-                                : std::nullopt,
-          .abort_source = octx.rctx.abort_source(),
-          .client_address = model::client_address_t{client_address},
-        };
-
-        plan.fetches_per_shard[*shard].push_back({tp, config}, &(*resp_it));
-        ++resp_it;
-    });
+          fetch_config config{
+            .start_offset = fp.fetch_offset,
+            .max_offset = model::model_limits<model::offset>::max(),
+            .max_bytes = max_bytes,
+            .timeout = octx.deadline.value_or(model::no_timeout),
+            .current_leader_epoch = fp.current_leader_epoch,
+            .isolation_level = octx.request.data.isolation_level,
+            .strict_max_bytes = octx.response_size > 0,
+            .skip_read = bytes_left_in_plan == 0 && max_bytes == 0,
+            .read_from_follower = octx.request.has_rack_id(),
+            .consumer_rack_id = octx.request.has_rack_id()
+                                  ? std::make_optional(
+                                    octx.request.data.rack_id)
+                                  : std::nullopt,
+            .abort_source = octx.rctx.abort_source(),
+            .client_address = model::client_address_t{client_address},
+          };
+
+          plan.fetches_per_shard[*shard].push_back(
+            {tp, std::move(config)}, &(*resp_it));
+          ++resp_it;
+      });
     return plan;
 }
 };
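The change itself is plain loop-invariant hoisting: the client host and port are fixed for the lifetime of the connection, so the fmt::format call (and its string allocation) can run once per fetch request instead of once per fetch partition; the diff also moves config into push_back rather than copying it. Below is a minimal self-contained sketch of the same pattern, with hypothetical connection and partition types standing in for the Redpanda ones — it is an illustration of the technique, not the project's actual code.

#include <cstdint>
#include <fmt/format.h>
#include <string>
#include <vector>

// Hypothetical stand-ins for the real connection and partition types.
struct connection {
    std::string client_host() const { return "10.0.0.1"; }
    uint16_t client_port() const { return 9092; }
};
struct partition {
    int id;
};

// Before: formats host:port once per partition, so a request covering
// N partitions pays for N format calls and N string allocations.
std::vector<std::string> plan_before(
  const connection& conn, const std::vector<partition>& parts) {
    std::vector<std::string> plan;
    plan.reserve(parts.size());
    for (const auto& p : parts) {
        auto client_address = fmt::format(
          "{}:{}", conn.client_host(), conn.client_port());
        plan.push_back(fmt::format("{} <- {}", p.id, client_address));
    }
    return plan;
}

// After: the loop-invariant formatting is hoisted out of the loop,
// as in the commit, so each iteration only reads the prebuilt string.
std::vector<std::string> plan_after(
  const connection& conn, const std::vector<partition>& parts) {
    const auto client_address = fmt::format(
      "{}:{}", conn.client_host(), conn.client_port());
    std::vector<std::string> plan;
    plan.reserve(parts.size());
    for (const auto& p : parts) {
        plan.push_back(fmt::format("{} <- {}", p.id, client_address));
    }
    return plan;
}

Since fetch requests commonly touch many partitions, trading N allocations for one on this hot path is a small but easy win, and capturing client_address by reference in the lambda keeps the per-partition cost to a string read.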