Skip to content

Commit 1b14e8c

Browse files
authored
Update GetAgentStatus and kernel header UDTF to allow kelvin filtering (#2061)
Summary: Update `GetAgentStatus` and kernel header UDTF to allow kelvin filtering. In order to leverage `GetAgentStatus`'s `kernel_headers_installed` column for #2051, it would be convenient for the UDTF to provide the ability to filter kelvins out -- they don't have access to kernel headers since they don't have the host filesystem volume mounted. This change introduces an `include_kelvin` init argument to the UDTFs with a default of `true` to preserve the existing behavior. This change also fixes a bug with UDTF init arg default values, which didn't work prior to this change. Please review commit by commit to see the default arg bug fix followed by the UDTF changes. Relevant Issues: #2051 Type of change: /kind bug Test Plan: New logical planner test no longer fails with the following error ``` $ bazel test -c opt src/carnot/planner:logical_planner_test --test_output=all [ RUN ] LogicalPlannerTest.one_pems_one_kelvin src/carnot/planner/logical_planner_test.cc:64: Failure Value of: IsOK(::px::StatusAdapter(__status_or_value__64)) Actual: false (Invalid Argument : DATA_TYPE_UNKNOWN not handled as a default value) Expected: true ```
1 parent a95d661 commit 1b14e8c

File tree

4 files changed

+60
-14
lines changed

4 files changed

+60
-14
lines changed

src/carnot/planner/logical_planner_test.cc

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,24 @@ TEST_F(LogicalPlannerTest, AppendSelfTest) {
272272
EXPECT_OK(plan->ToProto());
273273
}
274274

275+
// PxL script used by UDTFDefaultArgumentTest: it displays px.GetAgentStatus()
// called with no arguments, so any init argument must come from its default.
constexpr char kAgentStatusQuery[] = R"pxl(
276+
import px
277+
278+
# GetAgentStatus takes an include_kelvin argument. This defaults to True
279+
# to preserve backwards compatibility.
280+
px.display(px.GetAgentStatus())
281+
)pxl";
282+
283+
// Plans kAgentStatusQuery against a planner state built by
// CreateTwoPEMsOneKelvinPlannerState and expects both the planning step and
// the conversion of the resulting plan to proto to return OK.
TEST_F(LogicalPlannerTest, UDTFDefaultArgumentTest) {
284+
auto planner = LogicalPlanner::Create(info_).ConsumeValueOrDie();
285+
auto plan_or_s = planner->Plan(
286+
MakeQueryRequest(testutils::CreateTwoPEMsOneKelvinPlannerState(testutils::kHttpEventsSchema),
287+
kAgentStatusQuery));
288+
EXPECT_OK(plan_or_s);
289+
auto plan = plan_or_s.ConsumeValueOrDie();
290+
EXPECT_OK(plan->ToProto());
291+
}
292+
275293
constexpr char kPlannerQueryError[] = R"pxl(
276294
import px
277295

src/carnot/udf/registry.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,21 +92,25 @@ void DefaultToScalarValue(const UDTFArg&, planpb::ScalarValue*) {
9292
// BOOLEAN specialization: stores arg's BOOLEAN default in out's bool_value
// and records types::BOOLEAN as out's data_type (the data_type line is the
// one added by this diff).
template <>
9393
void DefaultToScalarValue<types::BOOLEAN>(const UDTFArg& arg, planpb::ScalarValue* out) {
9494
out->set_bool_value(arg.GetDefaultValue<types::BOOLEAN>().val);
95+
out->set_data_type(types::BOOLEAN);
9596
}
9697

9798
// INT64 specialization: stores arg's INT64 default in out's int64_value and
// records types::INT64 as out's data_type (the data_type line is the one
// added by this diff).
template <>
9899
void DefaultToScalarValue<types::INT64>(const UDTFArg& arg, planpb::ScalarValue* out) {
99100
out->set_int64_value(arg.GetDefaultValue<types::INT64>().val);
101+
out->set_data_type(types::INT64);
100102
}
101103

102104
// TIME64NS specialization: stores arg's TIME64NS default in out's
// time64_ns_value and records types::TIME64NS as out's data_type (the
// data_type line is the one added by this diff).
template <>
103105
void DefaultToScalarValue<types::TIME64NS>(const UDTFArg& arg, planpb::ScalarValue* out) {
104106
out->set_time64_ns_value(arg.GetDefaultValue<types::TIME64NS>().val);
107+
out->set_data_type(types::TIME64NS);
105108
}
106109

107110
// FLOAT64 specialization: stores arg's FLOAT64 default in out's
// float64_value and records types::FLOAT64 as out's data_type (the data_type
// line is the one added by this diff).
template <>
108111
void DefaultToScalarValue<types::FLOAT64>(const UDTFArg& arg, planpb::ScalarValue* out) {
109112
out->set_float64_value(arg.GetDefaultValue<types::FLOAT64>().val);
113+
out->set_data_type(types::FLOAT64);
110114
}
111115

112116
template <>
@@ -116,11 +120,13 @@ void DefaultToScalarValue<types::UINT128>(const UDTFArg& arg, planpb::ScalarValu
116120

117121
out_val->set_high(casted_arg.High64());
118122
out_val->set_high(casted_arg.Low64());
123+
out->set_data_type(types::UINT128);
119124
}
120125

121126
template <>
122127
void DefaultToScalarValue<types::STRING>(const UDTFArg& arg, planpb::ScalarValue* out) {
123128
out->set_string_value(std::string(arg.GetDefaultValue<types::STRING>()));
129+
out->set_data_type(types::STRING);
124130
}
125131
} // namespace
126132

src/carnot/udf/registry_test.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,7 @@ udtfs {
421421
semantic_type: ST_NONE
422422
default_value {
423423
int64_value: 123
424+
data_type: INT64
424425
}
425426
}
426427
args {

src/vizier/funcs/md_udtfs/md_udtfs_impl.h

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -304,9 +304,10 @@ class GetAgentStatus final : public carnot::udf::UDTF<GetAgentStatus> {
304304
kKernelHeadersInstalledDesc));
305305
}
306306

307-
Status Init(FunctionContext*) {
307+
Status Init(FunctionContext*, types::BoolValue include_kelvin) {
308308
px::vizier::services::metadata::AgentInfoRequest req;
309309
resp_ = std::make_unique<px::vizier::services::metadata::AgentInfoResponse>();
310+
include_kelvin_ = include_kelvin.val;
310311

311312
grpc::ClientContext ctx;
312313
add_context_authentication_func_(&ctx);
@@ -317,6 +318,11 @@ class GetAgentStatus final : public carnot::udf::UDTF<GetAgentStatus> {
317318
return Status::OK();
318319
}
319320

321+
// Declares the init arguments for this UDTF: a single BOOLEAN argument named
// "include_kelvin" whose default value is true.
static constexpr auto InitArgs() {
322+
return MakeArray(UDTFArg::Make<types::BOOLEAN>(
323+
"include_kelvin", "Whether to include Kelvin agents in the output", true));
324+
}
325+
320326
bool NextRecord(FunctionContext*, RecordWriter* rw) {
321327
const auto& agent_metadata = resp_->info(idx_);
322328
const auto& agent_info = agent_metadata.agent();
@@ -329,22 +335,26 @@ class GetAgentStatus final : public carnot::udf::UDTF<GetAgentStatus> {
329335
}
330336
// TODO(zasgar): Figure out abort mechanism;
331337

332-
rw->Append<IndexOf("agent_id")>(absl::MakeUint128(u.ab, u.cd));
333-
rw->Append<IndexOf("asid")>(agent_info.asid());
334-
rw->Append<IndexOf("hostname")>(agent_info.info().host_info().hostname());
335-
rw->Append<IndexOf("ip_address")>(agent_info.info().ip_address());
336-
rw->Append<IndexOf("agent_state")>(StringValue(magic_enum::enum_name(agent_status.state())));
337-
rw->Append<IndexOf("create_time")>(agent_info.create_time_ns());
338-
rw->Append<IndexOf("last_heartbeat_ns")>(agent_status.ns_since_last_heartbeat());
339-
rw->Append<IndexOf("kernel_headers_installed")>(
340-
agent_info.info().host_info().kernel_headers_installed());
338+
auto host_info = agent_info.info().host_info();
339+
auto collects_data = agent_info.info().capabilities().collects_data();
340+
if (collects_data || include_kelvin_) {
341+
rw->Append<IndexOf("agent_id")>(absl::MakeUint128(u.ab, u.cd));
342+
rw->Append<IndexOf("asid")>(agent_info.asid());
343+
rw->Append<IndexOf("hostname")>(host_info.hostname());
344+
rw->Append<IndexOf("ip_address")>(agent_info.info().ip_address());
345+
rw->Append<IndexOf("agent_state")>(StringValue(magic_enum::enum_name(agent_status.state())));
346+
rw->Append<IndexOf("create_time")>(agent_info.create_time_ns());
347+
rw->Append<IndexOf("last_heartbeat_ns")>(agent_status.ns_since_last_heartbeat());
348+
rw->Append<IndexOf("kernel_headers_installed")>(host_info.kernel_headers_installed());
349+
}
341350

342351
++idx_;
343352
return idx_ < resp_->info_size();
344353
}
345354

346355
private:
347356
int idx_ = 0;
357+
bool include_kelvin_ = false;
348358
std::unique_ptr<px::vizier::services::metadata::AgentInfoResponse> resp_;
349359
std::shared_ptr<MDSStub> stub_;
350360
std::function<void(grpc::ClientContext*)> add_context_authentication_func_;
@@ -425,9 +435,10 @@ class GetLinuxHeadersStatus final : public carnot::udf::UDTF<GetLinuxHeadersStat
425435
kKernelHeadersInstalledDesc));
426436
}
427437

428-
Status Init(FunctionContext*) {
438+
Status Init(FunctionContext*, BoolValue include_kelvin) {
429439
px::vizier::services::metadata::AgentInfoRequest req;
430440
resp_ = std::make_unique<px::vizier::services::metadata::AgentInfoResponse>();
441+
include_kelvin_ = include_kelvin.val;
431442

432443
grpc::ClientContext ctx;
433444
add_context_authentication_func_(&ctx);
@@ -438,21 +449,31 @@ class GetLinuxHeadersStatus final : public carnot::udf::UDTF<GetLinuxHeadersStat
438449
return Status::OK();
439450
}
440451

452+
// Declares the init arguments for this UDTF: a single BOOLEAN argument named
// "include_kelvin" whose default value is true.
static constexpr auto InitArgs() {
453+
return MakeArray(UDTFArg::Make<types::BOOLEAN>(
454+
"include_kelvin", "Whether to include Kelvin agents in the output", true));
455+
}
456+
441457
// Handles the entry at index idx_ of resp_, then advances idx_ and returns
// true while idx_ is still below resp_->info_size().
//
// This span is a rendered diff: lines following an "NNN-" marker are the
// removed side (unconditional appends) and lines following an "NNN+" marker
// are the added side, where the "asid" and "kernel_headers_installed" columns
// are appended only when collects_data is true or include_kelvin_ is set.
//
// NOTE(review): on the added side, an entry that fails the condition appends
// nothing for this call -- confirm the UDTF framework accepts a NextRecord
// call that writes no row.
// NOTE(review): resp_->info(idx_) is read before any bounds check -- confirm
// an empty response cannot reach this method.
// NOTE(review): host_info is declared with `const auto` (by value); if
// host_info() returns a reference this makes a copy -- consider `const auto&`.
bool NextRecord(FunctionContext*, RecordWriter* rw) {
442458
const auto& agent_metadata = resp_->info(idx_);
443459
const auto& agent_info = agent_metadata.agent();
444460

445461
const auto asid = agent_info.asid();
446-
const auto kernel_headers_installed = agent_info.info().host_info().kernel_headers_installed();
447-
rw->Append<IndexOf("asid")>(asid);
448-
rw->Append<IndexOf("kernel_headers_installed")>(kernel_headers_installed);
462+
auto collects_data = agent_info.info().capabilities().collects_data();
463+
const auto host_info = agent_info.info().host_info();
464+
const auto kernel_headers_installed = host_info.kernel_headers_installed();
465+
if (collects_data || include_kelvin_) {
466+
rw->Append<IndexOf("asid")>(asid);
467+
rw->Append<IndexOf("kernel_headers_installed")>(kernel_headers_installed);
468+
}
449469

450470
++idx_;
451471
return idx_ < resp_->info_size();
452472
}
453473

454474
private:
455475
int idx_ = 0;
476+
bool include_kelvin_ = false;
456477
std::unique_ptr<px::vizier::services::metadata::AgentInfoResponse> resp_;
457478
std::shared_ptr<MDSStub> stub_;
458479
std::function<void(grpc::ClientContext*)> add_context_authentication_func_;

0 commit comments

Comments
 (0)