Skip to content

YQ-2744: add tpc bucket option for ydb workload tpch #1098

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 51 additions & 11 deletions ydb/public/lib/ydb_cli/commands/tpch.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "tpch.h"

#include <contrib/libs/fmt/include/fmt/format.h>
#include <util/string/split.h>
#include <util/stream/file.h>
#include <util/string/strip.h>
Expand Down Expand Up @@ -216,10 +217,19 @@ void TTpchCommandInit::Config(TConfig& config) {
TablesPath = arg;
});
config.Opts->AddLongOption("store", "Storage type."
" Options: row, column\n"
" Options: row, column, s3\n"
"row - use row-based storage engine;\n"
"column - use column-based storage engine.")
"column - use column-based storage engine.\n"
"s3 - use cloud tpc bucket")
.DefaultValue("row").StoreResult(&StoreType);
config.Opts->AddLongOption('s', "scale", "TPC-H dataset scale. One of 1, 10, 100, 1000. Default is 1")
.Optional()
.DefaultValue("1")
.StoreResult(&Scale);
config.Opts->AddLongOption('b', "bucket", "S3 bucket with TPC-H dataset")
.Optional()
.DefaultValue("")
.StoreResult(&Bucket);
};

void TTpchCommandInit::SetPartitionByCols(TString& createSql) {
Expand Down Expand Up @@ -248,10 +258,28 @@ int TTpchCommandInit::Run(TConfig& config) {
StoreType = to_lower(StoreType);
TString storageType = "";
TString notNull = "";
TString createExternalDataSource;
TString external;
TString partitioning = "AUTO_PARTITIONING_MIN_PARTITIONS_COUNT";
TString primaryKey = ", PRIMARY KEY";
if (StoreType == "column") {
storageType = "STORE = COLUMN,";
storageType = "STORE = COLUMN, --";
notNull = "NOT NULL";
} else if (StoreType == "s3") {
storageType = R"(DATA_SOURCE = "_tpc_s3_external_source", FORMAT = "parquet", LOCATION = )";
notNull = "NOT NULL";
createExternalDataSource = fmt::format(R"(
CREATE EXTERNAL DATA SOURCE `_tpc_s3_external_source` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="https://storage.yandexcloud.net/{}/",
AUTH_METHOD="NONE"
);
)", Bucket);
external = "EXTERNAL";
partitioning = "--";
primaryKey = "--";
} else if (StoreType != "row") {
storageType = "-- ";
throw yexception() << "Incorrect storage type. Available options: \"row\", \"column\"." << Endl;
}

Expand All @@ -260,11 +288,18 @@ int TTpchCommandInit::Run(TConfig& config) {
TString createSql = NResource::Find("tpch_schema.sql");
TTableClient client(driver);

SubstGlobal(createSql, "{createExternal}", createExternalDataSource);
SubstGlobal(createSql, "{external}", external);
SubstGlobal(createSql, "{notnull}", notNull);
SubstGlobal(createSql, "{partitioning}", partitioning);
SubstGlobal(createSql, "{primary_key}", primaryKey);
SubstGlobal(createSql, "{path}", TablesPath);
SubstGlobal(createSql, "{scale}", Scale);
SubstGlobal(createSql, "{store}", storageType);
SetPartitionByCols(createSql);

Cout << createSql << Endl;

ThrowOnError(client.RetryOperationSync([createSql](TSession session) {
return session.ExecuteSchemeQuery(createSql).GetValueSync();
}));
Expand All @@ -281,25 +316,30 @@ TTpchCommandClean::TTpchCommandClean()
void TTpchCommandClean::Config(TConfig& config) {
NYdb::NConsoleClient::TClientCommand::Config(config);
config.SetFreeArgsNum(0);
config.Opts->AddLongOption('e', "external", "Drop tables as external. Use if initialized with external storage")
.Optional()
.StoreTrue(&IsExternal);
};

int TTpchCommandClean::Run(TConfig& config) {
auto driver = CreateDriver(config);
TTableClient client(driver);

static const char DropDdlTmpl[] = "DROP TABLE `%s`;";
char dropDdl[sizeof(DropDdlTmpl) + 8192*3]; // 32*256 for DbPath
TString dropDdl;
for (auto& table : Tables) {
TString fullPath = FullTablePath(config.Database, table);
int res = std::snprintf(dropDdl, sizeof(dropDdl), DropDdlTmpl, fullPath.c_str());
if (res < 0) {
Cerr << "Failed to generate DROP DDL query for `" << fullPath << "` table." << Endl;
return -1;
}
fmt::format_to(std::back_inserter(dropDdl), "DROP {} TABLE `{}`", IsExternal ? "EXTERNAL" : "", fullPath);

ThrowOnError(client.RetryOperationSync([dropDdl](TSession session) {
ThrowOnError(client.RetryOperationSync([&dropDdl](TSession session) {
return session.ExecuteSchemeQuery(dropDdl).GetValueSync();
}));
dropDdl.clear();
}

if (IsExternal) {
ThrowOnError(client.RetryOperationSync([](TSession session) {
return session.ExecuteSchemeQuery("DROP EXTERNAL DATA SOURCE `_tpc_s3_external_source`;").GetValueSync();
}));
}

Cout << "Clean succeeded." << Endl;
Expand Down
3 changes: 3 additions & 0 deletions ydb/public/lib/ydb_cli/commands/tpch.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class TTpchCommandInit : public NYdb::NConsoleClient::TYdbCommand {

TString TablesPath;
TString StoreType;
TString Scale;
TString Bucket;
};

class TTpchCommandClean : public NYdb::NConsoleClient::TYdbCommand {
Expand All @@ -28,6 +30,7 @@ class TTpchCommandClean : public NYdb::NConsoleClient::TYdbCommand {
private:
std::vector<TString> Tables = {"customer", "lineitem", "nation", "orders",
"region", "part", "partsupp", "supplier"};
bool IsExternal = false;
};

class TTpchCommandRun : public NYdb::NConsoleClient::TYdbCommand {
Expand Down
98 changes: 50 additions & 48 deletions ydb/public/lib/ydb_cli/commands/tpch_schema.sql
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
CREATE TABLE `{path}customer` (
{createExternal}

CREATE {external} TABLE `{path}customer` (
c_acctbal Double {notnull}, -- it should be Decimal(12, 2)
c_address String {notnull},
c_comment String {notnull},
c_custkey Int32 {notnull}, -- Identifier
c_mktsegment String {notnull},
c_name String {notnull},
c_nationkey Int32 {notnull}, -- FK to N_NATIONKEY
c_phone String {notnull},
PRIMARY KEY (c_custkey)
c_phone String {notnull}
{primary_key} (c_custkey)
)
{partition_customer}
WITH ({store}
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 64)
;
WITH ({store}"/h/s{scale}/parquet/customer/"
{partitioning} = 64
);

CREATE TABLE `{path}lineitem` (
CREATE {external} TABLE `{path}lineitem` (
l_comment String {notnull},
l_commitdate Date {notnull},
l_discount Double {notnull}, -- it should be Decimal(12, 2)
Expand All @@ -30,27 +32,27 @@ CREATE TABLE `{path}lineitem` (
l_shipinstruct String {notnull},
l_shipmode String {notnull},
l_suppkey Int32 {notnull}, -- FK to S_SUPPKEY, second part of the compound FK to (PS_PARTKEY, PS_SUPPKEY) with L_PARTKEY
l_tax Double {notnull}, -- it should be Decimal(12, 2)
PRIMARY KEY (l_orderkey, l_linenumber)
l_tax Double {notnull} -- it should be Decimal(12, 2)
{primary_key} (l_orderkey, l_linenumber)
)
{partition_lineitem}
WITH ({store}
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 64)
;
WITH ({store}"/h/s{scale}/parquet/lineitem/"
{partitioning} = 64
);

CREATE TABLE `{path}nation` (
CREATE {external} TABLE `{path}nation` (
n_comment String {notnull},
n_name String {notnull},
n_nationkey Int32 {notnull}, -- Identifier
n_regionkey Int32 {notnull}, -- FK to R_REGIONKEY
PRIMARY KEY(n_nationkey)
n_regionkey Int32 {notnull} -- FK to R_REGIONKEY
{primary_key}(n_nationkey)
)
{partition_nation}
WITH ({store}
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1)
;
WITH ({store}"/h/s{scale}/parquet/nation/"
{partitioning} = 1
);

CREATE TABLE `{path}orders` (
CREATE {external} TABLE `{path}orders` (
o_clerk String {notnull},
o_comment String {notnull},
o_custkey Int32 {notnull}, -- FK to C_CUSTKEY
Expand All @@ -59,15 +61,15 @@ CREATE TABLE `{path}orders` (
o_orderpriority String {notnull},
o_orderstatus String {notnull},
o_shippriority Int32 {notnull},
o_totalprice Double {notnull}, -- it should be Decimal(12, 2)
PRIMARY KEY (o_orderkey)
o_totalprice Double {notnull} -- it should be Decimal(12, 2)
{primary_key} (o_orderkey)
)
{partition_orders}
WITH ({store}
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 64)
;
WITH ({store}"/h/s{scale}/parquet/orders/"
{partitioning} = 64
);

CREATE TABLE `{path}part` (
CREATE {external} TABLE `{path}part` (
p_brand String {notnull},
p_comment String {notnull},
p_container String {notnull},
Expand All @@ -76,49 +78,49 @@ CREATE TABLE `{path}part` (
p_partkey Int32 {notnull}, -- Identifier
p_retailprice Double {notnull}, -- it should be Decimal(12, 2)
p_size Int32 {notnull},
p_type String {notnull},
PRIMARY KEY(p_partkey)
p_type String {notnull}
{primary_key}(p_partkey)
)
{partition_part}
WITH ({store}
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 64)
;
WITH ({store}"/h/s{scale}/parquet/part/"
{partitioning} = 64
);

CREATE TABLE `{path}partsupp` (
CREATE {external} TABLE `{path}partsupp` (
ps_availqty Int32 {notnull},
ps_comment String {notnull},
ps_partkey Int32 {notnull}, -- FK to P_PARTKEY
ps_suppkey Int32 {notnull}, -- FK to S_SUPPKEY
ps_supplycost Double {notnull}, -- it should be Decimal(12, 2)
PRIMARY KEY(ps_partkey, ps_suppkey)
ps_supplycost Double {notnull} -- it should be Decimal(12, 2)
{primary_key}(ps_partkey, ps_suppkey)
)
{partition_partsupp}
WITH ({store}
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 64)
;
WITH ({store}"/h/s{scale}/parquet/partsupp/"
{partitioning} = 64
);

CREATE TABLE `{path}region` (
CREATE {external} TABLE `{path}region` (
r_comment String {notnull},
r_name String {notnull},
r_regionkey Int32 {notnull}, -- Identifier
PRIMARY KEY(r_regionkey)
r_regionkey Int32 {notnull} -- Identifier
{primary_key}(r_regionkey)
)
{partition_region}
WITH ({store}
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1)
;
WITH ({store}"/h/s{scale}/parquet/region/"
{partitioning} = 1
);

CREATE TABLE `{path}supplier` (
CREATE {external} TABLE `{path}supplier` (
s_acctbal Double {notnull}, -- it should be Decimal(12, 2)
s_address String {notnull},
s_comment String {notnull},
s_name String {notnull},
s_nationkey Int32 {notnull}, -- FK to N_NATIONKEY
s_phone String {notnull},
s_suppkey Int32 {notnull}, -- Identifier
PRIMARY KEY(s_suppkey)
s_suppkey Int32 {notnull} -- Identifier
{primary_key}(s_suppkey)
)
{partition_supplier}
WITH ({store}
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 64)
;
WITH ({store}"/h/s{scale}/parquet/supplier/"
{partitioning} = 64
);
1 change: 1 addition & 0 deletions ydb/public/lib/ydb_cli/commands/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ SRCS(
)

PEERDIR(
contrib/libs/fmt
contrib/restricted/patched/replxx
library/cpp/histogram/hdr
library/cpp/protobuf/json
Expand Down