Skip to content

Commit 420b7d6

Browse files
committed
Disabling the tx cache in tests for forced serialization/deserialization of txs
1 parent af4bda3 commit 420b7d6

File tree

5 files changed

+70
-15
lines changed

5 files changed

+70
-15
lines changed

ydb/core/tx/datashard/datashard_pipeline.cpp

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ bool TPipeline::Load(NIceDb::TNiceDb& db) {
5959
return false;
6060
}
6161

62-
Config.Validate();
62+
Config.SetDefaultLimits();
6363
UtmostCompleteTx = LastCompleteTx;
6464

6565
return true;
@@ -1335,14 +1335,17 @@ ui64 TPipeline::GetInactiveTxSize() const {
13351335

13361336
bool TPipeline::SaveForPropose(TValidatedTx::TPtr tx) {
13371337
Y_ABORT_UNLESS(tx && tx->GetTxId());
1338-
if (DataTxCache.size() <= Config.LimitDataTxCache) {
1339-
ui64 quota = tx->GetMemoryConsumption();
1340-
if (Self->TryCaptureTxCache(quota)) {
1341-
tx->SetTxCacheUsage(quota);
1342-
DataTxCache[tx->GetTxId()] = tx;
1343-
return true;
1344-
}
1338+
1339+
if (DataTxCache.size() >= Config.LimitDataTxCache)
1340+
return false;
1341+
1342+
ui64 quota = tx->GetMemoryConsumption();
1343+
if (Self->TryCaptureTxCache(quota)) {
1344+
tx->SetTxCacheUsage(quota);
1345+
DataTxCache[tx->GetTxId()] = tx;
1346+
return true;
13451347
}
1348+
13461349
return false;
13471350
}
13481351

ydb/core/tx/datashard/datashard_pipeline.h

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class TPipeline : TNonCopyable {
5555
bool DirtyImmediate() const { return Flags & EFlagsDirtyImmediate; }
5656
bool SoftUpdates() const { return Flags & (EFlagsForceOnlineRW|EFlagsDirtyOnline|EFlagsDirtyImmediate); }
5757

58-
void Validate() {
58+
void SetDefaultLimits() {
5959
if (!LimitActiveTx)
6060
LimitActiveTx = DefaultLimitActiveTx() ;
6161
if (!LimitDataTxCache)
@@ -78,14 +78,17 @@ class TPipeline : TNonCopyable {
7878
} else {
7979
Flags &= ~(EFlagsForceOnlineRW | EFlagsDirtyOnline | EFlagsDirtyImmediate);
8080
}
81-
if (cfg.GetNumActiveTx()) {
81+
if (cfg.HasNumActiveTx()) {
8282
LimitActiveTx = cfg.GetNumActiveTx();
83+
} else {
84+
LimitActiveTx = DefaultLimitActiveTx();
8385
}
84-
if (cfg.GetDataTxCacheSize()) {
86+
87+
if (cfg.HasDataTxCacheSize()) {
8588
LimitDataTxCache = cfg.GetDataTxCacheSize();
89+
} else {
90+
LimitDataTxCache = DefaultLimitDataTxCache();
8691
}
87-
88-
Validate();
8992
}
9093
};
9194

ydb/core/tx/datashard/datashard_ut_write.cpp

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
196196

197197
Cout << "========= Wait for completed transaction =========\n";
198198
{
199-
auto ev = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(sender);
200-
auto writeResult = ev->Get()->Record;
199+
auto writeResult = WaitForWriteCompleted(runtime, sender);
201200

202201
UNIT_ASSERT_VALUES_EQUAL_C(writeResult.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED, "Status: " << writeResult.GetStatus() << " Issues: " << writeResult.GetIssues());
203202
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetOrigin(), shard);
@@ -217,6 +216,43 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
217216
UNIT_ASSERT_VALUES_EQUAL(tableState, expectedTableState);
218217
}
219218

219+
}
220+
221+
Y_UNIT_TEST(WritePreparedNoTxCache) {
222+
auto [runtime, server, sender] = TestCreateServer();
223+
224+
TShardedTableOptions opts;
225+
opts.DataTxCacheSize(0); //disabling the tx cache for forced serialization/deserialization of txs
226+
const auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts);
227+
const ui64 shard = shards[0];
228+
const ui32 rowCount = 3;
229+
230+
ui64 txId = 100;
231+
ui64 minStep, maxStep;
232+
233+
Cout << "========= Send prepare =========\n";
234+
{
235+
const auto writeResult = Write(runtime, sender, shard, tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
236+
minStep = writeResult.GetMinStep();
237+
maxStep = writeResult.GetMaxStep();
238+
}
239+
240+
Cout << "========= Send propose to coordinator =========\n";
241+
{
242+
SendProposeToCoordinator(server, shards, minStep, maxStep, txId);
243+
}
244+
245+
Cout << "========= Wait for completed transaction =========\n";
246+
{
247+
WaitForWriteCompleted(runtime, sender);
248+
}
249+
250+
Cout << "========= Read table =========\n";
251+
{
252+
auto tableState = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All();
253+
UNIT_ASSERT_VALUES_EQUAL(tableState, expectedTableState);
254+
}
255+
220256
} // Y_UNIT_TEST
221257

222258
} // Y_UNIT_TEST_SUITE

ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1184,6 +1184,9 @@ std::tuple<TVector<ui64>, TTableId> CreateShardedTable(
11841184
if (!opts.EnableOutOfOrder_)
11851185
desc->MutablePartitionConfig()->MutablePipelineConfig()->SetEnableOutOfOrder(false);
11861186

1187+
if (opts.DataTxCacheSize_)
1188+
desc->MutablePartitionConfig()->MutablePipelineConfig()->SetDataTxCacheSize(*opts.DataTxCacheSize_);
1189+
11871190
if (opts.Policy_) {
11881191
opts.Policy_->Serialize(*desc->MutablePartitionConfig()->MutableCompactionPolicy());
11891192
}
@@ -1986,6 +1989,14 @@ TTestActorRuntimeBase::TEventObserverHolderPair ReplaceEvProposeTransactionWithE
19861989
return {std::move(requestObserver), std::move(responseObserver)};
19871990
}
19881991

1992+
NKikimrDataEvents::TEvWriteResult WaitForWriteCompleted(TTestActorRuntime& runtime, TActorId sender)
1993+
{
1994+
auto ev = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(sender);
1995+
auto resultRecord = ev->Get()->Record;
1996+
UNIT_ASSERT_C(resultRecord.GetStatus() == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED, "Status: " << resultRecord.GetStatus() << " Issues: " << resultRecord.GetIssues());
1997+
return resultRecord;
1998+
}
1999+
19892000
void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values)
19902001
{
19912002
auto txTypes = std::make_shared<NTxProxy::TUploadTypes>();

ydb/core/tx/datashard/ut_common/datashard_ut_common.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,7 @@ struct TShardedTableOptions {
475475
TABLE_OPTION(bool, FollowerPromotion, false);
476476
TABLE_OPTION(bool, ExternalStorage, false);
477477
TABLE_OPTION(std::optional<ui64>, ExecutorCacheSize, std::nullopt);
478+
TABLE_OPTION(std::optional<ui32>, DataTxCacheSize, std::nullopt);
478479
TABLE_OPTION(bool, Replicated, false);
479480
TABLE_OPTION(std::optional<EReplicationConsistency>, ReplicationConsistency, std::nullopt);
480481
TABLE_OPTION(TAttributes, Attributes, {});
@@ -710,6 +711,7 @@ void ExecSQL(Tests::TServer::TPtr server,
710711

711712
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, std::unique_ptr<NEvents::TDataEvents::TEvWrite>&& request, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED, NWilson::TTraceId traceId = {});
712713
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, const TTableId& tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED, NWilson::TTraceId traceId = {});
714+
NKikimrDataEvents::TEvWriteResult WaitForWriteCompleted(TTestActorRuntime& runtime, TActorId sender);
713715

714716
struct TEvWriteRow {
715717
TEvWriteRow(const TTableId& tableId, std::initializer_list<ui32> init)

0 commit comments

Comments
 (0)