Skip to content

Commit f490faf

Browse files
authored
YQ-4046 KqpRun improved templates (#13951)
1 parent 97df0fb commit f490faf

File tree

7 files changed

+154
-34
lines changed

7 files changed

+154
-34
lines changed

ydb/tests/tools/kqprun/configuration/app_config.conf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,15 @@ ActorSystemConfig {
4646

4747
ColumnShardConfig {
4848
DisabledOnSchemeShard: false
49+
WritingInFlightRequestBytesLimit: 104857600
4950
}
5051

5152
FeatureFlags {
5253
EnableExternalDataSources: true
5354
EnableScriptExecutionOperations: true
5455
EnableExternalSourceSchemaInference: true
5556
EnableTempTables: true
57+
EnableReplaceIfExistsForExternalEntities: true
5658
}
5759

5860
KQPConfig {

ydb/tests/tools/kqprun/kqprun.cpp

Lines changed: 53 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,6 @@ struct TExecutionOptions {
8888

8989
TRequestOptions GetSchemeQueryOptions() const {
9090
TString sql = SchemeQuery;
91-
if (UseTemplates) {
92-
ReplaceYqlTokenTemplate(sql);
93-
}
9491

9592
return {
9693
.Query = sql,
@@ -108,7 +105,6 @@ struct TExecutionOptions {
108105

109106
TString sql = ScriptQueries[index];
110107
if (UseTemplates) {
111-
ReplaceYqlTokenTemplate(sql);
112108
SubstGlobal(sql, "${QUERY_ID}", ToString(queryId));
113109
}
114110

@@ -271,16 +267,6 @@ struct TExecutionOptions {
271267
ythrow yexception() << "Cannot format storage without real PDisks, please use --storage-path";
272268
}
273269
}
274-
275-
private:
276-
static void ReplaceYqlTokenTemplate(TString& sql) {
277-
const TString variableName = TStringBuilder() << "${" << YQL_TOKEN_VARIABLE << "}";
278-
if (const TString& yqlToken = GetEnv(YQL_TOKEN_VARIABLE)) {
279-
SubstGlobal(sql, variableName, yqlToken);
280-
} else if (sql.Contains(variableName)) {
281-
ythrow yexception() << "Failed to replace ${YQL_TOKEN} template, please specify YQL_TOKEN environment variable\n";
282-
}
283-
}
284270
};
285271

286272

@@ -446,6 +432,7 @@ class TMain : public TMainClassArgs {
446432
TExecutionOptions ExecutionOptions;
447433
TRunnerOptions RunnerOptions;
448434

435+
std::unordered_map<TString, TString> Templates;
449436
THashMap<TString, TString> TablesMapping;
450437
TVector<TString> UdfsPaths;
451438
TString UdfsDirectory;
@@ -537,6 +524,31 @@ class TMain : public TMainClassArgs {
537524
.NoArgument()
538525
.SetFlag(&ExecutionOptions.UseTemplates);
539526

527+
options.AddLongOption("var-template", "Add template from environment variables or file for -s and -p queries (use variable@file for files)")
528+
.RequiredArgument("variable")
529+
.Handler1([this](const NLastGetopt::TOptsParser* option) {
530+
TStringBuf variable;
531+
TStringBuf filePath;
532+
TStringBuf(option->CurVal()).Split('@', variable, filePath);
533+
if (variable.empty()) {
534+
ythrow yexception() << "Variable name should not be empty";
535+
}
536+
537+
TString value;
538+
if (!filePath.empty()) {
539+
value = LoadFile(TString(filePath));
540+
} else {
541+
value = GetEnv(TString(variable));
542+
if (!value) {
543+
ythrow yexception() << "Invalid env template, can not find value for variable '" << variable << "'";
544+
}
545+
}
546+
547+
if (!Templates.emplace(variable, value).second) {
548+
ythrow yexception() << "Got duplicated template variable name '" << variable << "'";
549+
}
550+
});
551+
540552
options.AddLongOption('t', "table", "File with input table (can be used by YT with -E flag), table@file")
541553
.RequiredArgument("table@file")
542554
.Handler1([this](const NLastGetopt::TOptsParser* option) {
@@ -845,6 +857,11 @@ class TMain : public TMainClassArgs {
845857
.NoArgument()
846858
.SetFlag(&EmulateYt);
847859

860+
options.AddLongOption('H', "health-check", "Level of health check before start (max level 2)")
861+
.RequiredArgument("uint")
862+
.DefaultValue(1)
863+
.StoreResult(&RunnerOptions.YdbSettings.HealthCheckLevel);
864+
848865
options.AddLongOption("domain", "Test cluster domain name")
849866
.RequiredArgument("name")
850867
.DefaultValue(RunnerOptions.YdbSettings.DomainName)
@@ -862,9 +879,8 @@ class TMain : public TMainClassArgs {
862879
.RequiredArgument("path")
863880
.InsertTo(&RunnerOptions.YdbSettings.ServerlessTenants);
864881

865-
options.AddLongOption("storage-size", "Domain storage size in gigabytes")
882+
options.AddLongOption("storage-size", "Domain storage size in gigabytes (32 GiB by default)")
866883
.RequiredArgument("uint")
867-
.DefaultValue(32)
868884
.StoreMappedResultT<ui32>(&RunnerOptions.YdbSettings.DiskSize, [](ui32 diskSize) {
869885
return static_cast<ui64>(diskSize) << 30;
870886
});
@@ -898,6 +914,11 @@ class TMain : public TMainClassArgs {
898914
int DoRun(NLastGetopt::TOptsParseResult&&) override {
899915
ExecutionOptions.Validate(RunnerOptions);
900916

917+
ReplaceTemplates(ExecutionOptions.SchemeQuery);
918+
for (auto& sql : ExecutionOptions.ScriptQueries) {
919+
ReplaceTemplates(sql);
920+
}
921+
901922
RunnerOptions.YdbSettings.YqlToken = YqlToken;
902923
RunnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry(UdfsDirectory, UdfsPaths, ExcludeLinkedUdfs).Get();
903924

@@ -943,6 +964,21 @@ class TMain : public TMainClassArgs {
943964

944965
return 0;
945966
}
967+
968+
private:
969+
void ReplaceTemplates(TString& sql) const {
970+
for (const auto& [variable, value] : Templates) {
971+
SubstGlobal(sql, TStringBuilder() << "${" << variable <<"}", value);
972+
}
973+
if (ExecutionOptions.UseTemplates) {
974+
const TString tokenVariableName = TStringBuilder() << "${" << YQL_TOKEN_VARIABLE << "}";
975+
if (const TString& yqlToken = GetEnv(YQL_TOKEN_VARIABLE)) {
976+
SubstGlobal(sql, tokenVariableName, yqlToken);
977+
} else if (sql.Contains(tokenVariableName)) {
978+
ythrow yexception() << "Failed to replace ${YQL_TOKEN} template, please specify YQL_TOKEN environment variable";
979+
}
980+
}
981+
}
946982
};
947983

948984

@@ -972,7 +1008,7 @@ void FloatingPointExceptionHandler(int) {
9721008

9731009
Cerr << colors.Red() << "======= floating point exception call stack ========" << colors.Default() << Endl;
9741010
FormatBackTrace(&Cerr);
975-
Cerr << colors.Red() << "==============================================" << colors.Default() << Endl;
1011+
Cerr << colors.Red() << "====================================================" << colors.Default() << Endl;
9761012

9771013
abort();
9781014
}

ydb/tests/tools/kqprun/src/actors.cpp

Lines changed: 69 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -232,40 +232,78 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
232232
static constexpr TDuration REFRESH_PERIOD = TDuration::MilliSeconds(10);
233233

234234
public:
235-
TResourcesWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount)
236-
: ExpectedNodeCount_(expectedNodeCount)
235+
TResourcesWaiterActor(NThreading::TPromise<void> promise, const TWaitResourcesSettings& settings)
236+
: Settings_(settings)
237237
, Promise_(promise)
238238
{}
239239

240240
void Bootstrap() {
241-
Become(&TResourcesWaiterActor::StateFunc);
241+
if (Settings_.HealthCheckLevel < 1) {
242+
Finish();
243+
return;
244+
}
245+
246+
Become(&TResourcesWaiterActor::StateWaitNodeCont);
242247
CheckResourcesPublish();
243248
}
244249

245-
void Handle(NActors::TEvents::TEvWakeup::TPtr&) {
250+
void HandleWaitNodeCountWakeup() {
246251
CheckResourcesPublish();
247252
}
248253

249254
void Handle(TEvPrivate::TEvResourcesInfo::TPtr& ev) {
250-
if (ev->Get()->NodeCount == ExpectedNodeCount_) {
251-
Promise_.SetValue();
252-
PassAway();
255+
const auto nodeCont = ev->Get()->NodeCount;
256+
if (nodeCont == Settings_.ExpectedNodeCount) {
257+
if (Settings_.HealthCheckLevel < 2) {
258+
Finish();
259+
} else {
260+
Become(&TResourcesWaiterActor::StateWaitScript);
261+
StartScriptQuery();
262+
}
253263
return;
254264
}
255265

266+
if (Settings_.VerboseLevel >= 2) {
267+
Cout << CoutColors_.Cyan() << "Retry invalid node count, got " << nodeCont << ", expected " << Settings_.ExpectedNodeCount << CoutColors_.Default() << Endl;
268+
}
256269
Schedule(REFRESH_PERIOD, new NActors::TEvents::TEvWakeup());
257270
}
258271

259-
STRICT_STFUNC(StateFunc,
260-
hFunc(NActors::TEvents::TEvWakeup, Handle);
272+
void HandleWaitScriptWakeup() {
273+
StartScriptQuery();
274+
}
275+
276+
void Handle(NKikimr::NKqp::TEvKqp::TEvScriptResponse::TPtr& ev) {
277+
const auto status = ev->Get()->Status;
278+
if (status == Ydb::StatusIds::SUCCESS) {
279+
Finish();
280+
return;
281+
}
282+
283+
if (Settings_.VerboseLevel >= 2) {
284+
Cout << CoutColors_.Cyan() << "Retry script creation fail with status " << status << ", reason:\n" << CoutColors_.Default() << ev->Get()->Issues.ToString() << Endl;
285+
}
286+
Schedule(REFRESH_PERIOD, new NActors::TEvents::TEvWakeup());
287+
}
288+
289+
STRICT_STFUNC(StateWaitNodeCont,
290+
sFunc(NActors::TEvents::TEvWakeup, HandleWaitNodeCountWakeup);
261291
hFunc(TEvPrivate::TEvResourcesInfo, Handle);
262292
)
263293

294+
STRICT_STFUNC(StateWaitScript,
295+
sFunc(NActors::TEvents::TEvWakeup, HandleWaitScriptWakeup);
296+
hFunc(NKikimr::NKqp::TEvKqp::TEvScriptResponse, Handle);
297+
)
298+
264299
private:
265300
void CheckResourcesPublish() {
266301
GetResourceManager();
267302

268303
if (!ResourceManager_) {
304+
if (Settings_.VerboseLevel >= 2) {
305+
Cout << CoutColors_.Cyan() << "Retry uninitialized resource manager" << CoutColors_.Default() << Endl;
306+
}
269307
Schedule(REFRESH_PERIOD, new NActors::TEvents::TEvWakeup());
270308
return;
271309
}
@@ -287,8 +325,27 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
287325
});
288326
}
289327

328+
void StartScriptQuery() {
329+
auto event = MakeHolder<NKikimr::NKqp::TEvKqp::TEvScriptRequest>();
330+
event->Record.SetUserToken(NACLib::TUserToken("", BUILTIN_ACL_ROOT, {}).SerializeAsString());
331+
332+
auto request = event->Record.MutableRequest();
333+
request->SetQuery("SELECT 42");
334+
request->SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT);
335+
request->SetAction(NKikimrKqp::EQueryAction::QUERY_ACTION_EXECUTE);
336+
request->SetDatabase(Settings_.Database);
337+
338+
Send(NKikimr::NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release());
339+
}
340+
341+
void Finish() {
342+
Promise_.SetValue();
343+
PassAway();
344+
}
345+
290346
private:
291-
const i32 ExpectedNodeCount_;
347+
const TWaitResourcesSettings Settings_;
348+
const NColorizer::TColors CoutColors_ = NColorizer::AutoColors(Cout);
292349
NThreading::TPromise<void> Promise_;
293350

294351
std::shared_ptr<NKikimr::NKqp::NRm::IKqpResourceManager> ResourceManager_;
@@ -415,8 +472,8 @@ NActors::IActor* CreateAsyncQueryRunnerActor(const TAsyncQueriesSettings& settin
415472
return new TAsyncQueryRunnerActor(settings);
416473
}
417474

418-
NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount) {
419-
return new TResourcesWaiterActor(promise, expectedNodeCount);
475+
NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, const TWaitResourcesSettings& settings) {
476+
return new TResourcesWaiterActor(promise, settings);
420477
}
421478

422479
NActors::IActor* CreateSessionHolderActor(TCreateSessionRequest request, NThreading::TPromise<TString> openPromise, NThreading::TPromise<void> closePromise) {

ydb/tests/tools/kqprun/src/actors.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,13 @@ struct TCreateSessionRequest {
2727
ui8 VerboseLevel;
2828
};
2929

30+
struct TWaitResourcesSettings {
31+
i32 ExpectedNodeCount;
32+
ui8 HealthCheckLevel;
33+
ui8 VerboseLevel;
34+
TString Database;
35+
};
36+
3037
struct TEvPrivate {
3138
enum EEv : ui32 {
3239
EvStartAsyncQuery = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
@@ -83,7 +90,7 @@ NActors::IActor* CreateRunScriptActorMock(TQueryRequest request, NThreading::TPr
8390

8491
NActors::IActor* CreateAsyncQueryRunnerActor(const TAsyncQueriesSettings& settings);
8592

86-
NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount);
93+
NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, const TWaitResourcesSettings& settings);
8794

8895
NActors::IActor* CreateSessionHolderActor(TCreateSessionRequest request, NThreading::TPromise<TString> openPromise, NThreading::TPromise<void> closePromise);
8996

ydb/tests/tools/kqprun/src/common.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,13 @@ struct TYdbSetupSettings {
3636
std::unordered_set<TString> SharedTenants;
3737
std::unordered_set<TString> ServerlessTenants;
3838
TDuration InitializationTimeout = TDuration::Seconds(10);
39+
ui8 HealthCheckLevel = 1;
3940
bool SameSession = false;
4041

4142
bool DisableDiskMock = false;
4243
bool FormatStorage = false;
4344
std::optional<TString> PDisksPath;
44-
ui64 DiskSize = 32_GB;
45+
std::optional<ui64> DiskSize;
4546

4647
bool MonitoringEnabled = false;
4748
ui16 MonitoringPortOffset = 0;

ydb/tests/tools/kqprun/src/proto/storage_meta.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ package NKqpRun;
44

55
message TStorageMeta {
66
uint64 StorageGeneration = 1;
7+
uint64 StorageSize = 2;
78
}

ydb/tests/tools/kqprun/src/ydb_setup.cpp

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,10 @@ class TYdbSetup::TImpl {
178178
void SetStorageSettings(NKikimr::Tests::TServerSettings& serverSettings) const {
179179
TString diskPath;
180180
if (Settings_.PDisksPath && *Settings_.PDisksPath != "-") {
181-
diskPath = TStringBuilder() << *Settings_.PDisksPath << "/";
181+
if (Settings_.PDisksPath->empty()) {
182+
ythrow yexception() << "Storage directory path should not be empty";
183+
}
184+
diskPath = TStringBuilder() << Settings_.PDisksPath << (Settings_.PDisksPath->back() != '/' ? "/" : "");
182185
}
183186

184187
bool formatDisk = true;
@@ -196,6 +199,13 @@ class TYdbSetup::TImpl {
196199
formatDisk = false;
197200
}
198201

202+
if (Settings_.DiskSize && storageMeta.GetStorageSize() != *Settings_.DiskSize) {
203+
if (!formatDisk) {
204+
ythrow yexception() << "Cannot change disk size without formatting storage, please use --format-storage";
205+
}
206+
storageMeta.SetStorageSize(*Settings_.DiskSize);
207+
}
208+
199209
TString storageMetaStr;
200210
google::protobuf::TextFormat::PrintToString(storageMeta, &storageMetaStr);
201211

@@ -208,7 +218,7 @@ class TYdbSetup::TImpl {
208218
.UseDisk = !!Settings_.PDisksPath,
209219
.SectorSize = NKikimr::TTestStorageFactory::SECTOR_SIZE,
210220
.ChunkSize = Settings_.PDisksPath ? NKikimr::TTestStorageFactory::CHUNK_SIZE : NKikimr::TTestStorageFactory::MEM_CHUNK_SIZE,
211-
.DiskSize = Settings_.DiskSize,
221+
.DiskSize = Settings_.DiskSize ? *Settings_.DiskSize : 32_GB,
212222
.FormatDisk = formatDisk,
213223
.DiskPath = diskPath
214224
};
@@ -351,7 +361,13 @@ class TYdbSetup::TImpl {
351361

352362
void WaitResourcesPublishing() const {
353363
auto promise = NThreading::NewPromise();
354-
GetRuntime()->Register(CreateResourcesWaiterActor(promise, Settings_.NodeCount), 0, GetRuntime()->GetAppData().SystemPoolId);
364+
const TWaitResourcesSettings settings = {
365+
.ExpectedNodeCount = static_cast<i32>(Settings_.NodeCount),
366+
.HealthCheckLevel = Settings_.HealthCheckLevel,
367+
.VerboseLevel = Settings_.VerboseLevel,
368+
.Database = NKikimr::CanonizePath(Settings_.DomainName)
369+
};
370+
GetRuntime()->Register(CreateResourcesWaiterActor(promise, settings), 0, GetRuntime()->GetAppData().SystemPoolId);
355371

356372
try {
357373
promise.GetFuture().GetValue(Settings_.InitializationTimeout);

0 commit comments

Comments
 (0)