Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions src/carnot/exec/otel_export_sink_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1732,6 +1732,111 @@ eos: true)pb";
EXPECT_THAT(retval.ToString(), ::testing::MatchesRegex(".*INTERNAL.*"));
}

TEST_F(OTelExportSinkNodeTest, consume_spans_clears_span_responses) {
oteltracecollector::ExportTraceServiceResponse error_response;
error_response.mutable_partial_success()->set_rejected_spans(1);
EXPECT_CALL(*trace_mock_, Export(_, _, _))
.Times(::testing::AtLeast(2))
.WillOnce(DoAll(SetArgPointee<2>(error_response), Return(grpc::Status::OK)))
.WillRepeatedly(Invoke([&](const auto&, const auto&, auto* response) {
// It's expected that the response argument provided to Export
// has .Clear() called on it. This CALL assertion verifies that the
// response object no longer has rejected data points since it should
// have been .Clear()'ed at the beginning of the second ConsumeTraces invocation
EXPECT_EQ(response->partial_success().rejected_spans(), 0);
return grpc::Status::OK;
}));

planpb::OTelExportSinkOperator otel_sink_op;

std::string operator_proto = R"pb(
spans {
name_string: "span"
start_time_column_index: 0
end_time_column_index: 1
trace_id_column_index: -1
span_id_column_index: -1
parent_span_id_column_index: -1
})pb";
EXPECT_TRUE(google::protobuf::TextFormat::ParseFromString(operator_proto, &otel_sink_op));
auto plan_node = std::make_unique<plan::OTelExportSinkOperator>(1);
auto s = plan_node->Init(otel_sink_op);
std::string row_batch = R"pb(
cols { time64ns_data { data: 10 data: 20 } }
cols { time64ns_data { data: 12 data: 22 } }
num_rows: 2
eow: true
eos: true)pb";

// Load a RowBatch to get the Input RowDescriptor.
table_store::schemapb::RowBatchData row_batch_proto;
EXPECT_TRUE(google::protobuf::TextFormat::ParseFromString(row_batch, &row_batch_proto));
RowDescriptor input_rd = RowBatch::FromProto(row_batch_proto).ConsumeValueOrDie()->desc();
RowDescriptor output_rd({});

auto tester = exec::ExecNodeTester<OTelExportSinkNode, plan::OTelExportSinkOperator>(
*plan_node, output_rd, {input_rd}, exec_state_.get());
auto rb = RowBatch::FromProto(row_batch_proto).ConsumeValueOrDie();

// Call ConsumeSpans twice in order to verify that the second
// invocation calls clear on the response object.
auto retval = tester.node()->ConsumeNext(exec_state_.get(), *rb.get(), 1);
EXPECT_OK(retval);
retval = tester.node()->ConsumeNext(exec_state_.get(), *rb.get(), 1);
EXPECT_OK(retval);
}

TEST_F(OTelExportSinkNodeTest, metrics_response_is_cleared) {
otelmetricscollector::ExportMetricsServiceResponse error_response;
error_response.mutable_partial_success()->set_rejected_data_points(1);
EXPECT_CALL(*metrics_mock_, Export(_, _, _))
.Times(::testing::AtLeast(2))
.WillOnce(DoAll(SetArgPointee<2>(error_response), Return(grpc::Status::OK)))
.WillRepeatedly(Invoke([&](const auto&, const auto&, auto* response) {
// It's expected that the response argument provided to Export
// has .Clear() called on it. This CALL assertion verifies that the
// response object no longer has rejected data points since it should
// have been .Clear()'ed at the beginning of the second ConsumeMetrics invocation
EXPECT_EQ(response->partial_success().rejected_data_points(), 0);
return grpc::Status::OK;
}));

planpb::OTelExportSinkOperator otel_sink_op;

std::string operator_proto = R"pb(
metrics {
name: "http.resp.latency"
time_column_index: 0
gauge { int_column_index: 1 }
})pb";
EXPECT_TRUE(google::protobuf::TextFormat::ParseFromString(operator_proto, &otel_sink_op));
auto plan_node = std::make_unique<plan::OTelExportSinkOperator>(1);
auto s = plan_node->Init(otel_sink_op);
std::string row_batch = R"pb(
cols { time64ns_data { data: 10 data: 11 } }
cols { int64_data { data: 15 data: 150 } }
num_rows: 2
eow: true
eos: true)pb";

// Load a RowBatch to get the Input RowDescriptor.
table_store::schemapb::RowBatchData row_batch_proto;
EXPECT_TRUE(google::protobuf::TextFormat::ParseFromString(row_batch, &row_batch_proto));
RowDescriptor input_rd = RowBatch::FromProto(row_batch_proto).ConsumeValueOrDie()->desc();
RowDescriptor output_rd({});

auto tester = exec::ExecNodeTester<OTelExportSinkNode, plan::OTelExportSinkOperator>(
*plan_node, output_rd, {input_rd}, exec_state_.get());
auto rb = RowBatch::FromProto(row_batch_proto).ConsumeValueOrDie();

// Call ConsumeMetrics twice in order to verify that the second
// invocation calls clear on the response object.
auto retval = tester.node()->ConsumeNext(exec_state_.get(), *rb.get(), 1);
EXPECT_OK(retval);
retval = tester.node()->ConsumeNext(exec_state_.get(), *rb.get(), 1);
EXPECT_OK(retval);
}

} // namespace exec
} // namespace carnot
} // namespace px