Skip to content

Commit fd23319

Browse files
authored
Encapsulate metadata aggregates (grpc#27262)
* Encapsulate metadata aggregates * Automated change: Fix sanity tests * add const * Automated change: Fix sanity tests * add a comment about an awful api Co-authored-by: ctiller <ctiller@users.noreply.github.com>
1 parent 8d5b93e commit fd23319

36 files changed

+716
-586
lines changed

src/core/ext/filters/client_channel/client_channel.cc

+12-10
Original file line numberDiff line numberDiff line change
@@ -2502,14 +2502,14 @@ class ClientChannel::LoadBalancedCall::Metadata
25022502
std::vector<std::pair<std::string, std::string>> TestOnlyCopyToVector()
25032503
override {
25042504
std::vector<std::pair<std::string, std::string>> result;
2505-
for (grpc_linked_mdelem* entry = batch_->list.head; entry != nullptr;
2506-
entry = entry->next) {
2507-
if (batch_->idx.named.path != entry) {
2508-
result.push_back(std::make_pair(
2509-
std::string(StringViewFromSlice(GRPC_MDKEY(entry->md))),
2510-
std::string(StringViewFromSlice(GRPC_MDVALUE(entry->md)))));
2505+
(*batch_)->ForEach([&](grpc_mdelem md) {
2506+
auto key = std::string(StringViewFromSlice(GRPC_MDKEY(md)));
2507+
if (key != ":path") {
2508+
result.push_back(
2509+
std::make_pair(std::move(key),
2510+
std::string(StringViewFromSlice(GRPC_MDVALUE(md)))));
25112511
}
2512-
}
2512+
});
25132513
return result;
25142514
}
25152515

@@ -2537,8 +2537,9 @@ class ClientChannel::LoadBalancedCall::LbCallState
25372537
const LoadBalancingPolicy::BackendMetricData* GetBackendMetricData()
25382538
override {
25392539
if (lb_call_->backend_metric_data_ == nullptr) {
2540-
grpc_linked_mdelem* md = lb_call_->recv_trailing_metadata_->idx.named
2541-
.x_endpoint_load_metrics_bin;
2540+
grpc_linked_mdelem* md = (*lb_call_->recv_trailing_metadata_)
2541+
->legacy_index()
2542+
->named.x_endpoint_load_metrics_bin;
25422543
if (md != nullptr) {
25432544
lb_call_->backend_metric_data_ =
25442545
ParseBackendMetricData(GRPC_MDVALUE(md->md), lb_call_->arena_);
@@ -2918,7 +2919,8 @@ void ClientChannel::LoadBalancedCall::RecvTrailingMetadataReady(
29182919
StringViewFromSlice(message));
29192920
} else {
29202921
// Get status from headers.
2921-
const auto& fields = self->recv_trailing_metadata_->idx.named;
2922+
const auto& fields =
2923+
(*self->recv_trailing_metadata_)->legacy_index()->named;
29222924
GPR_ASSERT(fields.grpc_status != nullptr);
29232925
grpc_status_code code =
29242926
grpc_get_status_code_from_metadata(fields.grpc_status->md);

src/core/ext/filters/client_channel/health/health_check_client.cc

+4-2
Original file line numberDiff line numberDiff line change
@@ -558,9 +558,11 @@ void HealthCheckClient::CallState::RecvTrailingMetadataReady(
558558
grpc_error_get_status(error, GRPC_MILLIS_INF_FUTURE, &status,
559559
nullptr /* slice */, nullptr /* http_error */,
560560
nullptr /* error_string */);
561-
} else if (self->recv_trailing_metadata_.idx.named.grpc_status != nullptr) {
561+
} else if ((*self->recv_trailing_metadata_)
562+
.legacy_index()
563+
->named.grpc_status != nullptr) {
562564
status = grpc_get_status_code_from_metadata(
563-
self->recv_trailing_metadata_.idx.named.grpc_status->md);
565+
(*self->recv_trailing_metadata_).legacy_index()->named.grpc_status->md);
564566
}
565567
if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
566568
gpr_log(GPR_INFO,

src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc

+6-15
Original file line numberDiff line numberDiff line change
@@ -101,20 +101,15 @@ static void clr_start_transport_stream_op_batch(
101101
// Handle send_initial_metadata.
102102
if (batch->send_initial_metadata) {
103103
// Grab client stats object from metadata.
104-
grpc_linked_mdelem* client_stats_md =
105-
batch->payload->send_initial_metadata.send_initial_metadata->list.head;
106-
for (; client_stats_md != nullptr;
107-
client_stats_md = client_stats_md->next) {
108-
if (GRPC_SLICE_START_PTR(GRPC_MDKEY(client_stats_md->md)) ==
109-
static_cast<const void*>(grpc_core::kGrpcLbClientStatsMetadataKey)) {
110-
break;
111-
}
112-
}
113-
if (client_stats_md != nullptr) {
104+
auto client_stats_md =
105+
(*batch->payload->send_initial_metadata.send_initial_metadata)
106+
->Remove(grpc_slice_from_static_string(
107+
grpc_core::kGrpcLbClientStatsMetadataKey));
108+
if (client_stats_md.has_value()) {
114109
grpc_core::GrpcLbClientStats* client_stats =
115110
const_cast<grpc_core::GrpcLbClientStats*>(
116111
reinterpret_cast<const grpc_core::GrpcLbClientStats*>(
117-
GRPC_SLICE_START_PTR(GRPC_MDVALUE(client_stats_md->md))));
112+
GRPC_SLICE_START_PTR(*client_stats_md)));
118113
if (client_stats != nullptr) {
119114
calld->client_stats.reset(client_stats);
120115
// Intercept completion.
@@ -123,10 +118,6 @@ static void clr_start_transport_stream_op_batch(
123118
calld, grpc_schedule_on_exec_ctx);
124119
batch->on_complete = &calld->on_complete_for_send;
125120
}
126-
// Remove metadata so it doesn't go out on the wire.
127-
grpc_metadata_batch_remove(
128-
batch->payload->send_initial_metadata.send_initial_metadata,
129-
client_stats_md);
130121
}
131122
}
132123
// Intercept completion of recv_initial_metadata.

src/core/ext/filters/client_channel/retry_filter.cc

+23-21
Original file line numberDiff line numberDiff line change
@@ -1565,11 +1565,12 @@ void GetCallStatus(grpc_millis deadline, grpc_metadata_batch* md_batch,
15651565
*is_lb_drop = true;
15661566
}
15671567
} else {
1568-
GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
1569-
*status =
1570-
grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
1571-
if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
1572-
*server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
1568+
GPR_ASSERT((*md_batch)->legacy_index()->named.grpc_status != nullptr);
1569+
*status = grpc_get_status_code_from_metadata(
1570+
(*md_batch)->legacy_index()->named.grpc_status->md);
1571+
if ((*md_batch)->legacy_index()->named.grpc_retry_pushback_ms != nullptr) {
1572+
*server_pushback_md =
1573+
&(*md_batch)->legacy_index()->named.grpc_retry_pushback_ms->md;
15731574
}
15741575
}
15751576
GRPC_ERROR_UNREF(error);
@@ -1921,15 +1922,16 @@ void RetryFilter::CallData::CallAttempt::BatchData::
19211922
// If we've already completed one or more attempts, add the
19221923
// grpc-retry-attempts header.
19231924
call_attempt_->send_initial_metadata_storage_ =
1924-
static_cast<grpc_linked_mdelem*>(
1925-
calld->arena_->Alloc(sizeof(grpc_linked_mdelem) *
1926-
(calld->send_initial_metadata_.list.count +
1927-
(calld->num_attempts_completed_ > 0))));
1925+
static_cast<grpc_linked_mdelem*>(calld->arena_->Alloc(
1926+
sizeof(grpc_linked_mdelem) *
1927+
(calld->send_initial_metadata_->non_deadline_count() +
1928+
(calld->num_attempts_completed_ > 0))));
19281929
grpc_metadata_batch_copy(&calld->send_initial_metadata_,
19291930
&call_attempt_->send_initial_metadata_,
19301931
call_attempt_->send_initial_metadata_storage_);
1931-
if (GPR_UNLIKELY(call_attempt_->send_initial_metadata_.idx.named
1932-
.grpc_previous_rpc_attempts != nullptr)) {
1932+
if (GPR_UNLIKELY((*call_attempt_->send_initial_metadata_)
1933+
.legacy_index()
1934+
->named.grpc_previous_rpc_attempts != nullptr)) {
19331935
grpc_metadata_batch_remove(&call_attempt_->send_initial_metadata_,
19341936
GRPC_BATCH_GRPC_PREVIOUS_RPC_ATTEMPTS);
19351937
}
@@ -1940,7 +1942,7 @@ void RetryFilter::CallData::CallAttempt::BatchData::
19401942
grpc_error_handle error = grpc_metadata_batch_add_tail(
19411943
&call_attempt_->send_initial_metadata_,
19421944
&call_attempt_->send_initial_metadata_storage_
1943-
[calld->send_initial_metadata_.list.count],
1945+
[calld->send_initial_metadata_->non_deadline_count()],
19441946
retry_md, GRPC_BATCH_GRPC_PREVIOUS_RPC_ATTEMPTS);
19451947
if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
19461948
gpr_log(GPR_ERROR, "error adding retry metadata: %s",
@@ -1984,9 +1986,9 @@ void RetryFilter::CallData::CallAttempt::BatchData::
19841986
// the filters in the subchannel stack may modify this batch, and we don't
19851987
// want those modifications to be passed forward to subsequent attempts.
19861988
call_attempt_->send_trailing_metadata_storage_ =
1987-
static_cast<grpc_linked_mdelem*>(
1988-
calld->arena_->Alloc(sizeof(grpc_linked_mdelem) *
1989-
calld->send_trailing_metadata_.list.count));
1989+
static_cast<grpc_linked_mdelem*>(calld->arena_->Alloc(
1990+
sizeof(grpc_linked_mdelem) *
1991+
calld->send_trailing_metadata_->non_deadline_count()));
19901992
grpc_metadata_batch_copy(&calld->send_trailing_metadata_,
19911993
&call_attempt_->send_trailing_metadata_,
19921994
call_attempt_->send_trailing_metadata_storage_);
@@ -2304,9 +2306,9 @@ void RetryFilter::CallData::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
23042306
GPR_ASSERT(send_initial_metadata_storage_ == nullptr);
23052307
grpc_metadata_batch* send_initial_metadata =
23062308
batch->payload->send_initial_metadata.send_initial_metadata;
2307-
send_initial_metadata_storage_ =
2308-
static_cast<grpc_linked_mdelem*>(arena_->Alloc(
2309-
sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count));
2309+
send_initial_metadata_storage_ = static_cast<grpc_linked_mdelem*>(
2310+
arena_->Alloc(sizeof(grpc_linked_mdelem) *
2311+
(*send_initial_metadata)->non_deadline_count()));
23102312
grpc_metadata_batch_copy(send_initial_metadata, &send_initial_metadata_,
23112313
send_initial_metadata_storage_);
23122314
send_initial_metadata_flags_ =
@@ -2325,9 +2327,9 @@ void RetryFilter::CallData::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
23252327
GPR_ASSERT(send_trailing_metadata_storage_ == nullptr);
23262328
grpc_metadata_batch* send_trailing_metadata =
23272329
batch->payload->send_trailing_metadata.send_trailing_metadata;
2328-
send_trailing_metadata_storage_ =
2329-
static_cast<grpc_linked_mdelem*>(arena_->Alloc(
2330-
sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count));
2330+
send_trailing_metadata_storage_ = static_cast<grpc_linked_mdelem*>(
2331+
arena_->Alloc(sizeof(grpc_linked_mdelem) *
2332+
(*send_trailing_metadata)->non_deadline_count()));
23312333
grpc_metadata_batch_copy(send_trailing_metadata, &send_trailing_metadata_,
23322334
send_trailing_metadata_storage_);
23332335
}

src/core/ext/filters/client_channel/subchannel.cc

+2-2
Original file line numberDiff line numberDiff line change
@@ -255,9 +255,9 @@ void GetCallStatus(grpc_status_code* status, grpc_millis deadline,
255255
if (error != GRPC_ERROR_NONE) {
256256
grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr);
257257
} else {
258-
if (md_batch->idx.named.grpc_status != nullptr) {
258+
if ((*md_batch)->legacy_index()->named.grpc_status != nullptr) {
259259
*status = grpc_get_status_code_from_metadata(
260-
md_batch->idx.named.grpc_status->md);
260+
(*md_batch)->legacy_index()->named.grpc_status->md);
261261
} else {
262262
*status = GRPC_STATUS_UNKNOWN;
263263
}

src/core/ext/filters/deadline/deadline_filter.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ static void deadline_client_start_transport_stream_op_batch(
295295
static void recv_initial_metadata_ready(void* arg, grpc_error_handle error) {
296296
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
297297
server_call_data* calld = static_cast<server_call_data*>(elem->call_data);
298-
start_timer_if_needed(elem, calld->recv_initial_metadata->deadline);
298+
start_timer_if_needed(elem, (*calld->recv_initial_metadata)->deadline());
299299
// Invoke the next callback.
300300
grpc_core::Closure::Run(DEBUG_LOCATION,
301301
calld->next_recv_initial_metadata_ready,

src/core/ext/filters/fault_injection/fault_injection_filter.cc

+14-15
Original file line numberDiff line numberDiff line change
@@ -46,27 +46,27 @@ static_assert(
4646
std::is_trivially_destructible<std::atomic<uint32_t>>::value,
4747
"the active fault counter needs to have a trivially destructible type");
4848

49-
inline int GetLinkedMetadatumValueInt(grpc_linked_mdelem* md) {
49+
inline int GetMetadatumValueInt(grpc_mdelem md) {
5050
int res;
51-
if (absl::SimpleAtoi(StringViewFromSlice(GRPC_MDVALUE(md->md)), &res)) {
51+
if (absl::SimpleAtoi(StringViewFromSlice(GRPC_MDVALUE(md)), &res)) {
5252
return res;
5353
} else {
5454
return -1;
5555
}
5656
}
5757

58-
inline uint32_t GetLinkedMetadatumValueUnsignedInt(grpc_linked_mdelem* md) {
58+
inline uint32_t GetMetadatumValueUnsignedInt(grpc_mdelem md) {
5959
uint32_t res;
60-
if (absl::SimpleAtoi(StringViewFromSlice(GRPC_MDVALUE(md->md)), &res)) {
60+
if (absl::SimpleAtoi(StringViewFromSlice(GRPC_MDVALUE(md)), &res)) {
6161
return res;
6262
} else {
6363
return -1;
6464
}
6565
}
6666

67-
inline int64_t GetLinkedMetadatumValueInt64(grpc_linked_mdelem* md) {
67+
inline int64_t GetMetadatumValueInt64(grpc_mdelem md) {
6868
int64_t res;
69-
if (absl::SimpleAtoi(StringViewFromSlice(GRPC_MDVALUE(md->md)), &res)) {
69+
if (absl::SimpleAtoi(StringViewFromSlice(GRPC_MDVALUE(md)), &res)) {
7070
return res;
7171
} else {
7272
return -1;
@@ -347,9 +347,8 @@ void CallData::DecideWhetherToInjectFaults(
347347
*fi_policy_);
348348
}
349349
};
350-
for (grpc_linked_mdelem* md = initial_metadata->list.head; md != nullptr;
351-
md = md->next) {
352-
absl::string_view key = StringViewFromSlice(GRPC_MDKEY(md->md));
350+
(*initial_metadata)->ForEach([&](grpc_mdelem md) {
351+
absl::string_view key = StringViewFromSlice(GRPC_MDKEY(md));
353352
// Only perform string comparison if:
354353
// 1. Needs to check this header;
355354
// 2. The value is not been filled before.
@@ -358,31 +357,31 @@ void CallData::DecideWhetherToInjectFaults(
358357
copied_policy->abort_code == GRPC_STATUS_OK) &&
359358
key == fi_policy_->abort_code_header) {
360359
maybe_copy_policy_func();
361-
grpc_status_code_from_int(GetLinkedMetadatumValueInt(md),
360+
grpc_status_code_from_int(GetMetadatumValueInt(md),
362361
&copied_policy->abort_code);
363362
}
364363
if (!fi_policy_->abort_percentage_header.empty() &&
365364
key == fi_policy_->abort_percentage_header) {
366365
maybe_copy_policy_func();
367366
copied_policy->abort_percentage_numerator =
368-
GPR_MIN(GetLinkedMetadatumValueUnsignedInt(md),
367+
GPR_MIN(GetMetadatumValueUnsignedInt(md),
369368
fi_policy_->abort_percentage_numerator);
370369
}
371370
if (!fi_policy_->delay_header.empty() &&
372371
(copied_policy == nullptr || copied_policy->delay == 0) &&
373372
key == fi_policy_->delay_header) {
374373
maybe_copy_policy_func();
375-
copied_policy->delay = static_cast<grpc_millis>(
376-
GPR_MAX(GetLinkedMetadatumValueInt64(md), 0));
374+
copied_policy->delay =
375+
static_cast<grpc_millis>(GPR_MAX(GetMetadatumValueInt64(md), 0));
377376
}
378377
if (!fi_policy_->delay_percentage_header.empty() &&
379378
key == fi_policy_->delay_percentage_header) {
380379
maybe_copy_policy_func();
381380
copied_policy->delay_percentage_numerator =
382-
GPR_MIN(GetLinkedMetadatumValueUnsignedInt(md),
381+
GPR_MIN(GetMetadatumValueUnsignedInt(md),
383382
fi_policy_->delay_percentage_numerator);
384383
}
385-
}
384+
});
386385
if (copied_policy != nullptr) fi_policy_ = copied_policy;
387386
}
388387
// Roll the dice

0 commit comments

Comments
 (0)