Skip to content

Commit e795c3a

Browse files
committed
EvWrite locks
1 parent fb106cc commit e795c3a

File tree

5 files changed

+44
-59
lines changed

5 files changed

+44
-59
lines changed

ydb/core/tx/datashard/datashard__engine_host.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class TEngineBay : TNonCopyable {
5252

5353
const NMiniKQL::IEngineFlat * GetEngine() const { return Engine.Get(); }
5454
NMiniKQL::IEngineFlat * GetEngine();
55+
NMiniKQL::TEngineHost * GetEngineHost() { return EngineHost.Get(); }
5556
void SetLockTxId(ui64 lockTxId, ui32 lockNodeId);
5657
void SetUseLlvmRuntime(bool llvmRuntime) { EngineSettings->LlvmRuntime = llvmRuntime; }
5758

ydb/core/tx/datashard/datashard_pipeline.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1580,7 +1580,7 @@ TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr&
15801580
return writeOp;
15811581
}
15821582

1583-
writeOp->ExtractKeys();
1583+
writeTx->ExtractKeys(true);
15841584

15851585
switch (rec.txmode()) {
15861586
case NKikimrDataEvents::TEvWrite::MODE_PREPARE:

ydb/core/tx/datashard/datashard_ut_read_iterator.cpp

Lines changed: 25 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -735,6 +735,16 @@ struct TTestHelper {
735735
UNIT_ASSERT_VALUES_EQUAL(rowsRead, Min(rowCount, limit));
736736
}
737737

738+
void WriteRowTwin(const TString& tableName, const TVector<ui32>& values, bool isEvWrite) {
739+
if(isEvWrite)
740+
WriteRow(tableName, ++TxId, values);
741+
else
742+
ExecSQL(Server, Sender, TStringBuilder()
743+
<< "UPSERT INTO `/Root/" << tableName << "`\n"
744+
<< "(" << JoinSeq(",", MakeMappedRange(Tables[tableName].Columns, [](const auto& col) { return col.Name; })) << ")\n"
745+
<< "VALUES\n(" << JoinSeq(",", values) << ");");
746+
}
747+
738748
NKikimrDataEvents::TEvWriteResult WriteRow(const TString& tableName, ui64 txId, const TVector<ui32>& values, NKikimrDataEvents::TEvWrite::ETxMode txMode = NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE) {
739749
const auto& table = Tables[tableName];
740750

@@ -909,6 +919,7 @@ struct TTestHelper {
909919
ui64 ShardCount = 1;
910920
Tests::TServer::TPtr Server;
911921
TActorId Sender;
922+
ui64 TxId = 100;
912923

913924
THashMap<TString, TTableInfo> Tables;
914925
};
@@ -3148,59 +3159,32 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
31483159
UNIT_ASSERT(lock.GetCounter() < brokenLock.GetCounter());
31493160
}
31503161

3151-
Y_UNIT_TEST(ShouldReturnBrokenLockWhenReadRange) {
3162+
Y_UNIT_TEST_TWIN(ShouldReturnBrokenLockWhenReadRange, EvWrite) {
31523163
// upsert into "left border -1 " and to the "right border + 1" - lock not broken
31533164
// upsert inside range - broken
31543165
TTestHelper helper;
31553166

31563167
const ui64 lockTxId = 1011121314;
3168+
const TString tableName = "table-1";
3169+
const TVector<ui32> checkKey = {11, 11, 11};
31573170

3158-
auto request1 = helper.GetBaseReadRequest("table-1", 1);
3171+
auto request1 = helper.GetBaseReadRequest(tableName, 1);
31593172
request1->Record.SetLockTxId(lockTxId);
3160-
AddRangeQuery<ui32>(
3161-
*request1,
3162-
{3, 3, 3},
3163-
true,
3164-
{8, 0, 1},
3165-
true
3166-
);
3173+
AddRangeQuery<ui32>(*request1, {3, 3, 3}, true, {8, 0, 1}, true);
3174+
auto readResult1 = helper.SendRead(tableName, request1.release());
31673175

3168-
auto readResult1 = helper.SendRead("table-1", request1.release());
3176+
// upsert to the left and check that lock is not broken
3177+
helper.WriteRowTwin(tableName, {1, 1, 1, 101}, EvWrite);
3178+
helper.CheckLockValid(tableName, 2, checkKey, lockTxId);
31693179

3170-
{
3171-
// upsert to the left and check that lock is not broken
3172-
ExecSQL(helper.Server, helper.Sender, R"(
3173-
UPSERT INTO `/Root/table-1`
3174-
(key1, key2, key3, value)
3175-
VALUES
3176-
(1, 1, 1, 101);
3177-
)");
3178-
3179-
helper.CheckLockValid("table-1", 2, {11, 11, 11}, lockTxId);
3180-
}
3181-
3182-
{
3183-
// upsert to the right and check that lock is not broken
3184-
ExecSQL(helper.Server, helper.Sender, R"(
3185-
UPSERT INTO `/Root/table-1`
3186-
(key1, key2, key3, value)
3187-
VALUES
3188-
(8, 1, 0, 802);
3189-
)");
3190-
3191-
helper.CheckLockValid("table-1", 2, {11, 11, 11}, lockTxId);
3192-
}
3180+
// upsert to the right and check that lock is not broken
3181+
helper.WriteRowTwin(tableName, {8, 1, 0, 802}, EvWrite);
3182+
helper.CheckLockValid(tableName, 2, checkKey, lockTxId);
31933183

31943184
// breaks lock
31953185
// also we modify range: insert new key
3196-
ExecSQL(helper.Server, helper.Sender, R"(
3197-
UPSERT INTO `/Root/table-1`
3198-
(key1, key2, key3, value)
3199-
VALUES
3200-
(4, 4, 4, 400);
3201-
)");
3202-
3203-
helper.CheckLockBroken("table-1", 3, {11, 11, 11}, lockTxId, *readResult1);
3186+
helper.WriteRowTwin(tableName, {4, 4, 4, 400}, EvWrite);
3187+
helper.CheckLockBroken(tableName, 3, checkKey, lockTxId, *readResult1);
32043188
}
32053189

32063190
Y_UNIT_TEST(ShouldReturnBrokenLockWhenReadRangeInvisibleRowSkips) {

ydb/core/tx/datashard/datashard_write_operation.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ class TValidatedWriteTx: TNonCopyable {
8686
NMiniKQL::IEngineFlat* GetEngine() {
8787
return EngineBay.GetEngine();
8888
}
89+
NMiniKQL::TEngineHost* GetEngineHost() {
90+
return EngineBay.GetEngineHost();
91+
}
8992
void DestroyEngine() {
9093
EngineBay.DestroyEngine();
9194
}

ydb/core/tx/datashard/write_unit.cpp

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
#include "datashard_locks_db.h"
55
#include "datashard_user_db.h"
66

7+
#include <ydb/core/engine/mkql_engine_flat_host.h>
8+
79
namespace NKikimr {
810
namespace NDataShard {
911

@@ -36,7 +38,7 @@ class TWriteUnit : public TExecutionUnit {
3638
}
3739

3840
void DoExecute(TDataShard* self, TWriteOperation* writeOp, TTransactionContext& txc, const TActorContext& ctx) {
39-
const TValidatedWriteTx::TPtr& writeTx = writeOp->GetWriteTx();
41+
TValidatedWriteTx::TPtr& writeTx = writeOp->GetWriteTx();
4042

4143
const ui64 tableId = writeTx->GetTableId().PathId.LocalPathId;
4244
const TTableId fullTableId(self->GetPathOwnerId(), tableId);
@@ -51,54 +53,49 @@ class TWriteUnit : public TExecutionUnit {
5153
Y_ABORT_UNLESS(TableInfo_.LocalTid == localTableId);
5254
Y_ABORT_UNLESS(TableInfo_.ShadowTid == shadowTableId);
5355

54-
const ui32 writeTableId = localTableId;
5556
auto [readVersion, writeVersion] = self->GetReadWriteVersions(writeOp);
5657
writeTx->SetReadVersion(readVersion);
5758
writeTx->SetWriteVersion(writeVersion);
5859

5960
TDataShardUserDb userDb(*self, txc.DB, readVersion);
6061
TDataShardChangeGroupProvider groupProvider(*self, txc.DB);
6162

62-
TVector<TRawTypeValue> key;
63-
TVector<NTable::TUpdateOp> value;
64-
6563
TVector<TCell> keyCells;
64+
TVector<NMiniKQL::IEngineFlatHost::TUpdateCommand> commands;
6665

6766
const TSerializedCellMatrix& matrix = writeTx->GetMatrix();
6867

6968
for (ui32 rowIdx = 0; rowIdx < matrix.GetRowCount(); ++rowIdx)
7069
{
71-
key.clear();
7270
keyCells.clear();
7371
keyCells.reserve(TableInfo_.KeyColumnIds.size());
7472
ui64 keyBytes = 0;
7573
for (ui16 keyColIdx = 0; keyColIdx < TableInfo_.KeyColumnIds.size(); ++keyColIdx) {
76-
const auto& cellType = TableInfo_.KeyColumnTypes[keyColIdx];
7774
const TCell& cell = matrix.GetCell(rowIdx, keyColIdx);
78-
keyBytes += cell.Size();
79-
key.emplace_back(TRawTypeValue(cell.AsRef(), cellType));
75+
keyBytes += cell.IsNull() ? 1 : cell.Size();
8076
keyCells.emplace_back(cell);
8177
}
8278

83-
value.clear();
79+
commands.clear();
80+
Y_ABORT_UNLESS(matrix.GetColCount() >= TableInfo_.KeyColumnIds.size());
81+
commands.reserve(matrix.GetColCount() - TableInfo_.KeyColumnIds.size());
82+
83+
ui64 valueBytes = 0;
8484
for (ui16 valueColIdx = TableInfo_.KeyColumnIds.size(); valueColIdx < matrix.GetColCount(); ++valueColIdx) {
8585
ui32 columnTag = writeTx->RecordOperation().GetColumnIds(valueColIdx);
8686
const TCell& cell = matrix.GetCell(rowIdx, valueColIdx);
87-
auto* col = TableInfo_.Columns.FindPtr(valueColIdx + 1);
88-
Y_ABORT_UNLESS(col);
87+
valueBytes += cell.IsNull() ? 1 : cell.Size();
8988

90-
value.emplace_back(NTable::TUpdateOp(columnTag, NTable::ECellOp::Set, TRawTypeValue(cell.AsRef(), col->Type)));
89+
NMiniKQL::IEngineFlatHost::TUpdateCommand command = {columnTag, TKeyDesc::EColumnOperation::Set, {}, cell};
90+
commands.emplace_back(std::move(command));
9191
}
9292

93-
txc.DB.Update(writeTableId, NTable::ERowOp::Upsert, key, value, writeVersion);
94-
self->GetConflictsCache().GetTableCache(writeTableId).RemoveUncommittedWrites(keyCells, txc.DB);
93+
writeTx->GetEngineHost()->UpdateRow(fullTableId, keyCells, commands);
9594
}
9695
//TODO: Counters
9796
// self->IncCounter(COUNTER_UPLOAD_ROWS, rowCount);
9897
// self->IncCounter(COUNTER_UPLOAD_ROWS_BYTES, matrix.GetBuffer().size());
9998

100-
TableInfo_.Stats.UpdateTime = TAppData::TimeProvider->Now();
101-
10299
writeOp->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildCommited(self->TabletID(), writeOp->GetTxId()));
103100

104101
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Executed write operation for " << *writeOp << " at " << self->TabletID());

0 commit comments

Comments
 (0)