Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion ydb/core/fq/libs/control_plane_storage/events/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -795,11 +795,15 @@ struct TEvControlPlaneStorage {
};

struct TEvFinalStatusReport : NActors::TEventLocal<TEvFinalStatusReport, EvFinalStatusReport> {
TEvFinalStatusReport(const TString& queryId, const TString& jobId, const TString& cloudId, const TString& scope, FederatedQuery::QueryMeta::ComputeStatus status, const NYql::TIssues& issues, const NYql::TIssues& transientIssues)
TEvFinalStatusReport(
const TString& queryId, const TString& jobId, const TString& cloudId, const TString& scope,
std::vector<std::pair<TString, ui64>>&& statistics, FederatedQuery::QueryMeta::ComputeStatus status,
const NYql::TIssues& issues, const NYql::TIssues& transientIssues)
: QueryId(queryId)
, JobId(jobId)
, CloudId(cloudId)
, Scope(scope)
, Statistics(std::move(statistics))
, Status(status)
, Issues(issues)
, TransientIssues(transientIssues)
Expand All @@ -809,6 +813,7 @@ struct TEvControlPlaneStorage {
TString JobId;
TString CloudId;
TString Scope;
std::vector<std::pair<TString, ui64>> Statistics;
FederatedQuery::QueryMeta::ComputeStatus Status = FederatedQuery::QueryMeta::COMPUTE_STATUS_UNSPECIFIED;
NYql::TIssues Issues;
NYql::TIssues TransientIssues;
Expand Down
20 changes: 16 additions & 4 deletions ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ TPingTaskParams ConstructHardPingTask(
const TString& tablePathPrefix, const TDuration& automaticQueriesTtl, const TDuration& taskLeaseTtl,
const THashMap<ui64, TRetryPolicyItem>& retryPolicies, ::NMonitoring::TDynamicCounterPtr rootCounters,
uint64_t maxRequestSize, bool dumpRawStatistics, const std::shared_ptr<TFinalStatus>& finalStatus,
const TRequestCommonCountersPtr& commonCounters) {
const TRequestCommonCountersPtr& commonCounters, const std::shared_ptr<StatsValuesList>& finalStatistics) {

auto scope = request.scope();
auto query_id = request.query_id().value();
Expand Down Expand Up @@ -253,10 +253,14 @@ TPingTaskParams ConstructHardPingTask(

if (request.statistics()) {
TString statistics = request.statistics();
internal.clear_statistics();
PackStatisticsToProtobuf(*internal.mutable_statistics(), statistics);

if (!dumpRawStatistics) {
try {
statistics = GetPrettyStatistics(statistics);
} catch (const std::exception&) {
// LOG_AS?
CPS_LOG_E("Error on statistics prettification: " << CurrentExceptionMessage());
}
}
Expand Down Expand Up @@ -482,6 +486,7 @@ TPingTaskParams ConstructHardPingTask(
// YQv2 may not provide statistics with terminal status, use saved one
statistics = query.statistics().json();
}
*finalStatistics = ExtractStatisticsFromProtobuf(internal.statistics());
auto records = GetMeteringRecords(statistics, isBillable, jobId, request.scope(), HostName());
meteringRecords->swap(records);
} catch (const std::exception&) {
Expand Down Expand Up @@ -597,11 +602,12 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskReq

std::shared_ptr<Fq::Private::PingTaskResult> response = std::make_shared<Fq::Private::PingTaskResult>();
std::shared_ptr<TFinalStatus> finalStatus = std::make_shared<TFinalStatus>();
std::shared_ptr finalStatistics = std::make_shared<StatsValuesList>();

auto pingTaskParams = DoesPingTaskUpdateQueriesTable(request) ?
ConstructHardPingTask(request, response, YdbConnection->TablePathPrefix, Config->AutomaticQueriesTtl,
Config->TaskLeaseTtl, Config->RetryPolicies, Counters.Counters, Config->Proto.GetMaxRequestSize(),
Config->Proto.GetDumpRawStatistics(), finalStatus, requestCounters.Common) :
Config->Proto.GetDumpRawStatistics(), finalStatus, requestCounters.Common, finalStatistics) :
ConstructSoftPingTask(request, response, YdbConnection->TablePathPrefix, Config->TaskLeaseTtl, requestCounters.Common);
auto debugInfo = Config->Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
auto result = ReadModifyWrite(pingTaskParams.Query, pingTaskParams.Params, pingTaskParams.Prepare, requestCounters, debugInfo);
Expand All @@ -628,7 +634,9 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskReq
}

if (success) {
actorSystem->Send(ControlPlaneStorageServiceActorId(), new TEvControlPlaneStorage::TEvFinalStatusReport(request.query_id().value(), request.job_id().value(), cloudId, scope, finalStatus->Status, finalStatus->Issues, finalStatus->TransientIssues));
actorSystem->Send(ControlPlaneStorageServiceActorId(), new TEvControlPlaneStorage::TEvFinalStatusReport(
request.query_id().value(), request.job_id().value(), cloudId, scope, std::move(*finalStatistics),
finalStatus->Status, finalStatus->Issues, finalStatus->TransientIssues));
}
});
}
Expand All @@ -644,7 +652,11 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvFinalStatus
}

Counters.GetFinalStatusCounters(event.CloudId, event.Scope)->IncByStatus(event.Status);
LOG_YQ_AUDIT_SERVICE_INFO("FinalStatus: cloud id: [" << event.CloudId << "], scope: [" << event.Scope << "], query id: [" << event.QueryId << "], job id: [" << event.JobId << "], status: " << FederatedQuery::QueryMeta::ComputeStatus_Name(event.Status));

Statistics statistics{event.Statistics};
LOG_YQ_AUDIT_SERVICE_INFO("FinalStatus: cloud id: [" << event.CloudId << "], scope: [" << event.Scope << "], query id: [" <<
event.QueryId << "], job id: [" << event.JobId << "], " << statistics << (statistics ? ", " : "") <<
"status: " << FederatedQuery::QueryMeta::ComputeStatus_Name(event.Status));
}


Expand Down
52 changes: 52 additions & 0 deletions ydb/core/fq/libs/control_plane_storage/internal/ut/utils_ut.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#include <library/cpp/testing/unittest/registar.h>
#include <ydb/core/fq/libs/control_plane_storage/internal/utils.h>
#include <ydb/core/fq/libs/control_plane_storage/proto/yq_internal.pb.h>

namespace NFq {

Y_UNIT_TEST_SUITE(ParseStats) {
Y_UNIT_TEST(ParseV2) {
FederatedQuery::Internal::QueryInternal internal;
auto statisticsPtr = internal.mutable_statistics();
PackStatisticsToProtobuf(*statisticsPtr, R"({"ResultSet":{"01_1_Stage-Source":{"SourceCpuTimeUs":{"sum":"828us","count":1,"avg":"828us","max":"828us","min":"828us"},"Output=3":{"Pop":{"Chunks":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"Rows":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"LastMessageMs":{"avg":"11:01:10.88s","sum":"0.00s","count":1,"max":"11:01:10.88s","min":"11:01:10.88s"},"ActiveMessageMs":{"sum":"0.00s","count":1,"max":"11:01:10.88s","min":"11:01:10.88s"},"FirstMessageMs":{"avg":"11:01:10.88s","sum":"0.00s","count":1,"max":"11:01:10.88s","min":"11:01:10.88s"},"Bytes":{"sum":8,"count":1,"avg":8,"max":8,"min":8}},"Push":{"LastMessageMs":{"avg":"11:01:10.88s","sum":"0.00s","count":1,"max":"11:01:10.88s","min":"11:01:10.88s"},"Rows":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"Chunks":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"ResumeMessageMs":{"avg":"11:01:10.88s","sum":"0.00s","count":1,"max":"11:01:10.88s","min":"11:01:10.88s"},"FirstMessageMs":{"avg":"11:01:10.88s","sum":"0.00s","count":1,"max":"11:01:10.88s","min":"11:01:10.88s"},"ActiveMessageMs":{"sum":"0.00s","count":1,"max":"11:01:10.88s","min":"11:01:10.88s"},"PauseMessageMs":{"avg":"11:01:10.81s","sum":"0.00s","count":1,"max":"11:01:10.81s","min":"11:01:10.81s"},"WaitTimeUs":{"sum":"72ms","count":1,"avg":"72ms","max":"72ms","min":"72ms"},"WaitPeriods":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"WaitMessageMs":{"sum":"73ms","count":1,"max":"11:01:10.88s","min":"11:01:10.81s"}}},"MaxMemoryUsage":{"sum":241172480,"count":1,"avg":241172480,"max":241172480,"min":241172480},"TotalDurationMs":{"sum":"00:00:00.09s","count":1},"IngressBytes":{"sum":22,"count":1,"avg":22,"max":22,"min":22},"Tasks":{"sum":1,"count":1},"OutputRows":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"IngressRows":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"CpuTimeUs":{"sum":"1ms","count":1,"avg":"1ms","max":"1ms","min":"1ms"},"OutputBytes":{"sum":8,"count":1,"avg":8,"max":8,"min":8},"Ingress=S3Source":{"Pop":{"Chunks":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"LastMessageMs":{"avg":"11:01:10.88s","sum":"0.00s","count":1,"max":"11:01:10.88s","min":"11:01:10.88s"},"ActiveMessageMs":{"sum":"0.00s","count":1,"max":"11:01:10.88s","min":"11:01:10.88s"},"FirstMessageMs":{"avg":"11:01:10.88s","sum":"0.00s","count":1,"max":"11:01:10.88s","min":"11:01:10.88s"},"Bytes":{"sum":17,"count":1,"avg":17,"max":17,"min":17}},"Ingress":{"Rows":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"LastMessageMs":{"avg":"11:01:10.88s","sum":"0.00s","count":1,"max":"11:01:10.88s","min":"11:01:10.88s"},"Chunks":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"ResumeMessageMs":{"avg":"11:01:10.88s","sum":"0.00s","count":1,"max":"11:01:10.88s","min":"11:01:10.88s"},"FirstMessageMs":{"avg":"11:01:10.88s","sum":"0.00s","count":1,"max":"11:01:10.88s","min":"11:01:10.88s"},"ActiveMessageMs":{"sum":"0.00s","count":1,"max":"11:01:10.88s","min":"11:01:10.88s"},"Bytes":{"sum":22,"count":1,"avg":22,"max":22,"min":22},"Splits":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"PauseMessageMs":{"avg":"11:01:10.80s","sum":"0.00s","count":1,"max":"11:01:10.80s","min":"11:01:10.80s"},"WaitTimeUs":{"sum":"86ms","count":1,"avg":"86ms","max":"86ms","min":"86ms"},"WaitPeriods":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"WaitMessageMs":{"sum":"86ms","count":1,"max":"11:01:10.88s","min":"11:01:10.80s"}},"Push":{"LastMessageMs":{"avg":"11:01:10.88s","sum":"0.00s","count":1,"max":"11:01:10.88s","min":"11:01:10.88s"},"Chunks":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"ResumeMessageMs":{"avg":"11:01:10.88s","sum":"0.00s","count":1,"max":"11:01:10.88s","min":"11:01:10.88s"},"FirstMessageMs":{"avg":"11:01:10.88s","sum":"0.00s","count":1,"max":"11:01:10.88s","min":"11:01:10.88s"},"ActiveMessageMs":{"sum":"0.00s","count":1,"max":"11:01:10.88s","min":"11:01:10.88s"},"Bytes":{"sum":17,"count":1,"avg":17,"max":17,"min":17},"PauseMessageMs":{"avg":"11:01:10.80s","sum":"0.00s","count":1,"max":"11:01:10.80s","min":"11:01:10.80s"},"WaitTimeUs":{"sum":"86ms","count":1,"avg":"86ms","max":"86ms","min":"86ms"},"WaitPeriods":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"WaitMessageMs":{"sum":"86ms","count":1,"max":"11:01:10.88s","min":"11:01:10.80s"}}}},"02_3_Collect":{"Output=RESULT":{"Pop":{"Chunks":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"Rows":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"LastMessageMs":{"avg":"11:01:10.89s","sum":"0.00s","count":1,"max":"11:01:10.89s","min":"11:01:10.89s"},"ActiveMessageMs":{"sum":"0.00s","count":1,"max":"11:01:10.89s","min":"11:01:10.89s"},"FirstMessageMs":{"avg":"11:01:10.89s","sum":"0.00s","count":1,"max":"11:01:10.89s","min":"11:01:10.89s"},"Bytes":{"sum":8,"count":1,"avg":8,"max":8,"min":8}},"Push":{"LastMessageMs":{"avg":"11:01:10.89s","sum":"0.00s","count":1,"max":"11:01:10.89s","min":"11:01:10.89s"},"Rows":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"Chunks":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"ResumeMessageMs":{"avg":"11:01:10.89s","sum":"0.00s","count":1,"max":"11:01:10.89s","min":"11:01:10.89s"},"FirstMessageMs":{"avg":"11:01:10.89s","sum":"0.00s","count":1,"max":"11:01:10.89s","min":"11:01:10.89s"},"ActiveMessageMs":{"sum":"0.00s","count":1,"max":"11:01:10.89s","min":"11:01:10.89s"},"PauseMessageMs":{"avg":"11:01:10.81s","sum":"0.00s","count":1,"max":"11:01:10.81s","min":"11:01:10.81s"},"WaitTimeUs":{"sum":"77ms","count":1,"avg":"77ms","max":"77ms","min":"77ms"},"WaitPeriods":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"WaitMessageMs":{"sum":"77ms","count":1,"max":"11:01:10.89s","min":"11:01:10.81s"}}},"MaxMemoryUsage":{"sum":31457280,"count":1,"avg":31457280,"max":31457280,"min":31457280},"TotalDurationMs":{"sum":"00:00:00.08s","count":1},"InputBytes":{"sum":8,"count":1,"avg":8,"max":8,"min":8},"ResultRows":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"Tasks":{"sum":1,"count":1},"ResultBytes":{"sum":8,"count":1,"avg":8,"max":8,"min":8},"OutputRows":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"InputRows":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"CpuTimeUs":{"sum":"771us","count":1,"avg":"771us","max":"771us","min":"771us"},"OutputBytes":{"sum":8,"count":1,"avg":8,"max":8,"min":8},"Input=1":{"Pop":{"Chunks":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"Rows":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"LastMessageMs":{"avg":"11:01:10.89s","sum":"0.00s","count":1,"max":"11:01:10.89s","min":"11:01:10.89s"},"ActiveMessageMs":{"sum":"0.00s","count":1,"max":"11:01:10.89s","min":"11:01:10.89s"},"FirstMessageMs":{"avg":"11:01:10.89s","sum":"0.00s","count":1,"max":"11:01:10.89s","min":"11:01:10.89s"},"Bytes":{"sum":8,"count":1,"avg":8,"max":8,"min":8}},"Push":{"Rows":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"LastMessageMs":{"avg":"11:01:10.89s","sum":"0.00s","count":1,"max":"11:01:10.89s","min":"11:01:10.89s"},"Chunks":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"ResumeMessageMs":{"avg":"11:01:10.89s","sum":"0.00s","count":1,"max":"11:01:10.89s","min":"11:01:10.89s"},"FirstMessageMs":{"avg":"11:01:10.89s","sum":"0.00s","count":1,"max":"11:01:10.89s","min":"11:01:10.89s"},"ActiveMessageMs":{"sum":"0.00s","count":1,"max":"11:01:10.89s","min":"11:01:10.89s"},"Bytes":{"sum":8,"count":1,"avg":8,"max":8,"min":8},"PauseMessageMs":{"avg":"11:01:10.81s","sum":"0.00s","count":1,"max":"11:01:10.81s","min":"11:01:10.81s"},"WaitTimeUs":{"sum":"77ms","count":1,"avg":"77ms","max":"77ms","min":"77ms"},"WaitPeriods":{"sum":1,"count":1,"avg":1,"max":1,"min":1},"WaitMessageMs":{"sum":"77ms","count":1,"max":"11:01:10.89s","min":"11:01:10.81s"}}}},"MaxMemoryUsage":{"min":31457280,"max":241172480,"avg":136314880,"sum":272629760,"count":2},"CpuTimeUs":{"min":"771us","max":"1ms","avg":"1ms","sum":"2ms","count":2},"SourceCpuTimeUs":{"min":"828us","max":"828us","avg":"828us","sum":"828us","count":1},"InputBytes":{"min":8,"max":8,"avg":8,"sum":8,"count":1},"InputRows":{"min":1,"max":1,"avg":1,"sum":1,"count":1},"OutputBytes":{"min":8,"max":8,"avg":8,"sum":16,"count":2},"OutputRows":{"min":1,"max":1,"avg":1,"sum":2,"count":2},"ResultBytes":{"min":8,"max":8,"avg":8,"sum":8,"count":1},"ResultRows":{"min":1,"max":1,"avg":1,"sum":1,"count":1},"IngressBytes":{"min":22,"max":22,"avg":22,"sum":22,"count":1},"IngressRows":{"min":1,"max":1,"avg":1,"sum":1,"count":1}}})");

std::unordered_map<std::string_view, i64> expected{
{"IngressBytes", 22},
{"InputBytes", 8},
{"OutputBytes", 16},
{"S3Source", 22}};

for (const auto& statsElement : *statisticsPtr) {
const auto& name = statsElement.name();
auto value = statsElement.value();

auto it = expected.find(name);
UNIT_ASSERT(it != expected.end());
UNIT_ASSERT_EQUAL(value, it->second);
}
UNIT_ASSERT_EQUAL(expected.size(), static_cast<size_t>(statisticsPtr->size()));
}

Y_UNIT_TEST(Parse2SourcesV2) {
FederatedQuery::Internal::QueryInternal internal;
auto statisticsPtr = internal.mutable_statistics();
PackStatisticsToProtobuf(*statisticsPtr, R"({"ResultSet": {"01_1_Stage-Source": {"IngressBytes": {"sum": 24,"count": 1,"avg": 24,"max": 24,"min": 24},"OutputBytes": {"sum": 13,"count": 1,"avg": 13,"max": 13,"min": 13},"Ingress=S3Source": {"Ingress": {"Bytes": {"sum": 24,"count": 1,"avg": 24,"max": 24,"min": 24}}}},"02_3_Collect": {"Output=5": {"Pop": {"Bytes": {"sum": 13,"count": 1,"avg": 13,"max": 13,"min": 13}}},"InputBytes": {"sum": 13,"count": 1,"avg": 13,"max": 13,"min": 13},"OutputBytes": {"sum": 13,"count": 1,"avg": 13,"max": 13,"min": 13},"Input=1": {"Pop": {"Bytes": {"sum": 13,"count": 1,"avg": 13,"max": 13,"min": 13}}}},"03_5_InnerJoin (MapJoin)-Source": {"SourceCpuTimeUs": {"sum": "495us","count": 1,"avg": "495us","max": "495us","min": "495us"},"Output=7": {"Pop": {"Bytes": {"sum": 18,"count": 1,"avg": 18,"max": 18,"min": 18}}},"InputBytes": {"sum": 13,"count": 1,"avg": 13,"max": 13,"min": 13},"IngressBytes": {"sum": 22,"count": 1,"avg": 22,"max": 22,"min": 22},"OutputBytes": {"sum": 18,"count": 1,"avg": 18,"max": 18,"min": 18},"Ingress=S3Source": {"Pop": {"Bytes": {"sum": 17,"count": 1,"avg": 17,"max": 17,"min": 17}},"Ingress": {"Bytes": {"sum": 22,"count": 1,"avg": 22,"max": 22,"min": 22}},"Push": {"Bytes": {"sum": 17,"count": 1,"avg": 17,"max": 17,"min": 17}}},"Input=3": {"Pop": {"Bytes": {"sum": 13,"count": 1,"avg": 13,"max": 13,"min": 13}},"Push": {"Bytes": {"sum": 13,"count": 1,"avg": 13,"max": 13,"min": 13}}}},"04_7_Collect": {"Output=RESULT": {"Pop": {"Bytes": {"sum": 18,"count": 1,"avg": 18,"max": 18,"min": 18}},"Push": {"Rows": {"sum": 1,"count": 1,"avg": 1,"max": 1,"min": 1}}},"InputBytes": {"sum": 18,"count": 1,"avg": 18,"max": 18,"min": 18},"OutputBytes": {"sum": 18,"count": 1,"avg": 18,"max": 18,"min": 18},"Input=5": {"Pop": {"Bytes": {"sum": 18,"count": 1,"avg": 18,"max": 18,"min": 18}},"Push": {"Bytes": {"sum": 18,"count": 1,"avg": 18,"max": 18,"min": 18}}}},"InputBytes": {"min": 13,"max": 18,"avg": 14,"sum": 44,"count": 3},"OutputBytes": {"min": 13,"max": 18,"avg": 15,"sum": 62,"count": 4},"IngressBytes": {"min": 22,"max": 24,"avg": 23,"sum": 46,"count": 2}}})");

std::unordered_map<std::string_view, i64> expected{
{"IngressBytes", 46},
{"InputBytes", 44},
{"OutputBytes", 62},
{"S3Source", 46}};

for (const auto& statsElement : *statisticsPtr) {
const auto& name = statsElement.name();
auto value = statsElement.value();

auto it = expected.find(name);
UNIT_ASSERT(it != expected.end());
UNIT_ASSERT_EQUAL(value, it->second);
}
UNIT_ASSERT_EQUAL(expected.size(), static_cast<size_t>(statisticsPtr->size()));
}
}
}
18 changes: 18 additions & 0 deletions ydb/core/fq/libs/control_plane_storage/internal/ut/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
UNITTEST_FOR(ydb/core/fq/libs/control_plane_storage/internal)

OWNER(g:yq)

SIZE(MEDIUM)

SRCS(utils_ut.cpp)

PEERDIR(
library/cpp/testing/unittest
library/cpp/json/yson
ydb/library/yql/public/udf/service/stub
)

YQL_LAST_ABI_VERSION()

END()

Loading