Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 44 additions & 37 deletions ydb/public/lib/ydb_cli/commands/tpch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,27 @@ namespace {

// Builds the list of TPC-H query texts to execute.
//
// Source of the queries:
//   * If ExternalQueriesDir is non-empty, every entry of that directory is
//     treated as a query file. Entries are ordered by the integer that follows
//     their first character, and each one must be named exactly
//     "q<number>.sql", where <number> is getQueryNumber(index of the query).
//     A mismatch aborts the process via Y_ABORT_UNLESS.
//   * Otherwise the embedded resource "tpch_queries.sql" is split on the
//     "-- end query" marker.
//
// In both cases every "{path}" placeholder in the query text is replaced with
// TablesPath before the list is returned.
TVector<TString> TTpchCommandRun::GetQueries() const {
    TVector<TString> queries;

    if (!ExternalQueriesDir.Empty()) {
        TFsPath queriesDir(ExternalQueriesDir);
        TVector<TString> queriesList;
        queriesDir.ListNames(queriesList);

        // Order numerically rather than lexicographically so that "q10.sql"
        // comes after "q2.sql". The leading character is skipped and
        // std::stoi stops at the first non-digit (the ".sql" suffix).
        // NOTE(review): std::stoi throws on a name with no digits after the
        // first character, before the naming check below can report it.
        std::sort(queriesList.begin(), queriesList.end(), [](const TString& l, const TString& r) {
            auto leftNum = l.substr(1);
            auto rightNum = r.substr(1);
            return std::stoi(leftNum) < std::stoi(rightNum);
        });

        for (auto&& queryFileName : queriesList) {
            // queries.size() is the index of the query about to be appended;
            // getQueryNumber (defined elsewhere in this file) maps it to the
            // number used in the file name.
            const TString expectedFileName = "q" + ::ToString(getQueryNumber(queries.size())) + ".sql";
            Y_ABORT_UNLESS(queryFileName == expectedFileName, "incorrect files naming. have to be q<number>.sql where number in [1, N], where N is requests count");
            TFileInput fInput(ExternalQueriesDir + "/" + expectedFileName);
            queries.emplace_back(fInput.ReadAll());
        }
    } else {
        queries = StringSplitter(NResource::Find("tpch_queries.sql")).SplitByString("-- end query").ToList<TString>();
    }

    // Substitute in place. Nothing is appended to `queries` inside this loop:
    // growing the vector while iterating over it would invalidate the loop's
    // iterators.
    for (auto& query : queries) {
        SubstGlobal(query, "{path}", TablesPath);
    }
    return queries;
}
Expand Down Expand Up @@ -125,11 +131,11 @@ bool TTpchCommandRun::RunBench(TConfig& config)
testInfo.ColdTime.MilliSeconds() * 0.001, testInfo.Min.MilliSeconds() * 0.001, testInfo.Max.MilliSeconds() * 0.001,
testInfo.Mean * 0.001, testInfo.Std * 0.001) << Endl;
if (collectJsonSensors) {
jsonReport.AppendValue(GetSensorValue("ColdTime", testInfo.ColdTime, queryN));
jsonReport.AppendValue(GetSensorValue("Min", testInfo.Min, queryN));
jsonReport.AppendValue(GetSensorValue("Max", testInfo.Max, queryN));
jsonReport.AppendValue(GetSensorValue("Mean", testInfo.Mean, queryN));
jsonReport.AppendValue(GetSensorValue("Std", testInfo.Std, queryN));
jsonReport.AppendValue(GetSensorValue("ColdTime", testInfo.ColdTime, getQueryNumber(queryN)));
jsonReport.AppendValue(GetSensorValue("Min", testInfo.Min, getQueryNumber(queryN)));
jsonReport.AppendValue(GetSensorValue("Max", testInfo.Max, getQueryNumber(queryN)));
jsonReport.AppendValue(GetSensorValue("Mean", testInfo.Mean, getQueryNumber(queryN)));
jsonReport.AppendValue(GetSensorValue("Std", testInfo.Std, getQueryNumber(queryN)));
}
}

Expand Down Expand Up @@ -222,14 +228,12 @@ void TTpchCommandInit::Config(TConfig& config) {
"column - use column-based storage engine.\n"
"s3 - use cloud tpc bucket")
.DefaultValue("row").StoreResult(&StoreType);
config.Opts->AddLongOption('s', "scale", "TPC-H dataset scale. One of 1, 10, 100, 1000. Default is 1")
config.Opts->AddLongOption("s3-prefix", "Root path to TPC-H dataset in s3 storage")
.Optional()
.DefaultValue("1")
.StoreResult(&Scale);
config.Opts->AddLongOption('b', "bucket", "S3 bucket with TPC-H dataset")
.StoreResult(&S3Prefix);
config.Opts->AddLongOption('e', "s3-endpoint", "Endpoint of S3 bucket with TPC-H dataset")
.Optional()
.DefaultValue("")
.StoreResult(&Bucket);
.StoreResult(&S3Endpoint);
};

void TTpchCommandInit::SetPartitionByCols(TString& createSql) {
Expand All @@ -256,7 +260,7 @@ void TTpchCommandInit::SetPartitionByCols(TString& createSql) {

int TTpchCommandInit::Run(TConfig& config) {
StoreType = to_lower(StoreType);
TString storageType = "";
TString storageType = "-- ";
TString notNull = "";
TString createExternalDataSource;
TString external;
Expand All @@ -266,20 +270,19 @@ int TTpchCommandInit::Run(TConfig& config) {
storageType = "STORE = COLUMN, --";
notNull = "NOT NULL";
} else if (StoreType == "s3") {
storageType = R"(DATA_SOURCE = "_tpc_s3_external_source", FORMAT = "parquet", LOCATION = )";
storageType = fmt::format(R"(DATA_SOURCE = "{}_tpc_s3_external_source", FORMAT = "parquet", LOCATION = )", TablesPath);
notNull = "NOT NULL";
createExternalDataSource = fmt::format(R"(
CREATE EXTERNAL DATA SOURCE `_tpc_s3_external_source` WITH (
CREATE EXTERNAL DATA SOURCE `{}_tpc_s3_external_source` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="https://storage.yandexcloud.net/{}/",
LOCATION="{}",
AUTH_METHOD="NONE"
);
)", Bucket);
)", TablesPath, S3Endpoint);
external = "EXTERNAL";
partitioning = "--";
primaryKey = "--";
} else if (StoreType != "row") {
storageType = "-- ";
throw yexception() << "Incorrect storage type. Available options: \"row\", \"column\"." << Endl;
}

Expand All @@ -292,9 +295,9 @@ int TTpchCommandInit::Run(TConfig& config) {
SubstGlobal(createSql, "{external}", external);
SubstGlobal(createSql, "{notnull}", notNull);
SubstGlobal(createSql, "{partitioning}", partitioning);
SubstGlobal(createSql, "{primary_key}", primaryKey);
SubstGlobal(createSql, "{path}", TablesPath);
SubstGlobal(createSql, "{scale}", Scale);
SubstGlobal(createSql, "{primary_key}", primaryKey);
SubstGlobal(createSql, "{s3_prefix}", S3Prefix);
SubstGlobal(createSql, "{store}", storageType);
SetPartitionByCols(createSql);

Expand All @@ -319,15 +322,18 @@ void TTpchCommandClean::Config(TConfig& config) {
config.Opts->AddLongOption('e', "external", "Drop tables as external. Use if initialized with external storage")
.Optional()
.StoreTrue(&IsExternal);
config.Opts->AddLongOption('p', "path", "Folder name where benchmark tables are located")
.Optional()
.StoreResult(&TablesPath);
};

int TTpchCommandClean::Run(TConfig& config) {
auto driver = CreateDriver(config);
TTableClient client(driver);

TString dropDdl;
for (auto& table : Tables) {
TString fullPath = FullTablePath(config.Database, table);
for (const auto& table : Tables) {
TString fullPath = FullTablePath(config.Database, fmt::format("{}{}", TablesPath, table));
fmt::format_to(std::back_inserter(dropDdl), "DROP {} TABLE `{}`", IsExternal ? "EXTERNAL" : "", fullPath);

ThrowOnError(client.RetryOperationSync([&dropDdl](TSession session) {
Expand All @@ -337,8 +343,9 @@ int TTpchCommandClean::Run(TConfig& config) {
}

if (IsExternal) {
ThrowOnError(client.RetryOperationSync([](TSession session) {
return session.ExecuteSchemeQuery("DROP EXTERNAL DATA SOURCE `_tpc_s3_external_source`;").GetValueSync();
TString fullPath = FullTablePath(config.Database, fmt::format("{}_tpc_s3_external_source", TablesPath));
ThrowOnError(client.RetryOperationSync([&](TSession session) {
return session.ExecuteSchemeQuery(fmt::format("DROP EXTERNAL DATA SOURCE `{}`;", fullPath)).GetValueSync();
}));
}

Expand Down
7 changes: 3 additions & 4 deletions ydb/public/lib/ydb_cli/commands/tpch.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ class TTpchCommandInit : public NYdb::NConsoleClient::TYdbCommand {

TString TablesPath;
TString StoreType;
TString Scale;
TString Bucket;
TString S3Endpoint;
TString S3Prefix;
};

class TTpchCommandClean : public NYdb::NConsoleClient::TYdbCommand {
Expand All @@ -31,15 +31,14 @@ class TTpchCommandClean : public NYdb::NConsoleClient::TYdbCommand {
std::vector<TString> Tables = {"customer", "lineitem", "nation", "orders",
"region", "part", "partsupp", "supplier"};
bool IsExternal = false;
TString TablesPath;
};

class TTpchCommandRun : public NYdb::NConsoleClient::TYdbCommand {
protected:
TSet<ui32> QueriesToRun;
TSet<ui32> QueriesToSkip;
TVector<TString> QuerySettings;
TString ExternalQueries;
TString ExternalQueriesFile;
TString ExternalQueriesDir;
TString ExternalVariablesString;
TString QueryExecutorType;
Expand Down
Loading