Skip to content

Commit ac26f19

Browse files
authored
Merge b152db7 into 89e6dec
2 parents 89e6dec + b152db7 commit ac26f19

File tree

132 files changed

+8024
-2168
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

132 files changed

+8024
-2168
lines changed

.github/config/muted_ya.txt

+1
Original file line numberDiff line numberDiff line change
@@ -107,3 +107,4 @@ ydb/tests/functional/tenants test_tenants.py.*
107107
ydb/tests/functional/ydb_cli test_ydb_impex.py.TestImpex.test_big_dataset*
108108
ydb/tests/tools/pq_read/test test_timeout.py.TestTimeout.test_timeout
109109
ydb/tests/functional/rename [test_rename.py */10] chunk chunk
110+
ydb/tests/functional/suite_tests test_postgres.py.TestPGSQL.test_sql_suite[plan-jointest/join2.test]

ydb/core/driver_lib/run/kikimr_services_initializers.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@
123123

124124
#include <ydb/core/sys_view/processor/processor.h>
125125
#include <ydb/core/sys_view/service/sysview_service.h>
126-
#include <ydb/core/statistics/stat_service.h>
126+
#include <ydb/core/statistics/service/service.h>
127127
#include <ydb/core/statistics/aggregator/aggregator.h>
128128

129129
#include <ydb/core/tablet/bootstrapper.h>

ydb/core/driver_lib/run/ya.make

+1-1
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ PEERDIR(
9898
ydb/core/scheme_types
9999
ydb/core/security
100100
ydb/core/security/ldap_auth_provider
101-
ydb/core/statistics
102101
ydb/core/statistics/aggregator
102+
ydb/core/statistics/service
103103
ydb/core/sys_view/processor
104104
ydb/core/sys_view/service
105105
ydb/core/tablet

ydb/core/formats/arrow/protos/ssa.proto

+4
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ message TProgram {
4545
repeated uint64 HashValues = 1;
4646
}
4747

48+
message TCountMinSketchChecker {
49+
}
50+
4851
message TOlapIndexChecker {
4952
optional uint32 IndexId = 1;
5053
optional string ClassName = 2;
@@ -56,6 +59,7 @@ message TProgram {
5659
oneof Implementation {
5760
TBloomFilterChecker BloomFilter = 40;
5861
TCompositeChecker Composite = 41;
62+
TCountMinSketchChecker CountMinSketch = 42;
5963
}
6064
}
6165

ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp

+24
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "kqp_executer_impl.h"
33

44
#include <ydb/core/kqp/gateway/actors/scheme.h>
5+
#include <ydb/core/kqp/gateway/actors/analyze_actor.h>
56
#include <ydb/core/kqp/gateway/local_rpc/helper.h>
67
#include <ydb/core/tx/tx_proxy/proxy.h>
78
#include <ydb/core/kqp/session_actor/kqp_worker_common.h>
@@ -307,6 +308,29 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
307308
break;
308309
}
309310

311+
case NKqpProto::TKqpSchemeOperation::kAnalyzeTable: {
312+
const auto& analyzeOperation = schemeOp.GetAnalyzeTable();
313+
314+
auto analyzePromise = NewPromise<IKqpGateway::TGenericResult>();
315+
316+
TVector<TString> columns{analyzeOperation.columns().begin(), analyzeOperation.columns().end()};
317+
IActor* analyzeActor = new TAnalyzeActor(analyzeOperation.GetTablePath(), columns, analyzePromise);
318+
319+
auto actorSystem = TlsActivationContext->AsActorContext().ExecutorThread.ActorSystem;
320+
RegisterWithSameMailbox(analyzeActor);
321+
322+
auto selfId = SelfId();
323+
analyzePromise.GetFuture().Subscribe([actorSystem, selfId](const TFuture<IKqpGateway::TGenericResult>& future) {
324+
auto ev = MakeHolder<TEvPrivate::TEvResult>();
325+
ev->Result = future.GetValue();
326+
327+
actorSystem->Send(selfId, ev.Release());
328+
});
329+
330+
Become(&TKqpSchemeExecuter::ExecuteState);
331+
return;
332+
}
333+
310334
default:
311335
InternalError(TStringBuilder() << "Unexpected scheme operation: "
312336
<< (ui32) schemeOp.GetOperationCase());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
#include "analyze_actor.h"
2+
3+
#include <ydb/core/base/path.h>
4+
#include <ydb/core/util/ulid.h>
5+
#include <ydb/library/actors/core/log.h>
6+
#include <ydb/library/services/services.pb.h>
7+
8+
9+
namespace NKikimr::NKqp {
10+
11+
enum {
12+
FirstRoundCookie = 0,
13+
SecondRoundCookie = 1,
14+
};
15+
16+
using TNavigate = NSchemeCache::TSchemeCacheNavigate;
17+
18+
TString MakeOperationId() {
19+
TULIDGenerator ulidGen;
20+
return ulidGen.Next(TActivationContext::Now()).ToBinary();
21+
}
22+
23+
TAnalyzeActor::TAnalyzeActor(TString tablePath, TVector<TString> columns, NThreading::TPromise<NYql::IKikimrGateway::TGenericResult> promise)
24+
: TablePath(tablePath)
25+
, Columns(columns)
26+
, Promise(promise)
27+
, OperationId(MakeOperationId())
28+
{}
29+
30+
void TAnalyzeActor::Bootstrap() {
31+
using TNavigate = NSchemeCache::TSchemeCacheNavigate;
32+
auto navigate = std::make_unique<TNavigate>();
33+
auto& entry = navigate->ResultSet.emplace_back();
34+
entry.Path = SplitPath(TablePath);
35+
entry.Operation = TNavigate::EOp::OpTable;
36+
entry.RequestType = TNavigate::TEntry::ERequestType::ByPath;
37+
navigate->Cookie = FirstRoundCookie;
38+
39+
Send(NKikimr::MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()));
40+
41+
Become(&TAnalyzeActor::StateWork);
42+
}
43+
44+
void TAnalyzeActor::Handle(NStat::TEvStatistics::TEvAnalyzeResponse::TPtr& ev, const TActorContext& ctx) {
45+
Y_UNUSED(ctx);
46+
47+
const auto& record = ev->Get()->Record;
48+
const TString operationId = record.GetOperationId();
49+
const auto status = record.GetStatus();
50+
51+
if (status != NKikimrStat::TEvAnalyzeResponse::STATUS_SUCCESS) {
52+
ALOG_CRIT(NKikimrServices::KQP_GATEWAY,
53+
"TAnalyzeActor, TEvAnalyzeResponse has status=" << status);
54+
}
55+
56+
if (operationId != OperationId) {
57+
ALOG_CRIT(NKikimrServices::KQP_GATEWAY,
58+
"TAnalyzeActor, TEvAnalyzeResponse has operationId=" << operationId
59+
<< " , but expected " << OperationId);
60+
}
61+
62+
NYql::IKikimrGateway::TGenericResult result;
63+
result.SetSuccess();
64+
Promise.SetValue(std::move(result));
65+
this->Die(ctx);
66+
}
67+
68+
void TAnalyzeActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
69+
std::unique_ptr<TNavigate> navigate(ev->Get()->Request.Release());
70+
Y_ABORT_UNLESS(navigate->ResultSet.size() == 1);
71+
auto& entry = navigate->ResultSet.front();
72+
73+
if (entry.Status != TNavigate::EStatus::Ok) {
74+
NYql::EYqlIssueCode error;
75+
switch (entry.Status) {
76+
case TNavigate::EStatus::PathErrorUnknown:
77+
case TNavigate::EStatus::RootUnknown:
78+
case TNavigate::EStatus::PathNotTable:
79+
case TNavigate::EStatus::TableCreationNotComplete:
80+
error = NYql::TIssuesIds::KIKIMR_SCHEME_ERROR;
81+
case TNavigate::EStatus::LookupError:
82+
case TNavigate::EStatus::RedirectLookupError:
83+
error = NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE;
84+
default:
85+
error = NYql::TIssuesIds::DEFAULT_ERROR;
86+
}
87+
Promise.SetValue(
88+
NYql::NCommon::ResultFromIssues<NYql::IKikimrGateway::TGenericResult>(
89+
error,
90+
TStringBuilder() << "Can't get statistics aggregator ID. " << entry.Status,
91+
{}
92+
)
93+
);
94+
this->Die(ctx);
95+
return;
96+
}
97+
98+
if (navigate->Cookie == SecondRoundCookie) {
99+
if (entry.DomainInfo->Params.HasStatisticsAggregator()) {
100+
SendStatisticsAggregatorAnalyze(entry, ctx);
101+
} else {
102+
Promise.SetValue(
103+
NYql::NCommon::ResultFromIssues<NYql::IKikimrGateway::TGenericResult>(
104+
NYql::TIssuesIds::DEFAULT_ERROR,
105+
TStringBuilder() << "Can't get statistics aggregator ID.", {}
106+
)
107+
);
108+
}
109+
110+
this->Die(ctx);
111+
return;
112+
}
113+
114+
PathId = entry.TableId.PathId;
115+
116+
auto& domainInfo = entry.DomainInfo;
117+
118+
auto navigateDomainKey = [this] (TPathId domainKey) {
119+
using TNavigate = NSchemeCache::TSchemeCacheNavigate;
120+
auto navigate = std::make_unique<TNavigate>();
121+
auto& entry = navigate->ResultSet.emplace_back();
122+
entry.TableId = TTableId(domainKey.OwnerId, domainKey.LocalPathId);
123+
entry.Operation = TNavigate::EOp::OpPath;
124+
entry.RequestType = TNavigate::TEntry::ERequestType::ByTableId;
125+
entry.RedirectRequired = false;
126+
navigate->Cookie = SecondRoundCookie;
127+
128+
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()));
129+
};
130+
131+
if (!domainInfo->IsServerless()) {
132+
if (domainInfo->Params.HasStatisticsAggregator()) {
133+
SendStatisticsAggregatorAnalyze(entry, ctx);
134+
return;
135+
}
136+
137+
navigateDomainKey(domainInfo->DomainKey);
138+
} else {
139+
navigateDomainKey(domainInfo->ResourcesDomainKey);
140+
}
141+
}
142+
143+
TDuration TAnalyzeActor::CalcBackoffTime() {
144+
ui32 backoffSlots = 1 << RetryCount;
145+
TDuration maxDuration = RetryInterval * backoffSlots;
146+
147+
double uncertaintyRatio = std::max(std::min(UncertainRatio, 1.0), 0.0);
148+
double uncertaintyMultiplier = RandomNumber<double>() * uncertaintyRatio - uncertaintyRatio + 1.0;
149+
150+
double durationMs = round(maxDuration.MilliSeconds() * uncertaintyMultiplier);
151+
durationMs = std::max(std::min(durationMs, MaxBackoffDurationMs), 0.0);
152+
return TDuration::MilliSeconds(durationMs);
153+
}
154+
155+
void TAnalyzeActor::Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev, const TActorContext& ctx) {
156+
Y_UNUSED(ev, ctx);
157+
158+
if (RetryCount >= MaxRetryCount) {
159+
Promise.SetValue(
160+
NYql::NCommon::ResultFromError<NYql::IKikimrGateway::TGenericResult>(
161+
YqlIssue(
162+
{}, NYql::TIssuesIds::UNEXPECTED,
163+
TStringBuilder() << "Can't establish connection with the Statistics Aggregator!"
164+
)
165+
)
166+
);
167+
this->Die(ctx);
168+
return;
169+
}
170+
171+
++RetryCount;
172+
Schedule(CalcBackoffTime(), new TEvAnalyzePrivate::TEvAnalyzeRetry());
173+
}
174+
175+
void TAnalyzeActor::Handle(TEvAnalyzePrivate::TEvAnalyzeRetry::TPtr& ev, const TActorContext& ctx) {
176+
Y_UNUSED(ev, ctx);
177+
178+
auto analyzeRequest = std::make_unique<NStat::TEvStatistics::TEvAnalyze>();
179+
analyzeRequest->Record = Request.Record;
180+
Send(
181+
MakePipePerNodeCacheID(false),
182+
new TEvPipeCache::TEvForward(analyzeRequest.release(), StatisticsAggregatorId.value(), true),
183+
IEventHandle::FlagTrackDelivery
184+
);
185+
}
186+
187+
void TAnalyzeActor::SendStatisticsAggregatorAnalyze(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, const TActorContext& ctx) {
188+
Y_ABORT_UNLESS(entry.DomainInfo->Params.HasStatisticsAggregator());
189+
190+
StatisticsAggregatorId = entry.DomainInfo->Params.GetStatisticsAggregator();
191+
192+
auto& record = Request.Record;
193+
record.SetOperationId(OperationId);
194+
auto table = record.AddTables();
195+
196+
PathIdFromPathId(PathId, table->MutablePathId());
197+
198+
199+
THashMap<TString, ui32> tagByColumnName;
200+
for (const auto& [_, tableInfo]: entry.Columns) {
201+
tagByColumnName[TString(tableInfo.Name)] = tableInfo.Id;
202+
}
203+
204+
for (const auto& columnName: Columns) {
205+
if (!tagByColumnName.contains(columnName)){
206+
Promise.SetValue(
207+
NYql::NCommon::ResultFromError<NYql::IKikimrGateway::TGenericResult>(
208+
YqlIssue(
209+
{}, NYql::TIssuesIds::UNEXPECTED,
210+
TStringBuilder() << "No such column: " << columnName << " in the " << TablePath
211+
)
212+
)
213+
);
214+
this->Die(ctx);
215+
return;
216+
}
217+
218+
*table->MutableColumnTags()->Add() = tagByColumnName[columnName];
219+
}
220+
221+
auto analyzeRequest = std::make_unique<NStat::TEvStatistics::TEvAnalyze>();
222+
analyzeRequest->Record = Request.Record;
223+
Send(
224+
MakePipePerNodeCacheID(false),
225+
new TEvPipeCache::TEvForward(analyzeRequest.release(), entry.DomainInfo->Params.GetStatisticsAggregator(), true),
226+
IEventHandle::FlagTrackDelivery
227+
);
228+
}
229+
230+
void TAnalyzeActor::HandleUnexpectedEvent(ui32 typeRewrite) {
231+
ALOG_CRIT(
232+
NKikimrServices::KQP_GATEWAY,
233+
"TAnalyzeActor, unexpected event, request type: " << typeRewrite;
234+
);
235+
236+
Promise.SetValue(
237+
NYql::NCommon::ResultFromError<NYql::IKikimrGateway::TGenericResult>(
238+
YqlIssue(
239+
{}, NYql::TIssuesIds::UNEXPECTED,
240+
TStringBuilder() << "Unexpected event: " << typeRewrite
241+
)
242+
)
243+
);
244+
245+
this->PassAway();
246+
}
247+
248+
}// end of NKikimr::NKqp

0 commit comments

Comments
 (0)