Skip to content

Commit 751ac1b

Browse files
committed
Fix
1 parent 87489e4 commit 751ac1b

File tree

2 files changed

+36
-30
lines changed

2 files changed

+36
-30
lines changed

ydb/core/persqueue/ut/partition_chooser_ut.cpp

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ void AddPartition(NKikimrSchemeOp::TPersQueueGroupDescription& conf,
2020
ui32 id,
2121
const std::optional<TString>&& boundaryFrom = std::nullopt,
2222
const std::optional<TString>&& boundaryTo = std::nullopt,
23-
std::vector<ui32> children = {}) {
23+
std::vector<ui32> children = {}) {
2424
auto* p = conf.AddPartitions();
2525
p->SetPartitionId(id);
2626
p->SetTabletId(1000 + id);
@@ -36,15 +36,21 @@ void AddPartition(NKikimrSchemeOp::TPersQueueGroupDescription& conf,
3636
}
3737
}
3838

39-
NKikimrSchemeOp::TPersQueueGroupDescription CreateConfig0(bool SplitMergeEnabled) {
39+
NKikimrSchemeOp::TPersQueueGroupDescription CreateConfig0(bool splitMergeEnabled) {
4040
NKikimrSchemeOp::TPersQueueGroupDescription result;
4141
NKikimrPQ::TPQTabletConfig* config = result.MutablePQTabletConfig();
4242

4343
result.SetBalancerTabletID(999);
4444

4545
auto* partitionStrategy = config->MutablePartitionStrategy();
4646
partitionStrategy->SetMinPartitionCount(3);
47-
partitionStrategy->SetMaxPartitionCount(SplitMergeEnabled ? 10 : 0);
47+
partitionStrategy->SetMaxPartitionCount(10);
48+
if (splitMergeEnabled) {
49+
partitionStrategy->SetPartitionStrategyType(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT);
50+
} else {
51+
partitionStrategy->SetPartitionStrategyType(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED);
52+
}
53+
4854

4955
config->SetTopicName("/Root/topic-1");
5056
config->SetTopicPath("/Root");
@@ -181,7 +187,7 @@ struct TWriteSessionMock: public NActors::TActorBootstrapped<TWriteSessionMock>
181187
hFunc(TEvPartitionChooser::TEvChooseResult, Handle);
182188
hFunc(TEvPartitionChooser::TEvChooseError, Handle);
183189
}
184-
}
190+
}
185191
};
186192

187193
NPersQueue::TTopicConverterPtr CreateTopicConverter() {
@@ -197,7 +203,7 @@ TWriteSessionMock* ChoosePartition(NPersQueue::TTestServer& server,
197203
TWriteSessionMock* mock = new TWriteSessionMock();
198204

199205
NActors::TActorId parentId = server.GetRuntime()->Register(mock);
200-
server.GetRuntime()->Register(NKikimr::NPQ::CreatePartitionChooserActorM(parentId,
206+
server.GetRuntime()->Register(NKikimr::NPQ::CreatePartitionChooserActorM(parentId,
201207
config,
202208
fullConverter,
203209
sourceId,
@@ -265,7 +271,7 @@ void WriteToTable(NPersQueue::TTestServer& server, const TString& sourceId, ui32
265271
if (pqConfig.GetTopicsAreFirstClassCitizen()) {
266272
query = TStringBuilder() << "--!syntax_v1\n"
267273
"UPSERT INTO `//Root/.metadata/TopicPartitionsMapping` (Hash, Topic, ProducerId, CreateTime, AccessTime, Partition, SeqNo) VALUES "
268-
"(" << encoded.KeysHash << ", \"" << fullConverter->GetClientsideName() << "\", \""
274+
"(" << encoded.KeysHash << ", \"" << fullConverter->GetClientsideName() << "\", \""
269275
<< encoded.EscapedSourceId << "\", "<< TInstant::Now().MilliSeconds() << ", "
270276
<< TInstant::Now().MilliSeconds() << ", " << partitionId << ", " << seqNo << ");";
271277
} else {
@@ -293,14 +299,14 @@ TMaybe<NYdb::TResultSet> SelectTable(NPersQueue::TTestServer& server, const TStr
293299
query = TStringBuilder() << "--!syntax_v1\n"
294300
"SELECT Partition, SeqNo "
295301
"FROM `//Root/.metadata/TopicPartitionsMapping` "
296-
"WHERE Hash = " << encoded.KeysHash <<
302+
"WHERE Hash = " << encoded.KeysHash <<
297303
" AND Topic = \"" << fullConverter->GetClientsideName() << "\"" <<
298304
" AND ProducerId = \"" << encoded.EscapedSourceId << "\"";
299305
} else {
300306
query = TStringBuilder() << "--!syntax_v1\n"
301307
"SELECT Partition, SeqNo "
302308
"FROM `/Root/PQ/SourceIdMeta2` "
303-
"WHERE Hash = " << encoded.KeysHash <<
309+
"WHERE Hash = " << encoded.KeysHash <<
304310
" AND Topic = \"" << fullConverter->GetClientsideName() << "\"" <<
305311
" AND SourceId = \"" << encoded.EscapedSourceId << "\"";
306312
}
@@ -320,7 +326,7 @@ void AssertTable(NPersQueue::TTestServer& server, const TString& sourceId, ui32
320326

321327
UNIT_ASSERT(result);
322328
UNIT_ASSERT_VALUES_EQUAL_C(result->RowsCount(), 1, "Table must contains SourceId='" << sourceId << "'");
323-
329+
324330
NYdb::TResultSetParser parser(*result);
325331
UNIT_ASSERT(parser.TryNextRow());
326332
NYdb::TValueParser p(parser.GetValue(0));
@@ -407,7 +413,7 @@ Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_NewSourceId_Test) {
407413
}
408414

409415
Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_PartitionActive_BoundaryTrue_Test) {
410-
// We check the partition selection scenario when we have already written with the
416+
// We check the partition selection scenario when we have already written with the
411417
// specified SourceID, the partition to which we wrote is active, and the partition
412418
// boundaries coincide with the distribution.
413419
NPersQueue::TTestServer server = CreateServer();
@@ -427,7 +433,7 @@ Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_PartitionActive_Bo
427433
}
428434

429435
Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_PartitionActive_BoundaryFalse_Test) {
430-
// We check the partition selection scenario when we have already written with the
436+
// We check the partition selection scenario when we have already written with the
431437
// specified SourceID, the partition to which we wrote is active, and the partition
432438
// boundaries is not coincide with the distribution.
433439
NPersQueue::TTestServer server = CreateServer();

ydb/core/persqueue/ut/ya.make

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,26 +29,26 @@ PEERDIR(
2929
YQL_LAST_ABI_VERSION()
3030

3131
SRCS(
32-
autoscaling_ut.cpp
33-
balancing_ut.cpp
34-
counters_ut.cpp
35-
pqtablet_mock.cpp
36-
internals_ut.cpp
37-
make_config.cpp
38-
metering_sink_ut.cpp
39-
mirrorer_ut.cpp
32+
# autoscaling_ut.cpp
33+
# balancing_ut.cpp
34+
# counters_ut.cpp
35+
# pqtablet_mock.cpp
36+
# internals_ut.cpp
37+
# make_config.cpp
38+
# metering_sink_ut.cpp
39+
# mirrorer_ut.cpp
4040
partition_chooser_ut.cpp
41-
pq_ut.cpp
42-
partition_ut.cpp
43-
partitiongraph_ut.cpp
44-
pqtablet_ut.cpp
45-
quota_tracker_ut.cpp
46-
sourceid_ut.cpp
47-
type_codecs_ut.cpp
48-
user_info_ut.cpp
49-
pqrb_describes_ut.cpp
50-
microseconds_sliding_window_ut.cpp
51-
fetch_request_ut.cpp
41+
# pq_ut.cpp
42+
# partition_ut.cpp
43+
# partitiongraph_ut.cpp
44+
# pqtablet_ut.cpp
45+
# quota_tracker_ut.cpp
46+
# sourceid_ut.cpp
47+
# type_codecs_ut.cpp
48+
# user_info_ut.cpp
49+
# pqrb_describes_ut.cpp
50+
# microseconds_sliding_window_ut.cpp
51+
# fetch_request_ut.cpp
5252
)
5353

5454
RESOURCE(

0 commit comments

Comments
 (0)