Skip to content

Commit b7192f6

Browse files
maximyurchuk and pnv1 authored
Add log workload into cli (#13353)
Co-authored-by: Nikolay Perfilov <pnv1@yandex-team.ru>
1 parent 976d5ab commit b7192f6

File tree

11 files changed

+570
-10
lines changed

11 files changed

+570
-10
lines changed

ydb/library/workload/abstract/workload_query_generator.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ struct TQueryInfo {
3535
TString TablePath;
3636
std::optional<NYdb::TValue> KeyToRead;
3737
std::optional<NYdb::NTable::TAlterTableSettings> AlterTable;
38+
std::function<NYdb::TStatus(NYdb::NTable::TTableClient& tableClient)> TableOperation;
3839

3940
std::optional<std::function<void(NYdb::NTable::TReadRowsResult)>> ReadRowsResultCallback;
4041
std::optional<std::function<void(NYdb::NTable::TDataQueryResult)>> DataQueryResultCallback;

ydb/library/workload/log/log.cpp

Lines changed: 322 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,322 @@
1+
#include "log.h"
2+
#include <util/generic/serialized_enum.h>
3+
#include <util/random/random.h>
4+
#include <util/datetime/base.h>
5+
6+
#include <cmath>
7+
#include <iomanip>
8+
#include <string>
9+
#include <thread>
10+
#include <random>
11+
#include <sstream>
12+
#include <chrono>
13+
#include <format>
14+
15+
namespace NYdbWorkload {
16+
17+
namespace NLog {
18+
19+
using TRow = TLogGenerator::TRow;
20+
21+
22+
// Constructs a generator for the given workload parameters and seeds the
// Mersenne Twister engine from the random device.
TLogGenerator::TLogGenerator(const TLogWorkloadParams* params)
    : TBase(params)
    // All integer and string columns plus the single leading `ts` column.
    , TotalColumnsCnt(Params.IntColumnsCnt + Params.StrColumnsCnt + 1)
    , RandomDevice()
    , Mt19937(RandomDevice())
{
    // There cannot be more key columns than columns in the table.
    Y_ABORT_UNLESS(TotalColumnsCnt >= Params.KeyColumnsCnt);
}
30+
31+
std::string TLogGenerator::GetDDLQueries() const {
32+
std::stringstream ss;
33+
34+
ss << "--!syntax_v1\n";
35+
ss << "CREATE TABLE `" << Params.DbPath << "/" << Params.TableName << "`(";
36+
37+
for (size_t i = 0; i < TotalColumnsCnt; ++i) {
38+
if (i == 0) {
39+
ss << "ts Timestamp";
40+
} else if (i < Params.IntColumnsCnt + 1) {
41+
ss << "c" << i << " Uint64";
42+
} else {
43+
ss << "c" << i << " String";
44+
}
45+
46+
if (i < Params.KeyColumnsCnt && Params.GetStoreType() == TLogWorkloadParams::EStoreType::Column) {
47+
ss << " NOT NULL";
48+
}
49+
ss << ", ";
50+
}
51+
52+
ss << "PRIMARY KEY(";
53+
ss << "ts, ";
54+
for (size_t i = 1; i < Params.KeyColumnsCnt; ++i) {
55+
ss << "c" << i;
56+
if (i + 1 < Params.KeyColumnsCnt) {
57+
ss << ", ";
58+
}
59+
}
60+
ss << ")) WITH (";
61+
62+
ss << "TTL = Interval(\"PT" << Params.TimestampTtlMinutes << "M\") ON ts, ";
63+
64+
switch (Params.GetStoreType()) {
65+
case TLogWorkloadParams::EStoreType::Row:
66+
ss << "STORE = ROW, ";
67+
break;
68+
case TLogWorkloadParams::EStoreType::Column:
69+
ss << "STORE = COLUMN, ";
70+
break;
71+
default:
72+
throw yexception() << "Unsupported store type: " << Params.GetStoreType();
73+
}
74+
if (Params.PartitionsByLoad) {
75+
ss << "AUTO_PARTITIONING_BY_LOAD = ENABLED, ";
76+
}
77+
ss << "AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = " << Max(Params.MinPartitions, Params.MaxPartitions) << ", ";
78+
ss << "AUTO_PARTITIONING_PARTITION_SIZE_MB = " << Params.PartitionSizeMb << ", ";
79+
ss << "AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " << Params.MinPartitions << ")";
80+
return ss.str();
81+
}
82+
83+
// Produces the queries for one iteration of the requested workload type.
// Every supported type writes a freshly generated batch of random rows;
// an unknown type yields an empty list and generates nothing.
TQueryInfoList TLogGenerator::GetWorkload(int type) {
    const EType workloadType = static_cast<EType>(type);

    if (workloadType == EType::Insert) {
        return Insert(GenerateRandomRows());
    }
    if (workloadType == EType::Upsert) {
        return Upsert(GenerateRandomRows());
    }
    if (workloadType == EType::BulkUpsert) {
        return BulkUpsert(GenerateRandomRows());
    }
    return TQueryInfoList();
}
95+
96+
97+
// Lists the workload types this generator can run, each with its numeric id,
// command name and human-readable description.
TVector<IWorkloadQueryGenerator::TWorkloadType> TLogGenerator::GetSupportedWorkloadTypes() const {
    return TVector<TWorkloadType>{
        TWorkloadType(static_cast<int>(EType::Insert), "insert", "Insert random rows into table near current ts"),
        TWorkloadType(static_cast<int>(EType::Upsert), "upsert", "Upsert random rows into table near current ts"),
        TWorkloadType(static_cast<int>(EType::BulkUpsert), "bulk_upsert", "Bulk upsert random rows into table near current ts"),
    };
}
104+
105+
// Builds a single parameterized `<operation> INTO <table> (...) VALUES (...), ...`
// query that writes all of `rows`.
//
// operation - the YQL write keyword placed before `INTO` (callers in this file
//             pass "INSERT" or "UPSERT").
// rows      - rows to write; each row is expected to carry Params.IntColumnsCnt
//             integers and Params.StrColumnsCnt strings.
//
// Every cell is passed as its own declared parameter named `$c<row>_<col>`.
// Returns a list with exactly one query.
TQueryInfoList TLogGenerator::WriteRows(TString operation, TVector<TRow>&& rows) {
    // Single source of truth for parameter names, so the DECLARE section and
    // the VALUES section can never disagree on spelling.
    const auto paramName = [](size_t row, size_t col) {
        TString name = "$c" + std::to_string(row) + "_" + std::to_string(col);
        return name;
    };

    // Iterate over the rows actually supplied rather than Params.RowsCnt, so a
    // batch of a different size cannot cause out-of-bounds reads.
    const size_t rowsCnt = rows.size();

    std::stringstream ss;
    NYdb::TParamsBuilder paramsBuilder;

    ss << "--!syntax_v1\n";

    for (size_t row = 0; row < rowsCnt; ++row) {
        const TRow& current = rows[row];
        for (size_t col = 0; col < TotalColumnsCnt; ++col) {
            const TString cname = paramName(row, col);
            if (col == 0) {
                ss << "DECLARE " << cname << " AS Timestamp;\n";
                paramsBuilder.AddParam(cname).Timestamp(current.Ts).Build();
            } else if (col < Params.IntColumnsCnt + 1) {
                ss << "DECLARE " << cname << " AS Uint64;\n";
                paramsBuilder.AddParam(cname).Uint64(current.Ints[col - 1]).Build();
            } else {
                ss << "DECLARE " << cname << " AS String;\n";
                paramsBuilder.AddParam(cname).String(current.Strings[col - Params.IntColumnsCnt - 1]).Build();
            }
        }
    }

    ss << operation << " INTO `" << Params.TableName << "` (";

    for (size_t col = 0; col < TotalColumnsCnt; ++col) {
        if (col != 0) {
            ss << "c" << col;
        } else {
            ss << "ts";
        }

        if (col + 1 < TotalColumnsCnt) {
            ss << ", ";
        }
    }

    ss << ") VALUES ";

    for (size_t row = 0; row < rowsCnt; ++row) {
        ss << "(";

        for (size_t col = 0; col < TotalColumnsCnt; ++col) {
            ss << paramName(row, col);
            if (col + 1 < TotalColumnsCnt) {
                ss << ", ";
            }
        }

        ss << ")";

        if (row + 1 < rowsCnt) {
            ss << ", ";
        }
    }

    auto params = paramsBuilder.Build();
    return TQueryInfoList(1, TQueryInfo(ss.str(), std::move(params)));
}
163+
164+
// Builds one INSERT query that writes all of `rows`.
TQueryInfoList TLogGenerator::Insert(TVector<TRow>&& rows) {
    const TString operation = "INSERT";
    return WriteRows(operation, std::move(rows));
}
167+
168+
// Builds one UPSERT query that writes all of `rows`.
TQueryInfoList TLogGenerator::Upsert(TVector<TRow>&& rows) {
    const TString operation = "UPSERT";
    return WriteRows(operation, std::move(rows));
}
171+
172+
// Packs `rows` into a list-of-structs value and returns a single query info
// whose TableOperation performs a synchronous BulkUpsert of that value into
// `<DbPath>/<TableName>`.
TQueryInfoList TLogGenerator::BulkUpsert(TVector<TRow>&& rows) {
    const TString tablePath = Params.DbPath + "/" + Params.TableName;

    NYdb::TValueBuilder builder;
    builder.BeginList();
    for (const TRow& row : rows) {
        auto& item = builder.AddListItem();
        item.BeginStruct();

        // Member order matches the table layout: ts, integer columns, string columns.
        item.AddMember("ts").Timestamp(row.Ts);
        for (size_t col = 1; col < TotalColumnsCnt; ++col) {
            const bool isIntColumn = col < Params.IntColumnsCnt + 1;
            if (isIntColumn) {
                item.AddMember(std::format("c{}", col)).Uint64(row.Ints[col - 1]);
            } else {
                item.AddMember(std::format("c{}", col)).String(row.Strings[col - Params.IntColumnsCnt - 1]);
            }
        }

        item.EndStruct();
    }
    builder.EndList();
    const NYdb::TValue rowsValue = builder.Build();

    TQueryInfo queryInfo;
    // The operation owns copies of the path and the rows; each invocation hands
    // BulkUpsert a fresh copy of the rows, so the operation can be called repeatedly.
    queryInfo.TableOperation = [tablePath, rowsValue](NYdb::NTable::TTableClient& tableClient) {
        auto rowsCopy = rowsValue;
        auto future = tableClient.BulkUpsert(tablePath, std::move(rowsCopy));
        return future.GetValueSync();
    };
    return TQueryInfoList(1, std::move(queryInfo));
}
201+
202+
203+
// This workload has no initial data to load: always returns an empty list.
TQueryInfoList TLogGenerator::GetInitialData() {
    return TQueryInfoList();
}
207+
208+
// Returns the paths to remove on cleanup: just the workload table.
TVector<std::string> TLogGenerator::GetCleanPaths() const {
    TVector<std::string> paths;
    paths.push_back(Params.TableName);
    return paths;
}
211+
212+
// Generates Params.RowsCnt random rows.
//
// For each row:
//  - Ts is the current time shifted by a normally distributed offset whose
//    standard deviation is Params.TimestampStandardDeviationMinutes minutes;
//  - Ints holds Params.IntColumnsCnt random 64-bit values;
//  - Strings holds Params.StrColumnsCnt strings of Params.StringLen random
//    lowercase latin letters.
TVector<TRow> TLogGenerator::GenerateRandomRows() {
    TVector<TRow> result(Params.RowsCnt);

    // Sample from the standard normal distribution and scale manually:
    // std::normal_distribution requires stddev > 0, while the configured
    // deviation may legitimately be 0 (meaning "no jitter").
    std::normal_distribution<double> standardNormal(0.0, 1.0);
    const double deviationMs = 60.0 * 1000.0 * static_cast<double>(Params.TimestampStandardDeviationMinutes);

    for (TRow& row : result) {
        const TInstant now = TInstant::Now();
        const i64 millisecondsDiff = static_cast<i64>(deviationMs * standardNormal(Mt19937));
        // The duration is always built from a non-negative magnitude;
        // the sign of the offset selects addition or subtraction.
        if (millisecondsDiff >= 0) {
            row.Ts = now + TDuration::MilliSeconds(millisecondsDiff);
        } else {
            row.Ts = now - TDuration::MilliSeconds(-millisecondsDiff);
        }

        row.Ints.resize(Params.IntColumnsCnt);
        row.Strings.resize(Params.StrColumnsCnt);

        for (size_t col = 0; col < Params.IntColumnsCnt; ++col) {
            row.Ints[col] = RandomNumber<ui64>();
        }

        for (size_t col = 0; col < Params.StrColumnsCnt; ++col) {
            TString val(Params.StringLen, '_');
            for (size_t i = 0; i < Params.StringLen; ++i) {
                // `unsigned char` instead of the non-standard `u_char` typedef.
                val[i] = static_cast<char>('a' + RandomNumber<unsigned char>(26));
            }
            row.Strings[col] = std::move(val);
        }
    }

    return result;
}
245+
246+
// Registers the command-line options of the log workload.
//
// opts         - option set to extend.
// commandType  - which command is being configured; Init gets the table layout
//                and partitioning options, Run gets the column counts plus the
//                options of the selected workload type. Other commands only get `--path`.
// workloadType - numeric TLogGenerator::EType, consulted only for Run.
void TLogWorkloadParams::ConfigureOpts(NLastGetopt::TOpts& opts, const ECommandType commandType, int workloadType) {
    opts.AddLongOption('p', "path", "Path where benchmark tables are located")
        .Optional()
        .DefaultValue(TableName)
        .Handler1T<TStringBuf>([this](TStringBuf arg) {
            // Strip every leading and trailing slash from the path.
            while(arg.SkipPrefix("/"));
            while(arg.ChopSuffix("/"));
            TableName = arg;
        });
    switch (commandType) {
    case TWorkloadParams::ECommandType::Init:
        opts.AddLongOption("min-partitions", "Minimum partitions for tables.")
            .DefaultValue(static_cast<ui64>(LogWorkloadConstants::MIN_PARTITIONS)).StoreResult(&MinPartitions);
        opts.AddLongOption("max-partitions", "Maximum partitions for tables.")
            .DefaultValue(static_cast<ui64>(LogWorkloadConstants::MAX_PARTITIONS)).StoreResult(&MaxPartitions);
        opts.AddLongOption("partition-size", "Maximum partition size in megabytes (AUTO_PARTITIONING_PARTITION_SIZE_MB).")
            .DefaultValue(static_cast<ui64>(LogWorkloadConstants::PARTITION_SIZE_MB)).StoreResult(&PartitionSizeMb);
        opts.AddLongOption("auto-partition", "Enable auto partitioning by load.")
            .DefaultValue(static_cast<ui64>(LogWorkloadConstants::PARTITIONS_BY_LOAD)).StoreResult(&PartitionsByLoad);
        opts.AddLongOption("len", "String len")
            .DefaultValue(static_cast<ui64>(LogWorkloadConstants::STRING_LEN)).StoreResult(&StringLen);
        opts.AddLongOption("int-cols", "Number of int columns")
            .DefaultValue(static_cast<ui64>(LogWorkloadConstants::INT_COLUMNS_CNT)).StoreResult(&IntColumnsCnt);
        opts.AddLongOption("str-cols", "Number of string columns")
            .DefaultValue(static_cast<ui64>(LogWorkloadConstants::STR_COLUMNS_CNT)).StoreResult(&StrColumnsCnt);
        opts.AddLongOption("key-cols", "Number of key columns")
            .DefaultValue(static_cast<ui64>(LogWorkloadConstants::KEY_COLUMNS_CNT)).StoreResult(&KeyColumnsCnt);
        opts.AddLongOption("ttl", "TTL for timestamp column in minutes")
            .DefaultValue(static_cast<ui64>(LogWorkloadConstants::TIMESTAMP_TTL_MIN)).StoreResult(&TimestampTtlMinutes);
        opts.AddLongOption("store", "Storage type."
                " Options: row, column\n"
                "  row - use row-based storage engine;\n"
                "  column - use column-based storage engine.")
            .DefaultValue(StoreType)
            .Handler1T<TStringBuf>([this](TStringBuf arg) {
                // Parse the lowercased value so the option is case-insensitive
                // (the lowercased copy used to be computed and then ignored).
                const auto l = to_lower(TString(arg));
                if (!TryFromString(l, StoreType)) {
                    throw yexception() << "Invalid store type: " << arg;
                }
            });
        break;
    case TWorkloadParams::ECommandType::Run:
        // The column counts are registered again for Run with the same defaults as for Init.
        opts.AddLongOption("int-cols", "Number of int columns")
            .DefaultValue(static_cast<ui64>(LogWorkloadConstants::INT_COLUMNS_CNT)).StoreResult(&IntColumnsCnt);
        opts.AddLongOption("str-cols", "Number of string columns")
            .DefaultValue(static_cast<ui64>(LogWorkloadConstants::STR_COLUMNS_CNT)).StoreResult(&StrColumnsCnt);
        opts.AddLongOption("key-cols", "Number of key columns")
            .DefaultValue(static_cast<ui64>(LogWorkloadConstants::KEY_COLUMNS_CNT)).StoreResult(&KeyColumnsCnt);
        switch (static_cast<TLogGenerator::EType>(workloadType)) {
        case TLogGenerator::EType::Insert:
        case TLogGenerator::EType::Upsert:
        case TLogGenerator::EType::BulkUpsert:
            opts.AddLongOption("len", "String len")
                .DefaultValue(static_cast<ui64>(LogWorkloadConstants::STRING_LEN)).StoreResult(&StringLen);
            opts.AddLongOption("rows", "Number of rows to upsert")
                .DefaultValue(static_cast<ui64>(LogWorkloadConstants::ROWS_CNT)).StoreResult(&RowsCnt);
            opts.AddLongOption("timestamp_deviation", "Standard deviation. For each timestamp, a random variable with a specified standard deviation in minutes is added.")
                .DefaultValue(static_cast<ui64>(LogWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION_MINUTES)).StoreResult(&TimestampStandardDeviationMinutes);
            break;
        }
        break;
    default:
        break;
    }
}
311+
312+
// Creates the log workload query generator configured by these parameters.
THolder<IWorkloadQueryGenerator> TLogWorkloadParams::CreateGenerator() const {
    THolder<IWorkloadQueryGenerator> generator = MakeHolder<TLogGenerator>(this);
    return generator;
}
315+
316+
// Returns the display name of this workload.
TString TLogWorkloadParams::GetWorkloadName() const {
    const TString name = "Log";
    return name;
}
319+
320+
} // namespace NLog
321+
322+
} // namespace NYdbWorkload

0 commit comments

Comments
 (0)