Skip to content

YQ-2744: fix row mode in workload tpch init #1266

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 44 additions & 37 deletions ydb/public/lib/ydb_cli/commands/tpch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,27 @@ namespace {

TVector<TString> TTpchCommandRun::GetQueries() const {
TVector<TString> queries;
TFsPath queriesDir(ExternalQueriesDir);
TVector<TString> queriesList;
queriesDir.ListNames(queriesList);
std::sort(queriesList.begin(), queriesList.end(), [](const TString& l, const TString& r) {
auto leftNum = l.substr(1);
auto rightNum = r.substr(1);
return std::stoi(leftNum) < std::stoi(rightNum);
});
for (auto&& queryFileName : queriesList) {
const TString expectedFileName = "q" + ::ToString(getQueryNumber(queries.size())) + ".sql";
Y_ABORT_UNLESS(queryFileName == expectedFileName, "incorrect files naming. have to be q<number>.sql where number in [1, N], where N is requests count");
TFileInput fInput(ExternalQueriesDir + "/" + expectedFileName);
auto query = fInput.ReadAll();
if (!ExternalQueriesDir.Empty()) {
TFsPath queriesDir(ExternalQueriesDir);
TVector<TString> queriesList;
queriesDir.ListNames(queriesList);
std::sort(queriesList.begin(), queriesList.end(), [](const TString& l, const TString& r) {
auto leftNum = l.substr(1);
auto rightNum = r.substr(1);
return std::stoi(leftNum) < std::stoi(rightNum);
});
for (auto&& queryFileName : queriesList) {
const TString expectedFileName = "q" + ::ToString(getQueryNumber(queries.size())) + ".sql";
Y_ABORT_UNLESS(queryFileName == expectedFileName, "incorrect files naming. have to be q<number>.sql where number in [1, N], where N is requests count");
TFileInput fInput(ExternalQueriesDir + "/" + expectedFileName);
queries.emplace_back(fInput.ReadAll());
}
} else {
queries = StringSplitter(NResource::Find("tpch_queries.sql")).SplitByString("-- end query").ToList<TString>();
}

for (auto& query : queries) {
SubstGlobal(query, "{path}", TablesPath);
queries.emplace_back(query);
}
return queries;
}
Expand Down Expand Up @@ -125,11 +131,11 @@ bool TTpchCommandRun::RunBench(TConfig& config)
testInfo.ColdTime.MilliSeconds() * 0.001, testInfo.Min.MilliSeconds() * 0.001, testInfo.Max.MilliSeconds() * 0.001,
testInfo.Mean * 0.001, testInfo.Std * 0.001) << Endl;
if (collectJsonSensors) {
jsonReport.AppendValue(GetSensorValue("ColdTime", testInfo.ColdTime, queryN));
jsonReport.AppendValue(GetSensorValue("Min", testInfo.Min, queryN));
jsonReport.AppendValue(GetSensorValue("Max", testInfo.Max, queryN));
jsonReport.AppendValue(GetSensorValue("Mean", testInfo.Mean, queryN));
jsonReport.AppendValue(GetSensorValue("Std", testInfo.Std, queryN));
jsonReport.AppendValue(GetSensorValue("ColdTime", testInfo.ColdTime, getQueryNumber(queryN)));
jsonReport.AppendValue(GetSensorValue("Min", testInfo.Min, getQueryNumber(queryN)));
jsonReport.AppendValue(GetSensorValue("Max", testInfo.Max, getQueryNumber(queryN)));
jsonReport.AppendValue(GetSensorValue("Mean", testInfo.Mean, getQueryNumber(queryN)));
jsonReport.AppendValue(GetSensorValue("Std", testInfo.Std, getQueryNumber(queryN)));
}
}

Expand Down Expand Up @@ -222,14 +228,12 @@ void TTpchCommandInit::Config(TConfig& config) {
"column - use column-based storage engine.\n"
"s3 - use cloud tpc bucket")
.DefaultValue("row").StoreResult(&StoreType);
config.Opts->AddLongOption('s', "scale", "TPC-H dataset scale. One of 1, 10, 100, 1000. Default is 1")
config.Opts->AddLongOption("s3-prefix", "Root path to TPC-H dataset in s3 storage")
.Optional()
.DefaultValue("1")
.StoreResult(&Scale);
config.Opts->AddLongOption('b', "bucket", "S3 bucket with TPC-H dataset")
.StoreResult(&S3Prefix);
config.Opts->AddLongOption('e', "s3-endpoint", "Endpoint of S3 bucket with TPC-H dataset")
.Optional()
.DefaultValue("")
.StoreResult(&Bucket);
.StoreResult(&S3Endpoint);
};

void TTpchCommandInit::SetPartitionByCols(TString& createSql) {
Expand All @@ -256,7 +260,7 @@ void TTpchCommandInit::SetPartitionByCols(TString& createSql) {

int TTpchCommandInit::Run(TConfig& config) {
StoreType = to_lower(StoreType);
TString storageType = "";
TString storageType = "-- ";
TString notNull = "";
TString createExternalDataSource;
TString external;
Expand All @@ -266,20 +270,19 @@ int TTpchCommandInit::Run(TConfig& config) {
storageType = "STORE = COLUMN, --";
notNull = "NOT NULL";
} else if (StoreType == "s3") {
storageType = R"(DATA_SOURCE = "_tpc_s3_external_source", FORMAT = "parquet", LOCATION = )";
storageType = fmt::format(R"(DATA_SOURCE = "{}_tpc_s3_external_source", FORMAT = "parquet", LOCATION = )", TablesPath);
notNull = "NOT NULL";
createExternalDataSource = fmt::format(R"(
CREATE EXTERNAL DATA SOURCE `_tpc_s3_external_source` WITH (
CREATE EXTERNAL DATA SOURCE `{}_tpc_s3_external_source` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="https://storage.yandexcloud.net/{}/",
LOCATION="{}",
AUTH_METHOD="NONE"
);
)", Bucket);
)", TablesPath, S3Endpoint);
external = "EXTERNAL";
partitioning = "--";
primaryKey = "--";
} else if (StoreType != "row") {
storageType = "-- ";
throw yexception() << "Incorrect storage type. Available options: \"row\", \"column\"." << Endl;
}

Expand All @@ -292,9 +295,9 @@ int TTpchCommandInit::Run(TConfig& config) {
SubstGlobal(createSql, "{external}", external);
SubstGlobal(createSql, "{notnull}", notNull);
SubstGlobal(createSql, "{partitioning}", partitioning);
SubstGlobal(createSql, "{primary_key}", primaryKey);
SubstGlobal(createSql, "{path}", TablesPath);
SubstGlobal(createSql, "{scale}", Scale);
SubstGlobal(createSql, "{primary_key}", primaryKey);
SubstGlobal(createSql, "{s3_prefix}", S3Prefix);
SubstGlobal(createSql, "{store}", storageType);
SetPartitionByCols(createSql);

Expand All @@ -319,15 +322,18 @@ void TTpchCommandClean::Config(TConfig& config) {
config.Opts->AddLongOption('e', "external", "Drop tables as external. Use if initialized with external storage")
.Optional()
.StoreTrue(&IsExternal);
config.Opts->AddLongOption('p', "path", "Folder name where benchmark tables are located")
.Optional()
.StoreResult(&TablesPath);
};

int TTpchCommandClean::Run(TConfig& config) {
auto driver = CreateDriver(config);
TTableClient client(driver);

TString dropDdl;
for (auto& table : Tables) {
TString fullPath = FullTablePath(config.Database, table);
for (const auto& table : Tables) {
TString fullPath = FullTablePath(config.Database, fmt::format("{}{}", TablesPath, table));
fmt::format_to(std::back_inserter(dropDdl), "DROP {} TABLE `{}`", IsExternal ? "EXTERNAL" : "", fullPath);

ThrowOnError(client.RetryOperationSync([&dropDdl](TSession session) {
Expand All @@ -337,8 +343,9 @@ int TTpchCommandClean::Run(TConfig& config) {
}

if (IsExternal) {
ThrowOnError(client.RetryOperationSync([](TSession session) {
return session.ExecuteSchemeQuery("DROP EXTERNAL DATA SOURCE `_tpc_s3_external_source`;").GetValueSync();
TString fullPath = FullTablePath(config.Database, fmt::format("{}_tpc_s3_external_source", TablesPath));
ThrowOnError(client.RetryOperationSync([&](TSession session) {
return session.ExecuteSchemeQuery(fmt::format("DROP EXTERNAL DATA SOURCE `{}`;", fullPath)).GetValueSync();
}));
}

Expand Down
7 changes: 3 additions & 4 deletions ydb/public/lib/ydb_cli/commands/tpch.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ class TTpchCommandInit : public NYdb::NConsoleClient::TYdbCommand {

TString TablesPath;
TString StoreType;
TString Scale;
TString Bucket;
TString S3Endpoint;
TString S3Prefix;
};

class TTpchCommandClean : public NYdb::NConsoleClient::TYdbCommand {
Expand All @@ -31,15 +31,14 @@ class TTpchCommandClean : public NYdb::NConsoleClient::TYdbCommand {
std::vector<TString> Tables = {"customer", "lineitem", "nation", "orders",
"region", "part", "partsupp", "supplier"};
bool IsExternal = false;
TString TablesPath;
};

class TTpchCommandRun : public NYdb::NConsoleClient::TYdbCommand {
protected:
TSet<ui32> QueriesToRun;
TSet<ui32> QueriesToSkip;
TVector<TString> QuerySettings;
TString ExternalQueries;
TString ExternalQueriesFile;
TString ExternalQueriesDir;
TString ExternalVariablesString;
TString QueryExecutorType;
Expand Down
Loading