@@ -1732,6 +1732,115 @@ eos: true)pb";
17321732 EXPECT_THAT (retval.ToString (), ::testing::MatchesRegex (" .*INTERNAL.*" ));
17331733}
17341734
1735+ TEST_F (OTelExportSinkNodeTest, consume_spans_clears_span_responses) {
1736+ oteltracecollector::ExportTraceServiceResponse error_response;
1737+ error_response.mutable_partial_success ()->set_rejected_spans (1 );
1738+ EXPECT_CALL (*trace_mock_, Export (_, _, _))
1739+ .Times (::testing::AtLeast (2 ))
1740+ .WillOnce (DoAll (SetArgPointee<2 >(error_response), Return (grpc::Status::OK)))
1741+ .WillRepeatedly (Invoke (
1742+ [&](const auto &, const auto &, auto * response) {
1743+
1744+ // It's expected that the response argument provided to Export
1745+ // has .Clear() called on it. This CALL assertion verifies that the
1746+ // response object no longer has rejected data points since it should
1747+ // have been .Clear()'ed at the beginning of the second ConsumeTraces invocation
1748+ EXPECT_TRUE (response->partial_success ().rejected_spans () == 0 );
1749+ return grpc::Status::OK;
1750+ }));
1751+
1752+ planpb::OTelExportSinkOperator otel_sink_op;
1753+
1754+ std::string operator_proto = R"pb(
1755+ spans {
1756+ name_string: "span"
1757+ start_time_column_index: 0
1758+ end_time_column_index: 1
1759+ trace_id_column_index: -1
1760+ span_id_column_index: -1
1761+ parent_span_id_column_index: -1
1762+ })pb" ;
1763+ EXPECT_TRUE (google::protobuf::TextFormat::ParseFromString (operator_proto, &otel_sink_op));
1764+ auto plan_node = std::make_unique<plan::OTelExportSinkOperator>(1 );
1765+ auto s = plan_node->Init (otel_sink_op);
1766+ std::string row_batch = R"pb(
1767+ cols { time64ns_data { data: 10 data: 20 } }
1768+ cols { time64ns_data { data: 12 data: 22 } }
1769+ num_rows: 2
1770+ eow: true
1771+ eos: true)pb" ;
1772+
1773+ // Load a RowBatch to get the Input RowDescriptor.
1774+ table_store::schemapb::RowBatchData row_batch_proto;
1775+ EXPECT_TRUE (google::protobuf::TextFormat::ParseFromString (row_batch, &row_batch_proto));
1776+ RowDescriptor input_rd = RowBatch::FromProto (row_batch_proto).ConsumeValueOrDie ()->desc ();
1777+ RowDescriptor output_rd ({});
1778+
1779+ auto tester = exec::ExecNodeTester<OTelExportSinkNode, plan::OTelExportSinkOperator>(
1780+ *plan_node, output_rd, {input_rd}, exec_state_.get ());
1781+ auto rb = RowBatch::FromProto (row_batch_proto).ConsumeValueOrDie ();
1782+
1783+ // Call ConsumeSpans twice in order to verify that the second
1784+ // invocation calls clear on the response object.
1785+ auto retval = tester.node ()->ConsumeNext (exec_state_.get (), *rb.get (), 1 );
1786+ EXPECT_OK (retval);
1787+ retval = tester.node ()->ConsumeNext (exec_state_.get (), *rb.get (), 1 );
1788+ EXPECT_OK (retval);
1789+ }
1790+
1791+ TEST_F (OTelExportSinkNodeTest, metrics_response_is_cleared) {
1792+ otelmetricscollector::ExportMetricsServiceResponse error_response;
1793+ error_response.mutable_partial_success ()->set_rejected_data_points (1 );
1794+ EXPECT_CALL (*metrics_mock_, Export (_, _, _))
1795+ .Times (::testing::AtLeast (2 ))
1796+ .WillOnce (DoAll (SetArgPointee<2 >(error_response), Return (grpc::Status::OK)))
1797+ .WillRepeatedly (Invoke (
1798+ [&](const auto &, const auto &, auto * response) {
1799+
1800+ // It's expected that the response argument provided to Export
1801+ // has .Clear() called on it. This CALL assertion verifies that the
1802+ // response object no longer has rejected data points since it should
1803+ // have been .Clear()'ed at the beginning of the second ConsumeMetrics invocation
1804+ EXPECT_TRUE (response->partial_success ().rejected_data_points () == 0 );
1805+ return grpc::Status::OK;
1806+ }));
1807+
1808+ planpb::OTelExportSinkOperator otel_sink_op;
1809+
1810+ std::string operator_proto = R"pb(
1811+ metrics {
1812+ name: "http.resp.latency"
1813+ time_column_index: 0
1814+ gauge { int_column_index: 1 }
1815+ })pb" ;
1816+ EXPECT_TRUE (google::protobuf::TextFormat::ParseFromString (operator_proto, &otel_sink_op));
1817+ auto plan_node = std::make_unique<plan::OTelExportSinkOperator>(1 );
1818+ auto s = plan_node->Init (otel_sink_op);
1819+ std::string row_batch = R"pb(
1820+ cols { time64ns_data { data: 10 data: 11 } }
1821+ cols { int64_data { data: 15 data: 150 } }
1822+ num_rows: 2
1823+ eow: true
1824+ eos: true)pb" ;
1825+
1826+ // Load a RowBatch to get the Input RowDescriptor.
1827+ table_store::schemapb::RowBatchData row_batch_proto;
1828+ EXPECT_TRUE (google::protobuf::TextFormat::ParseFromString (row_batch, &row_batch_proto));
1829+ RowDescriptor input_rd = RowBatch::FromProto (row_batch_proto).ConsumeValueOrDie ()->desc ();
1830+ RowDescriptor output_rd ({});
1831+
1832+ auto tester = exec::ExecNodeTester<OTelExportSinkNode, plan::OTelExportSinkOperator>(
1833+ *plan_node, output_rd, {input_rd}, exec_state_.get ());
1834+ auto rb = RowBatch::FromProto (row_batch_proto).ConsumeValueOrDie ();
1835+
1836+ // Call ConsumeMetrics twice in order to verify that the second
1837+ // invocation calls clear on the response object.
1838+ auto retval = tester.node ()->ConsumeNext (exec_state_.get (), *rb.get (), 1 );
1839+ EXPECT_OK (retval);
1840+ retval = tester.node ()->ConsumeNext (exec_state_.get (), *rb.get (), 1 );
1841+ EXPECT_OK (retval);
1842+ }
1843+
17351844} // namespace exec
17361845} // namespace carnot
17371846} // namespace px
0 commit comments