Skip to content

YQ-4046 KqpRun improved templates #13951

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ydb/tests/tools/kqprun/configuration/app_config.conf
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ ActorSystemConfig {

ColumnShardConfig {
DisabledOnSchemeShard: false
WritingInFlightRequestBytesLimit: 104857600
}

FeatureFlags {
EnableExternalDataSources: true
EnableScriptExecutionOperations: true
EnableExternalSourceSchemaInference: true
EnableTempTables: true
EnableReplaceIfExistsForExternalEntities: true
}

KQPConfig {
Expand Down
70 changes: 53 additions & 17 deletions ydb/tests/tools/kqprun/kqprun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,6 @@ struct TExecutionOptions {

TRequestOptions GetSchemeQueryOptions() const {
TString sql = SchemeQuery;
if (UseTemplates) {
ReplaceYqlTokenTemplate(sql);
}

return {
.Query = sql,
Expand All @@ -108,7 +105,6 @@ struct TExecutionOptions {

TString sql = ScriptQueries[index];
if (UseTemplates) {
ReplaceYqlTokenTemplate(sql);
SubstGlobal(sql, "${QUERY_ID}", ToString(queryId));
}

Expand Down Expand Up @@ -271,16 +267,6 @@ struct TExecutionOptions {
ythrow yexception() << "Cannot format storage without real PDisks, please use --storage-path";
}
}

private:
static void ReplaceYqlTokenTemplate(TString& sql) {
const TString variableName = TStringBuilder() << "${" << YQL_TOKEN_VARIABLE << "}";
if (const TString& yqlToken = GetEnv(YQL_TOKEN_VARIABLE)) {
SubstGlobal(sql, variableName, yqlToken);
} else if (sql.Contains(variableName)) {
ythrow yexception() << "Failed to replace ${YQL_TOKEN} template, please specify YQL_TOKEN environment variable\n";
}
}
};


Expand Down Expand Up @@ -446,6 +432,7 @@ class TMain : public TMainClassArgs {
TExecutionOptions ExecutionOptions;
TRunnerOptions RunnerOptions;

std::unordered_map<TString, TString> Templates;
THashMap<TString, TString> TablesMapping;
TVector<TString> UdfsPaths;
TString UdfsDirectory;
Expand Down Expand Up @@ -537,6 +524,31 @@ class TMain : public TMainClassArgs {
.NoArgument()
.SetFlag(&ExecutionOptions.UseTemplates);

options.AddLongOption("var-template", "Add template from environment variables or file for -s and -p queries (use variable@file for files)")
.RequiredArgument("variable")
.Handler1([this](const NLastGetopt::TOptsParser* option) {
TStringBuf variable;
TStringBuf filePath;
TStringBuf(option->CurVal()).Split('@', variable, filePath);
if (variable.empty()) {
ythrow yexception() << "Variable name should not be empty";
}

TString value;
if (!filePath.empty()) {
value = LoadFile(TString(filePath));
} else {
value = GetEnv(TString(variable));
if (!value) {
ythrow yexception() << "Invalid env template, can not find value for variable '" << variable << "'";
}
}

if (!Templates.emplace(variable, value).second) {
ythrow yexception() << "Got duplicated template variable name '" << variable << "'";
}
});

options.AddLongOption('t', "table", "File with input table (can be used by YT with -E flag), table@file")
.RequiredArgument("table@file")
.Handler1([this](const NLastGetopt::TOptsParser* option) {
Expand Down Expand Up @@ -845,6 +857,11 @@ class TMain : public TMainClassArgs {
.NoArgument()
.SetFlag(&EmulateYt);

options.AddLongOption('H', "health-check", "Level of health check before start (max level 2)")
.RequiredArgument("uint")
.DefaultValue(1)
.StoreResult(&RunnerOptions.YdbSettings.HealthCheckLevel);

options.AddLongOption("domain", "Test cluster domain name")
.RequiredArgument("name")
.DefaultValue(RunnerOptions.YdbSettings.DomainName)
Expand All @@ -862,9 +879,8 @@ class TMain : public TMainClassArgs {
.RequiredArgument("path")
.InsertTo(&RunnerOptions.YdbSettings.ServerlessTenants);

options.AddLongOption("storage-size", "Domain storage size in gigabytes")
options.AddLongOption("storage-size", "Domain storage size in gigabytes (32 GiB by default)")
.RequiredArgument("uint")
.DefaultValue(32)
.StoreMappedResultT<ui32>(&RunnerOptions.YdbSettings.DiskSize, [](ui32 diskSize) {
return static_cast<ui64>(diskSize) << 30;
});
Expand Down Expand Up @@ -898,6 +914,11 @@ class TMain : public TMainClassArgs {
int DoRun(NLastGetopt::TOptsParseResult&&) override {
ExecutionOptions.Validate(RunnerOptions);

ReplaceTemplates(ExecutionOptions.SchemeQuery);
for (auto& sql : ExecutionOptions.ScriptQueries) {
ReplaceTemplates(sql);
}

RunnerOptions.YdbSettings.YqlToken = YqlToken;
RunnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry(UdfsDirectory, UdfsPaths, ExcludeLinkedUdfs).Get();

Expand Down Expand Up @@ -943,6 +964,21 @@ class TMain : public TMainClassArgs {

return 0;
}

private:
void ReplaceTemplates(TString& sql) const {
for (const auto& [variable, value] : Templates) {
SubstGlobal(sql, TStringBuilder() << "${" << variable <<"}", value);
}
if (ExecutionOptions.UseTemplates) {
const TString tokenVariableName = TStringBuilder() << "${" << YQL_TOKEN_VARIABLE << "}";
if (const TString& yqlToken = GetEnv(YQL_TOKEN_VARIABLE)) {
SubstGlobal(sql, tokenVariableName, yqlToken);
} else if (sql.Contains(tokenVariableName)) {
ythrow yexception() << "Failed to replace ${YQL_TOKEN} template, please specify YQL_TOKEN environment variable";
}
}
}
};


Expand Down Expand Up @@ -972,7 +1008,7 @@ void FloatingPointExceptionHandler(int) {

Cerr << colors.Red() << "======= floating point exception call stack ========" << colors.Default() << Endl;
FormatBackTrace(&Cerr);
Cerr << colors.Red() << "==============================================" << colors.Default() << Endl;
Cerr << colors.Red() << "====================================================" << colors.Default() << Endl;

abort();
}
Expand Down
81 changes: 69 additions & 12 deletions ydb/tests/tools/kqprun/src/actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,40 +232,78 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
static constexpr TDuration REFRESH_PERIOD = TDuration::MilliSeconds(10);

public:
TResourcesWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount)
: ExpectedNodeCount_(expectedNodeCount)
TResourcesWaiterActor(NThreading::TPromise<void> promise, const TWaitResourcesSettings& settings)
: Settings_(settings)
, Promise_(promise)
{}

void Bootstrap() {
Become(&TResourcesWaiterActor::StateFunc);
if (Settings_.HealthCheckLevel < 1) {
Finish();
return;
}

Become(&TResourcesWaiterActor::StateWaitNodeCont);
CheckResourcesPublish();
}

void Handle(NActors::TEvents::TEvWakeup::TPtr&) {
void HandleWaitNodeCountWakeup() {
CheckResourcesPublish();
}

void Handle(TEvPrivate::TEvResourcesInfo::TPtr& ev) {
if (ev->Get()->NodeCount == ExpectedNodeCount_) {
Promise_.SetValue();
PassAway();
const auto nodeCont = ev->Get()->NodeCount;
if (nodeCont == Settings_.ExpectedNodeCount) {
if (Settings_.HealthCheckLevel < 2) {
Finish();
} else {
Become(&TResourcesWaiterActor::StateWaitScript);
StartScriptQuery();
}
return;
}

if (Settings_.VerboseLevel >= 2) {
Cout << CoutColors_.Cyan() << "Retry invalid node count, got " << nodeCont << ", expected " << Settings_.ExpectedNodeCount << CoutColors_.Default() << Endl;
}
Schedule(REFRESH_PERIOD, new NActors::TEvents::TEvWakeup());
}

STRICT_STFUNC(StateFunc,
hFunc(NActors::TEvents::TEvWakeup, Handle);
void HandleWaitScriptWakeup() {
StartScriptQuery();
}

void Handle(NKikimr::NKqp::TEvKqp::TEvScriptResponse::TPtr& ev) {
const auto status = ev->Get()->Status;
if (status == Ydb::StatusIds::SUCCESS) {
Finish();
return;
}

if (Settings_.VerboseLevel >= 2) {
Cout << CoutColors_.Cyan() << "Retry script creation fail with status " << status << ", reason:\n" << CoutColors_.Default() << ev->Get()->Issues.ToString() << Endl;
}
Schedule(REFRESH_PERIOD, new NActors::TEvents::TEvWakeup());
}

STRICT_STFUNC(StateWaitNodeCont,
sFunc(NActors::TEvents::TEvWakeup, HandleWaitNodeCountWakeup);
hFunc(TEvPrivate::TEvResourcesInfo, Handle);
)

STRICT_STFUNC(StateWaitScript,
sFunc(NActors::TEvents::TEvWakeup, HandleWaitScriptWakeup);
hFunc(NKikimr::NKqp::TEvKqp::TEvScriptResponse, Handle);
)

private:
void CheckResourcesPublish() {
GetResourceManager();

if (!ResourceManager_) {
if (Settings_.VerboseLevel >= 2) {
Cout << CoutColors_.Cyan() << "Retry uninitialized resource manager" << CoutColors_.Default() << Endl;
}
Schedule(REFRESH_PERIOD, new NActors::TEvents::TEvWakeup());
return;
}
Expand All @@ -287,8 +325,27 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
});
}

void StartScriptQuery() {
auto event = MakeHolder<NKikimr::NKqp::TEvKqp::TEvScriptRequest>();
event->Record.SetUserToken(NACLib::TUserToken("", BUILTIN_ACL_ROOT, {}).SerializeAsString());

auto request = event->Record.MutableRequest();
request->SetQuery("SELECT 42");
request->SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT);
request->SetAction(NKikimrKqp::EQueryAction::QUERY_ACTION_EXECUTE);
request->SetDatabase(Settings_.Database);

Send(NKikimr::NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release());
}

void Finish() {
Promise_.SetValue();
PassAway();
}

private:
const i32 ExpectedNodeCount_;
const TWaitResourcesSettings Settings_;
const NColorizer::TColors CoutColors_ = NColorizer::AutoColors(Cout);
NThreading::TPromise<void> Promise_;

std::shared_ptr<NKikimr::NKqp::NRm::IKqpResourceManager> ResourceManager_;
Expand Down Expand Up @@ -415,8 +472,8 @@ NActors::IActor* CreateAsyncQueryRunnerActor(const TAsyncQueriesSettings& settin
return new TAsyncQueryRunnerActor(settings);
}

NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount) {
return new TResourcesWaiterActor(promise, expectedNodeCount);
NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, const TWaitResourcesSettings& settings) {
return new TResourcesWaiterActor(promise, settings);
}

NActors::IActor* CreateSessionHolderActor(TCreateSessionRequest request, NThreading::TPromise<TString> openPromise, NThreading::TPromise<void> closePromise) {
Expand Down
9 changes: 8 additions & 1 deletion ydb/tests/tools/kqprun/src/actors.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ struct TCreateSessionRequest {
ui8 VerboseLevel;
};

struct TWaitResourcesSettings {
i32 ExpectedNodeCount;
ui8 HealthCheckLevel;
ui8 VerboseLevel;
TString Database;
};

struct TEvPrivate {
enum EEv : ui32 {
EvStartAsyncQuery = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
Expand Down Expand Up @@ -83,7 +90,7 @@ NActors::IActor* CreateRunScriptActorMock(TQueryRequest request, NThreading::TPr

NActors::IActor* CreateAsyncQueryRunnerActor(const TAsyncQueriesSettings& settings);

NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount);
NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, const TWaitResourcesSettings& settings);

NActors::IActor* CreateSessionHolderActor(TCreateSessionRequest request, NThreading::TPromise<TString> openPromise, NThreading::TPromise<void> closePromise);

Expand Down
3 changes: 2 additions & 1 deletion ydb/tests/tools/kqprun/src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ struct TYdbSetupSettings {
std::unordered_set<TString> SharedTenants;
std::unordered_set<TString> ServerlessTenants;
TDuration InitializationTimeout = TDuration::Seconds(10);
ui8 HealthCheckLevel = 1;
bool SameSession = false;

bool DisableDiskMock = false;
bool FormatStorage = false;
std::optional<TString> PDisksPath;
ui64 DiskSize = 32_GB;
std::optional<ui64> DiskSize;

bool MonitoringEnabled = false;
ui16 MonitoringPortOffset = 0;
Expand Down
1 change: 1 addition & 0 deletions ydb/tests/tools/kqprun/src/proto/storage_meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ package NKqpRun;

message TStorageMeta {
uint64 StorageGeneration = 1;
uint64 StorageSize = 2;
}
22 changes: 19 additions & 3 deletions ydb/tests/tools/kqprun/src/ydb_setup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,10 @@ class TYdbSetup::TImpl {
void SetStorageSettings(NKikimr::Tests::TServerSettings& serverSettings) const {
TString diskPath;
if (Settings_.PDisksPath && *Settings_.PDisksPath != "-") {
diskPath = TStringBuilder() << *Settings_.PDisksPath << "/";
if (Settings_.PDisksPath->empty()) {
ythrow yexception() << "Storage directory path should not be empty";
}
diskPath = TStringBuilder() << Settings_.PDisksPath << (Settings_.PDisksPath->back() != '/' ? "/" : "");
}

bool formatDisk = true;
Expand All @@ -196,6 +199,13 @@ class TYdbSetup::TImpl {
formatDisk = false;
}

if (Settings_.DiskSize && storageMeta.GetStorageSize() != *Settings_.DiskSize) {
if (!formatDisk) {
ythrow yexception() << "Cannot change disk size without formatting storage, please use --format-storage";
}
storageMeta.SetStorageSize(*Settings_.DiskSize);
}

TString storageMetaStr;
google::protobuf::TextFormat::PrintToString(storageMeta, &storageMetaStr);

Expand All @@ -208,7 +218,7 @@ class TYdbSetup::TImpl {
.UseDisk = !!Settings_.PDisksPath,
.SectorSize = NKikimr::TTestStorageFactory::SECTOR_SIZE,
.ChunkSize = Settings_.PDisksPath ? NKikimr::TTestStorageFactory::CHUNK_SIZE : NKikimr::TTestStorageFactory::MEM_CHUNK_SIZE,
.DiskSize = Settings_.DiskSize,
.DiskSize = Settings_.DiskSize ? *Settings_.DiskSize : 32_GB,
.FormatDisk = formatDisk,
.DiskPath = diskPath
};
Expand Down Expand Up @@ -351,7 +361,13 @@ class TYdbSetup::TImpl {

void WaitResourcesPublishing() const {
auto promise = NThreading::NewPromise();
GetRuntime()->Register(CreateResourcesWaiterActor(promise, Settings_.NodeCount), 0, GetRuntime()->GetAppData().SystemPoolId);
const TWaitResourcesSettings settings = {
.ExpectedNodeCount = static_cast<i32>(Settings_.NodeCount),
.HealthCheckLevel = Settings_.HealthCheckLevel,
.VerboseLevel = Settings_.VerboseLevel,
.Database = NKikimr::CanonizePath(Settings_.DomainName)
};
GetRuntime()->Register(CreateResourcesWaiterActor(promise, settings), 0, GetRuntime()->GetAppData().SystemPoolId);

try {
promise.GetFuture().GetValue(Settings_.InitializationTimeout);
Expand Down