Skip to content

KIKIMR-19671 Refactoring of IWorkloadQueryGenerator and TWorkloadFactory #1103

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions ydb/apps/ydb/main.cpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,5 @@
#include <ydb/public/lib/ydb_cli/commands/ydb_service_topic.h>
#include <ydb/apps/ydb/commands/ydb_cloud_root.h>

TVector<NYdb::NTopic::ECodec> NYdb::NConsoleClient::InitAllowedCodecs() {
return TVector<NYdb::NTopic::ECodec>{
NYdb::NTopic::ECodec::RAW,
NYdb::NTopic::ECodec::ZSTD,
NYdb::NTopic::ECodec::GZIP,
};
}
#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>

int main(int argc, char **argv) {
try {
Expand Down
33 changes: 15 additions & 18 deletions ydb/core/kqp/ut/perf/kqp_workload_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@

#include <ydb/library/workload/workload_factory.h>
#include <ydb/library/workload/stock_workload.h>
#include <ydb/library/workload/stock_workload.h_serialized.h>
#include <ydb/library/workload/kv_workload.h>
#include <ydb/library/workload/kv_workload.h_serialized.h>

#include <library/cpp/threading/local_executor/local_executor.h>
#include <util/generic/serialized_enum.h>

namespace NKikimr::NKqp {

Expand All @@ -28,30 +31,24 @@ void ExecuteQuery(TTableClient& db, TSession& session, NYdbWorkload::TQueryInfo&
}
}

void Test(NYdbWorkload::EWorkload workloadType) {
void Test(const TString& workloadType) {
auto settings = TKikimrSettings().SetWithSampleTables(false);
auto kikimr = TKikimrRunner{settings};
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

std::unique_ptr<NYdbWorkload::TWorkloadParams> params;
if (workloadType == NYdbWorkload::EWorkload::STOCK) {
auto stockParams = std::make_unique<NYdbWorkload::TStockWorkloadParams>();
auto params = NYdbWorkload::TWorkloadFactory::MakeHolder(workloadType);
UNIT_ASSERT(params);
if (auto* stockParams = dynamic_cast<NYdbWorkload::TStockWorkloadParams*>(params.Get())) {
stockParams->ProductCount = 100;
stockParams->Quantity = 1000;
stockParams->OrderCount = 100;
stockParams->Limit = 10;
stockParams->MinPartitions = 40;
stockParams->PartitionsByLoad = true;
params = std::move(stockParams);
} else if (workloadType == NYdbWorkload::EWorkload::KV) {
params = std::make_unique<NYdbWorkload::TKvWorkloadParams>();
} else {
UNIT_ASSERT(false);
}
UNIT_ASSERT(params);
params->DbPath = "/Root";
auto workloadQueryGen = NYdbWorkload::TWorkloadFactory().GetWorkloadQueryGenerator(workloadType, params.get());
auto workloadQueryGen = params->CreateGenerator();

auto result = session.ExecuteSchemeQuery(workloadQueryGen->GetDDLQueries()).GetValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
Expand All @@ -62,17 +59,17 @@ void Test(NYdbWorkload::EWorkload workloadType) {
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
int maxType = 0;
if (workloadType == NYdbWorkload::EWorkload::STOCK) {
maxType = static_cast<int>(NYdbWorkload::TStockWorkloadGenerator::EType::MaxType);
} else if (workloadType == NYdbWorkload::EWorkload::KV) {
maxType = static_cast<int>(NYdbWorkload::TKvWorkloadGenerator::EType::MaxType);
if (workloadType == "stock") {
maxType = GetEnumItemsCount<NYdbWorkload::TStockWorkloadGenerator::EType>();
} else if (workloadType == "kv") {
maxType = GetEnumItemsCount<NYdbWorkload::TKvWorkloadGenerator::EType>();
}
for (int type = 0; type < maxType; ++type) {
size_t InFlight = 10;
NPar::LocalExecutor().RunAdditionalThreads(InFlight);
NPar::LocalExecutor().ExecRange([&db, type, &params, workloadType](int /*id*/) {
TTimer t;
auto workloadQueryGen = NYdbWorkload::TWorkloadFactory().GetWorkloadQueryGenerator(workloadType, params.get());
auto workloadQueryGen = params->CreateGenerator();
auto session = db.CreateSession().GetValueSync().GetSession();
for (size_t i = 0; i < REPEATS; ++i) {
auto queriesList = workloadQueryGen->GetWorkload(type);
Expand All @@ -88,11 +85,11 @@ void Test(NYdbWorkload::EWorkload workloadType) {

Y_UNIT_TEST_SUITE(KqpWorkload) {
Y_UNIT_TEST(STOCK) {
Test(NYdbWorkload::EWorkload::STOCK);
Test("stock");
}

Y_UNIT_TEST(KV) {
Test(NYdbWorkload::EWorkload::KV);
Test("kv");
}
}
}
55 changes: 28 additions & 27 deletions ydb/core/load_test/kqp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,30 +254,30 @@ class TKqpLoadActor : public TActorBootstrapped<TKqpLoadActor> {
IncreaseSessions = cmd.GetIncreaseSessions();
Total = std::make_unique<MonitoringData>();

NYdbWorkload::TWorkloadFactory factory;
if (cmd.Workload_case() == NKikimr::TEvLoadTestRequest_TKqpLoad::WorkloadCase::kStock) {
WorkloadClass = NYdbWorkload::EWorkload::STOCK;
NYdbWorkload::TStockWorkloadParams params;
params.PartitionsByLoad = cmd.GetStock().GetPartitionsByLoad();
params.OrderCount = cmd.GetStock().GetOrderCount();
params.ProductCount = cmd.GetStock().GetProductCount();
params.Quantity = cmd.GetStock().GetQuantity();
params.Limit = cmd.GetStock().GetLimit();
params.DbPath = WorkingDir;
params.MinPartitions = UniformPartitionsCount;
WorkloadQueryGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, &params);
WorkloadClass = "stock";
auto params = std::make_shared<NYdbWorkload::TStockWorkloadParams>();
params->PartitionsByLoad = cmd.GetStock().GetPartitionsByLoad();
params->OrderCount = cmd.GetStock().GetOrderCount();
params->ProductCount = cmd.GetStock().GetProductCount();
params->Quantity = cmd.GetStock().GetQuantity();
params->Limit = cmd.GetStock().GetLimit();
params->DbPath = WorkingDir;
params->MinPartitions = UniformPartitionsCount;
WorkloadQueryGen = std::make_shared<NYdbWorkload::TStockWorkloadGenerator>(params.get());
WorkloadQueryGenParams = params;
} else if (cmd.Workload_case() == NKikimr::TEvLoadTestRequest_TKqpLoad::WorkloadCase::kKv) {
WorkloadClass = NYdbWorkload::EWorkload::KV;
NYdbWorkload::TKvWorkloadParams params;
params.InitRowCount = cmd.GetKv().GetInitRowCount();
params.PartitionsByLoad = cmd.GetKv().GetPartitionsByLoad();
params.MaxFirstKey = cmd.GetKv().GetMaxFirstKey();
params.StringLen = cmd.GetKv().GetStringLen();
params.ColumnsCnt = cmd.GetKv().GetColumnsCnt();
params.RowsCnt = cmd.GetKv().GetRowsCnt();
params.MinPartitions = UniformPartitionsCount;
params.DbPath = WorkingDir;
WorkloadQueryGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, &params);
WorkloadClass = "kv";
auto params = std::make_shared<NYdbWorkload::TKvWorkloadParams>();
params->InitRowCount = cmd.GetKv().GetInitRowCount();
params->PartitionsByLoad = cmd.GetKv().GetPartitionsByLoad();
params->MaxFirstKey = cmd.GetKv().GetMaxFirstKey();
params->StringLen = cmd.GetKv().GetStringLen();
params->ColumnsCnt = cmd.GetKv().GetColumnsCnt();
params->RowsCnt = cmd.GetKv().GetRowsCnt();
params->MinPartitions = UniformPartitionsCount;
WorkloadQueryGen = std::make_shared<NYdbWorkload::TKvWorkloadGenerator>(params.get());
WorkloadQueryGenParams = params;
} else {
return;
}
Expand All @@ -301,8 +301,8 @@ class TKqpLoadActor : public TActorBootstrapped<TKqpLoadActor> {

Become(&TKqpLoadActor::StateStart);

if (WorkloadClass == NYdbWorkload::EWorkload::STOCK) {
NYdbWorkload::TStockWorkloadParams* params = static_cast<NYdbWorkload::TStockWorkloadParams*>(WorkloadQueryGen->GetParams());
if (WorkloadClass == "stock") {
NYdbWorkload::TStockWorkloadParams* params = static_cast<NYdbWorkload::TStockWorkloadParams*>(WorkloadQueryGenParams.get());
LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Starting load actor with workload STOCK, Params: {"
<< "PartitionsByLoad: " << params->PartitionsByLoad << " "
<< "OrderCount: " << params->OrderCount << " "
Expand All @@ -311,8 +311,8 @@ class TKqpLoadActor : public TActorBootstrapped<TKqpLoadActor> {
<< "Limit: " << params->Limit << " "
<< "DbPath: " << params->DbPath << " "
<< "MinPartitions: " << params->MinPartitions);
} else if (WorkloadClass == NYdbWorkload::EWorkload::KV) {
NYdbWorkload::TKvWorkloadParams* params = static_cast<NYdbWorkload::TKvWorkloadParams*>(WorkloadQueryGen->GetParams());
} else if (WorkloadClass == "kv") {
NYdbWorkload::TKvWorkloadParams* params = static_cast<NYdbWorkload::TKvWorkloadParams*>(WorkloadQueryGenParams.get());
LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Starting load actor with workload KV, Params: {"
<< "InitRowCount: " << params->InitRowCount << " "
<< "PartitionsByLoad: " << params->PartitionsByLoad << " "
Expand Down Expand Up @@ -666,14 +666,15 @@ class TKqpLoadActor : public TActorBootstrapped<TKqpLoadActor> {
size_t NumOfSessions = 0;
bool IncreaseSessions = false;
size_t ResultsReceived = 0;
NYdbWorkload::EWorkload WorkloadClass;
TString WorkloadClass;
NKikimrKqp::EQueryType QueryType;

NYdbWorkload::TQueryInfoList InitData;

const TActorId Parent;
ui64 Tag;
ui32 DurationSeconds;
std::shared_ptr<NYdbWorkload::TWorkloadParams> WorkloadQueryGenParams;
std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> WorkloadQueryGen;

// Monitoring
Expand Down
120 changes: 101 additions & 19 deletions ydb/library/workload/kv_workload.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#include "kv_workload.h"
#include "format"
#include "util/random/random.h"

#include "workload_factory.h"
#include <util/generic/serialized_enum.h>
#include <util/random/random.h>
#include <util/datetime/base.h>

#include <ydb/core/util/lz4_data_generator.h>
Expand All @@ -13,15 +13,12 @@
#include <random>
#include <sstream>
#include <chrono>

template <>
void Out<NYdbWorkload::KvWorkloadConstants>(IOutputStream& out, NYdbWorkload::KvWorkloadConstants constant)
{
out << static_cast<ui64>(constant);
}
#include <format>

namespace NYdbWorkload {

TWorkloadFactory::TRegistrator<TKvWorkloadParams> KvRegistrar("kv");

using TRow = TKvWorkloadGenerator::TRow;

// Note: there is no mechanism to update row values for now so all keys should be different
Expand Down Expand Up @@ -129,7 +126,7 @@ void VerifyRows(const TRow& checkRow, const TVector<TRow>& readRows, TString mes


TKvWorkloadGenerator::TKvWorkloadGenerator(const TKvWorkloadParams* params)
: Params(*params)
: TBase(params)
, BigString(NKikimr::GenDataForLZ4(Params.StringLen))
{
if (Params.MixedChangePartitionsSize) {
Expand All @@ -142,10 +139,6 @@ TKvWorkloadGenerator::TKvWorkloadGenerator(const TKvWorkloadParams* params)
Y_ABORT_UNLESS(Params.KeyColumnsCnt <= Params.ColumnsCnt);
}

TKvWorkloadParams* TKvWorkloadGenerator::GetParams() {
return &Params;
}

std::string TKvWorkloadGenerator::GetDDLQueries() const {
std::stringstream ss;

Expand Down Expand Up @@ -198,6 +191,16 @@ TQueryInfoList TKvWorkloadGenerator::GetWorkload(int type) {
}


TVector<IWorkloadQueryGenerator::TWorkloadType> TKvWorkloadGenerator::GetSupportedWorkloadTypes() const {
TVector<TWorkloadType> result;
result.emplace_back(static_cast<int>(EType::UpsertRandom), "upsert", "Upsert random rows into table");
result.emplace_back(static_cast<int>(EType::InsertRandom), "insert", "Insert random rows into table");
result.emplace_back(static_cast<int>(EType::SelectRandom), "select", "Select rows matching primary key(s)");
result.emplace_back(static_cast<int>(EType::ReadRowsRandom), "read-rows", "ReadRows rows matching primary key(s)");
result.emplace_back(static_cast<int>(EType::Mixed), "mixed", "Writes and SELECT/ReadsRows rows randomly, verifies them");
return result;
}

TQueryInfoList TKvWorkloadGenerator::WriteRows(TString operation, TVector<TRow>&& rows) {
std::stringstream ss;

Expand Down Expand Up @@ -449,10 +452,8 @@ TQueryInfoList TKvWorkloadGenerator::GetInitialData() {
return res;
}

std::string TKvWorkloadGenerator::GetCleanDDLQueries() const {
std::string query = "DROP TABLE `" + Params.TableName + "`;";

return query;
TVector<std::string> TKvWorkloadGenerator::GetCleanPaths() const {
return { Params.TableName };
}

TVector<TRow> TKvWorkloadGenerator::GenerateRandomRows(bool randomValues) {
Expand Down Expand Up @@ -486,5 +487,86 @@ TVector<TRow> TKvWorkloadGenerator::GenerateRandomRows(bool randomValues) {
return result;
}

void TKvWorkloadParams::ConfigureOpts(NLastGetopt::TOpts& opts, const ECommandType commandType, int workloadType) {
opts.SetFreeArgsNum(0);
switch (commandType) {
case TWorkloadParams::ECommandType::Init:
opts.AddLongOption("init-upserts", "count of upserts need to create while table initialization")
.DefaultValue((ui64)KvWorkloadConstants::INIT_ROW_COUNT).StoreResult(&InitRowCount);
opts.AddLongOption("min-partitions", "Minimum partitions for tables.")
.DefaultValue((ui64)KvWorkloadConstants::MIN_PARTITIONS).StoreResult(&MinPartitions);
opts.AddLongOption("partition-size", "Maximum partition size in megabytes (AUTO_PARTITIONING_PARTITION_SIZE_MB).")
.DefaultValue((ui64)KvWorkloadConstants::PARTITION_SIZE_MB).StoreResult(&PartitionSizeMb);
opts.AddLongOption("auto-partition", "Enable auto partitioning by load.")
.DefaultValue((ui64)KvWorkloadConstants::PARTITIONS_BY_LOAD).StoreResult(&PartitionsByLoad);
opts.AddLongOption("max-first-key", "Maximum value of a first primary key")
.DefaultValue((ui64)KvWorkloadConstants::MAX_FIRST_KEY).StoreResult(&MaxFirstKey);
opts.AddLongOption("len", "String len")
.DefaultValue((ui64)KvWorkloadConstants::STRING_LEN).StoreResult(&StringLen);
opts.AddLongOption("cols", "Number of columns")
.DefaultValue((ui64)KvWorkloadConstants::COLUMNS_CNT).StoreResult(&ColumnsCnt);
opts.AddLongOption("int-cols", "Number of int columns")
.DefaultValue((ui64)KvWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt);
opts.AddLongOption("key-cols", "Number of key columns")
.DefaultValue((ui64)KvWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt);
opts.AddLongOption("rows", "Number of rows")
.DefaultValue((ui64)KvWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt);
break;
case TWorkloadParams::ECommandType::Run:
opts.AddLongOption("max-first-key", "Maximum value of a first primary key")
.DefaultValue((ui64)KvWorkloadConstants::MAX_FIRST_KEY).StoreResult(&MaxFirstKey);
opts.AddLongOption("int-cols", "Number of int columns")
.DefaultValue((ui64)KvWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt);
opts.AddLongOption("key-cols", "Number of key columns")
.DefaultValue((ui64)KvWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt);
switch (static_cast<TKvWorkloadGenerator::EType>(workloadType)) {
case TKvWorkloadGenerator::EType::UpsertRandom:
opts.AddLongOption("len", "String len")
.DefaultValue((ui64)KvWorkloadConstants::STRING_LEN).StoreResult(&StringLen);
opts.AddLongOption("cols", "Number of columns to upsert")
.DefaultValue((ui64)KvWorkloadConstants::COLUMNS_CNT).StoreResult(&ColumnsCnt);
opts.AddLongOption("rows", "Number of rows to upsert")
.DefaultValue((ui64)NYdbWorkload::KvWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt);
break;
case TKvWorkloadGenerator::EType::InsertRandom:
opts.AddLongOption("len", "String len")
.DefaultValue((ui64)KvWorkloadConstants::STRING_LEN).StoreResult(&StringLen);
opts.AddLongOption("cols", "Number of columns to insert")
.DefaultValue((ui64)KvWorkloadConstants::COLUMNS_CNT).StoreResult(&ColumnsCnt);
opts.AddLongOption("rows", "Number of rows to insert")
.DefaultValue((ui64)NYdbWorkload::KvWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt);
break;
case TKvWorkloadGenerator::EType::SelectRandom:
case TKvWorkloadGenerator::EType::ReadRowsRandom:
opts.AddLongOption("cols", "Number of columns to select for a single query")
.DefaultValue((ui64)KvWorkloadConstants::COLUMNS_CNT).StoreResult(&ColumnsCnt);
opts.AddLongOption("rows", "Number of rows to select for a single query")
.DefaultValue((ui64)NYdbWorkload::KvWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt);
break;
case TKvWorkloadGenerator::EType::Mixed:
opts.AddLongOption("len", "String len")
.DefaultValue((ui64)KvWorkloadConstants::STRING_LEN).StoreResult(&StringLen);
opts.AddLongOption("cols", "Number of columns")
.DefaultValue((ui64)KvWorkloadConstants::COLUMNS_CNT).StoreResult(&ColumnsCnt);
opts.AddLongOption("change-partitions-size", "Apply random changes of AUTO_PARTITIONING_PARTITION_SIZE_MB setting")
.DefaultValue((ui64)KvWorkloadConstants::MIXED_CHANGE_PARTITIONS_SIZE).StoreResult(&MixedChangePartitionsSize);
opts.AddLongOption("do-select", "Do SELECT operations")
.DefaultValue((ui64)KvWorkloadConstants::MIXED_DO_SELECT).StoreResult(&MixedDoSelect);
opts.AddLongOption("do-read-rows", "Do ReadRows operations")
.DefaultValue((ui64)KvWorkloadConstants::MIXED_DO_READ_ROWS).StoreResult(&MixedDoReadRows);
}
break;
case TWorkloadParams::ECommandType::Clean:
break;
}
}

}
THolder<IWorkloadQueryGenerator> TKvWorkloadParams::CreateGenerator() const {
return MakeHolder<TKvWorkloadGenerator>(this);
}

TString TKvWorkloadParams::GetWorkloadName() const {
return "Key-Value";
}

}
Loading