Skip to content

Commit b01f8ae

Browse files
authored
Add test to verify that ConsumeMetrics and ConsumeTraces clear response members (#1975)
Summary: Add test to verify that ConsumeMetrics and ConsumeTraces clear response members This adds test coverage for the bug fix in #1910. This is a follow up to the conversation [here](#1910 (comment)) Relevant Issues: N/A Type of change: /kind bug Test Plan: Verified that unit test fails if #1910 is reverted ``` $ git show HEAD commit 4ab4a9c (HEAD -> ddelnano/add-tests-for-otel-sink-bug, ddelnano/ddelnano/add-tests-for-otel-sink-bug) Author: Dom Del Nano <ddelnano@gmail.com> Date: Fri Jul 26 12:17:00 2024 +0000 Revert "Clear trace response instead of metric response in `OTelExportSinkNode::ConsumeSpans` (#1910)" This reverts commit 970a54a. $ bazel test src/carnot/exec:otel_export_sink_node_test --test_output=all [ ... ] [ RUN ] OTelExportSinkNodeTest.consume_spans_clears_span_responses src/carnot/exec/otel_export_sink_node_test.cc:1748: Failure Value of: response->partial_success().rejected_spans() == 0 Actual: false Expected: true ``` --------- Signed-off-by: Dom Del Nano <ddelnano@gmail.com>
1 parent 962e48d commit b01f8ae

File tree

1 file changed

+105
-0
lines changed

1 file changed

+105
-0
lines changed

src/carnot/exec/otel_export_sink_node_test.cc

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1732,6 +1732,111 @@ eos: true)pb";
17321732
EXPECT_THAT(retval.ToString(), ::testing::MatchesRegex(".*INTERNAL.*"));
17331733
}
17341734

1735+
TEST_F(OTelExportSinkNodeTest, consume_spans_clears_span_responses) {
1736+
oteltracecollector::ExportTraceServiceResponse error_response;
1737+
error_response.mutable_partial_success()->set_rejected_spans(1);
1738+
EXPECT_CALL(*trace_mock_, Export(_, _, _))
1739+
.Times(::testing::AtLeast(2))
1740+
.WillOnce(DoAll(SetArgPointee<2>(error_response), Return(grpc::Status::OK)))
1741+
.WillRepeatedly(Invoke([&](const auto&, const auto&, auto* response) {
1742+
// It's expected that the response argument provided to Export
1743+
// has .Clear() called on it. This CALL assertion verifies that the
1744+
// response object no longer has rejected data points since it should
1745+
// have been .Clear()'ed at the beginning of the second ConsumeTraces invocation
1746+
EXPECT_EQ(response->partial_success().rejected_spans(), 0);
1747+
return grpc::Status::OK;
1748+
}));
1749+
1750+
planpb::OTelExportSinkOperator otel_sink_op;
1751+
1752+
std::string operator_proto = R"pb(
1753+
spans {
1754+
name_string: "span"
1755+
start_time_column_index: 0
1756+
end_time_column_index: 1
1757+
trace_id_column_index: -1
1758+
span_id_column_index: -1
1759+
parent_span_id_column_index: -1
1760+
})pb";
1761+
EXPECT_TRUE(google::protobuf::TextFormat::ParseFromString(operator_proto, &otel_sink_op));
1762+
auto plan_node = std::make_unique<plan::OTelExportSinkOperator>(1);
1763+
auto s = plan_node->Init(otel_sink_op);
1764+
std::string row_batch = R"pb(
1765+
cols { time64ns_data { data: 10 data: 20 } }
1766+
cols { time64ns_data { data: 12 data: 22 } }
1767+
num_rows: 2
1768+
eow: true
1769+
eos: true)pb";
1770+
1771+
// Load a RowBatch to get the Input RowDescriptor.
1772+
table_store::schemapb::RowBatchData row_batch_proto;
1773+
EXPECT_TRUE(google::protobuf::TextFormat::ParseFromString(row_batch, &row_batch_proto));
1774+
RowDescriptor input_rd = RowBatch::FromProto(row_batch_proto).ConsumeValueOrDie()->desc();
1775+
RowDescriptor output_rd({});
1776+
1777+
auto tester = exec::ExecNodeTester<OTelExportSinkNode, plan::OTelExportSinkOperator>(
1778+
*plan_node, output_rd, {input_rd}, exec_state_.get());
1779+
auto rb = RowBatch::FromProto(row_batch_proto).ConsumeValueOrDie();
1780+
1781+
// Call ConsumeSpans twice in order to verify that the second
1782+
// invocation calls clear on the response object.
1783+
auto retval = tester.node()->ConsumeNext(exec_state_.get(), *rb.get(), 1);
1784+
EXPECT_OK(retval);
1785+
retval = tester.node()->ConsumeNext(exec_state_.get(), *rb.get(), 1);
1786+
EXPECT_OK(retval);
1787+
}
1788+
1789+
TEST_F(OTelExportSinkNodeTest, metrics_response_is_cleared) {
1790+
otelmetricscollector::ExportMetricsServiceResponse error_response;
1791+
error_response.mutable_partial_success()->set_rejected_data_points(1);
1792+
EXPECT_CALL(*metrics_mock_, Export(_, _, _))
1793+
.Times(::testing::AtLeast(2))
1794+
.WillOnce(DoAll(SetArgPointee<2>(error_response), Return(grpc::Status::OK)))
1795+
.WillRepeatedly(Invoke([&](const auto&, const auto&, auto* response) {
1796+
// It's expected that the response argument provided to Export
1797+
// has .Clear() called on it. This CALL assertion verifies that the
1798+
// response object no longer has rejected data points since it should
1799+
// have been .Clear()'ed at the beginning of the second ConsumeMetrics invocation
1800+
EXPECT_EQ(response->partial_success().rejected_data_points(), 0);
1801+
return grpc::Status::OK;
1802+
}));
1803+
1804+
planpb::OTelExportSinkOperator otel_sink_op;
1805+
1806+
std::string operator_proto = R"pb(
1807+
metrics {
1808+
name: "http.resp.latency"
1809+
time_column_index: 0
1810+
gauge { int_column_index: 1 }
1811+
})pb";
1812+
EXPECT_TRUE(google::protobuf::TextFormat::ParseFromString(operator_proto, &otel_sink_op));
1813+
auto plan_node = std::make_unique<plan::OTelExportSinkOperator>(1);
1814+
auto s = plan_node->Init(otel_sink_op);
1815+
std::string row_batch = R"pb(
1816+
cols { time64ns_data { data: 10 data: 11 } }
1817+
cols { int64_data { data: 15 data: 150 } }
1818+
num_rows: 2
1819+
eow: true
1820+
eos: true)pb";
1821+
1822+
// Load a RowBatch to get the Input RowDescriptor.
1823+
table_store::schemapb::RowBatchData row_batch_proto;
1824+
EXPECT_TRUE(google::protobuf::TextFormat::ParseFromString(row_batch, &row_batch_proto));
1825+
RowDescriptor input_rd = RowBatch::FromProto(row_batch_proto).ConsumeValueOrDie()->desc();
1826+
RowDescriptor output_rd({});
1827+
1828+
auto tester = exec::ExecNodeTester<OTelExportSinkNode, plan::OTelExportSinkOperator>(
1829+
*plan_node, output_rd, {input_rd}, exec_state_.get());
1830+
auto rb = RowBatch::FromProto(row_batch_proto).ConsumeValueOrDie();
1831+
1832+
// Call ConsumeMetrics twice in order to verify that the second
1833+
// invocation calls clear on the response object.
1834+
auto retval = tester.node()->ConsumeNext(exec_state_.get(), *rb.get(), 1);
1835+
EXPECT_OK(retval);
1836+
retval = tester.node()->ConsumeNext(exec_state_.get(), *rb.get(), 1);
1837+
EXPECT_OK(retval);
1838+
}
1839+
17351840
} // namespace exec
17361841
} // namespace carnot
17371842
} // namespace px

0 commit comments

Comments
 (0)