Skip to content

Commit 9dfe3e5

Browse files
authored
Merge 06a439b into b4a7ed7
2 parents b4a7ed7 + 06a439b commit 9dfe3e5

File tree

11 files changed

+422
-114
lines changed

11 files changed

+422
-114
lines changed

ydb/core/protos/tx_columnshard.proto

+4
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,10 @@ message TCommitTxBody {
181181
repeated uint64 WriteIds = 2;
182182
}
183183

184+
message TCommitWriteTxBody {
185+
optional uint64 LockId = 1;
186+
}
187+
184188
message TSchemaPresetVersionInfo {
185189
optional uint64 Id = 1;
186190
optional uint64 SinceStep = 2;

ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp

+15-2
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,15 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
8181
Y_ABORT_UNLESS(operation);
8282
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
8383
operation->OnWriteFinish(txc, aggr->GetWriteIds());
84-
ProposeTransaction(TTxController::TBasicTxInfo(NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, operation->GetTxId()), "", writeMeta.GetSource(), 0, txc);
84+
if (operation->GetBehaviour() == EOperationBehaviour::InTxWrite) {
85+
NKikimrTxColumnShard::TCommitWriteTxBody proto;
86+
proto.SetLockId(operation->GetLockId());
87+
TString txBody;
88+
Y_ABORT_UNLESS(proto.SerializeToString(&txBody));
89+
ProposeTransaction(TTxController::TBasicTxInfo(NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, operation->GetLockId()), txBody, writeMeta.GetSource(), operation->GetCookie(), txc);
90+
} else {
91+
Results.emplace_back(NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID(), operation->GetLockId()));
92+
}
8593
} else {
8694
Y_ABORT_UNLESS(aggr->GetWriteIds().size() == 1);
8795
Results.emplace_back(std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, (ui64)aggr->GetWriteIds().front(), NKikimrTxColumnShard::EResultStatus::SUCCESS));
@@ -112,7 +120,12 @@ void TTxWrite::Complete(const TActorContext& ctx) {
112120
AFL_VERIFY(buffer.GetAggregations().size() == Results.size());
113121
for (ui32 i = 0; i < buffer.GetAggregations().size(); ++i) {
114122
const auto& writeMeta = buffer.GetAggregations()[i]->GetWriteData()->GetWriteMeta();
115-
ctx.Send(writeMeta.GetSource(), Results[i].release());
123+
auto operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId());
124+
if (operation) {
125+
ctx.Send(writeMeta.GetSource(), Results[i].release(), 0, operation->GetCookie());
126+
} else {
127+
ctx.Send(writeMeta.GetSource(), Results[i].release());
128+
}
116129
Self->CSCounters.OnWriteTxComplete(now - writeMeta.GetWriteStartInstant());
117130
Self->CSCounters.OnSuccessWriteResponse();
118131
}

ydb/core/tx/columnshard/columnshard__write.cpp

+107-22
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ namespace NKikimr::NColumnShard {
1212

1313
using namespace NTabletFlatExecutor;
1414

15-
void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteData& writeData, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx) {
15+
void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteData& writeData, const ui64 cookie, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx) {
1616
IncCounter(COUNTER_WRITE_FAIL);
1717
switch (overloadReason) {
1818
case EOverloadStatus::Disk:
@@ -42,7 +42,7 @@ void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const
4242
<< " overload reason: [" << overloadReason << "]"
4343
<< " at tablet " << TabletID());
4444

45-
ctx.Send(writeData.GetWriteMeta().GetSource(), event.release());
45+
ctx.Send(writeData.GetWriteMeta().GetSource(), event.release(), 0, cookie);
4646
}
4747

4848
TColumnShard::EOverloadStatus TColumnShard::CheckOverloaded(const ui64 tableId) const {
@@ -116,8 +116,8 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
116116
} else {
117117
auto operation = OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId());
118118
Y_ABORT_UNLESS(operation);
119-
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), operation->GetTxId(), NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, "put data fails");
120-
ctx.Send(writeMeta.GetSource(), result.release());
119+
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), operation->GetLockId(), NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, "put data fails");
120+
ctx.Send(writeMeta.GetSource(), result.release(), 0, operation->GetCookie());
121121
}
122122
CSCounters.OnFailedWriteResponse(EWriteFailReason::PutBlob);
123123
wBuffer.RemoveData(aggr, StoragesManager->GetInsertOperator());
@@ -149,6 +149,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
149149
const auto& record = Proto(ev->Get());
150150
const ui64 tableId = record.GetTableId();
151151
const ui64 writeId = record.GetWriteId();
152+
const ui64 cookie = ev->Cookie;
152153
const TString dedupId = record.GetDedupId();
153154
const auto source = ev->Sender;
154155

@@ -192,7 +193,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
192193
auto overloadStatus = CheckOverloaded(tableId);
193194
if (overloadStatus != EOverloadStatus::None) {
194195
std::unique_ptr<NActors::IEventBase> result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeData.GetWriteMeta(), NKikimrTxColumnShard::EResultStatus::OVERLOADED);
195-
OverloadWriteFail(overloadStatus, writeData, std::move(result), ctx);
196+
OverloadWriteFail(overloadStatus, writeData, cookie, std::move(result), ctx);
196197
CSCounters.OnFailedWriteResponse(EWriteFailReason::Overload);
197198
} else {
198199
if (ui64 writeId = (ui64)HasLongTxWrite(writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId())) {
@@ -221,72 +222,156 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
221222
}
222223
}
223224

225+
class TCommitOperation {
226+
public:
227+
using TPtr = std::shared_ptr<TCommitOperation>;
228+
229+
bool Parse(const NEvents::TDataEvents::TEvWrite& evWrite) {
230+
LockId = evWrite.Record.GetLockTxId();
231+
TxId = evWrite.Record.GetTxId();
232+
KqpLocks = evWrite.Record.GetLocks();
233+
return !!LockId && !!TxId && KqpLocks.GetOp() == NKikimrDataEvents::TKqpLocks::Commit;
234+
}
235+
236+
private:
237+
NKikimrDataEvents::TKqpLocks KqpLocks;
238+
YDB_READONLY(ui64, LockId, 0);
239+
YDB_READONLY(ui64, TxId, 0);
240+
};
241+
class TProposeWriteTransaction : public TProposeTransactionBase {
242+
public:
243+
TProposeWriteTransaction(TColumnShard* self, TCommitOperation::TPtr op, const TActorId source, const ui64 cookie)
244+
: TProposeTransactionBase(self)
245+
, WriteCommit(op)
246+
, Source(source)
247+
, Cookie(cookie)
248+
{}
249+
250+
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
251+
void Complete(const TActorContext& ctx) override;
252+
TTxType GetTxType() const override { return TXTYPE_PROPOSE; }
253+
254+
private:
255+
void OnProposeResult(TTxController::TProposeResult& proposeResult, const TTxController::TTxInfo& txInfo) override;
256+
void OnProposeError(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo) override;
257+
258+
private:
259+
TCommitOperation::TPtr WriteCommit;
260+
TActorId Source;
261+
ui64 Cookie;
262+
std::unique_ptr<NActors::IEventBase> Result;
263+
};
264+
265+
bool TProposeWriteTransaction::Execute(TTransactionContext& txc, const TActorContext&) {
266+
NKikimrTxColumnShard::TCommitWriteTxBody proto;
267+
proto.SetLockId(WriteCommit->GetLockId());
268+
TString txBody;
269+
Y_ABORT_UNLESS(proto.SerializeToString(&txBody));
270+
ProposeTransaction(TTxController::TBasicTxInfo(NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, WriteCommit->GetTxId()), txBody, Source, Cookie, txc);
271+
return true;
272+
}
273+
274+
void TProposeWriteTransaction::Complete(const TActorContext& ctx) {
275+
ctx.Send(Source, Result.release(), 0, Cookie);
276+
}
277+
278+
void TProposeWriteTransaction::OnProposeResult(TTxController::TProposeResult& proposeResult, const TTxController::TTxInfo& txInfo) {
279+
Y_UNUSED(proposeResult);
280+
Result = NEvents::TDataEvents::TEvWriteResult::BuildPrepared(Self->TabletID(), txInfo.TxId, Self->GetProgressTxController().BuildCoordinatorInfo(txInfo));
281+
}
282+
283+
void TProposeWriteTransaction::OnProposeError(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo) {
284+
Y_UNUSED(proposeResult);
285+
Result = NEvents::TDataEvents::TEvWriteResult::BuildError(Self->TabletID(), txInfo.TxId, NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, proposeResult.GetStatusMessage());
286+
}
287+
224288
void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx) {
225289
NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletID())("event", "TEvWrite");
226290

227291
const auto& record = ev->Get()->Record;
228-
const ui64 txId = ev->Get()->GetTxId();
229292
const auto source = ev->Sender;
293+
const auto cookie = ev->Cookie;
294+
const auto behaviour = TOperationsManager::GetBehaviour(*ev->Get());
295+
296+
if (behaviour == EOperationBehaviour::Undefined) {
297+
IncCounter(COUNTER_WRITE_FAIL);
298+
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "invalid write event");
299+
ctx.Send(source, result.release(), 0, cookie);
300+
return;
301+
}
302+
303+
if (behaviour == EOperationBehaviour::CommitWriteLock) {
304+
auto commitOperation = std::make_shared<TCommitOperation>();
305+
if (!commitOperation->Parse(*ev->Get())) {
306+
IncCounter(COUNTER_WRITE_FAIL);
307+
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "invalid commit event");
308+
ctx.Send(source, result.release(), 0, cookie);
309+
}
310+
Execute(new TProposeWriteTransaction(this, commitOperation, source, cookie), ctx);
311+
return;
312+
}
313+
314+
const ui64 lockId = (behaviour == EOperationBehaviour::InTxWrite) ? record.GetTxId() : record.GetLockTxId();
230315

231316
if (record.GetOperations().size() != 1) {
232317
IncCounter(COUNTER_WRITE_FAIL);
233-
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "only single operation is supported");
234-
ctx.Send(source, result.release());
318+
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "only single operation is supported");
319+
ctx.Send(source, result.release(), 0, cookie);
235320
return;
236321
}
237322

238323
const auto& operation = record.GetOperations()[0];
239-
240324
if (operation.GetType() != NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE) {
241325
IncCounter(COUNTER_WRITE_FAIL);
242-
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "only REPLACE operation is supported");
243-
ctx.Send(source, result.release());
326+
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "only REPLACE operation is supported");
327+
ctx.Send(source, result.release(), 0, cookie);
244328
return;
245329
}
246330

247331
if (!operation.GetTableId().HasSchemaVersion()) {
248332
IncCounter(COUNTER_WRITE_FAIL);
249-
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "schema version not set");
250-
ctx.Send(source, result.release());
333+
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "schema version not set");
334+
ctx.Send(source, result.release(), 0, cookie);
251335
return;
252336
}
253337

254338
auto schema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchema(operation.GetTableId().GetSchemaVersion());
255339
if (!schema) {
256340
IncCounter(COUNTER_WRITE_FAIL);
257-
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "unknown schema version");
258-
ctx.Send(source, result.release());
341+
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "unknown schema version");
342+
ctx.Send(source, result.release(), 0, cookie);
259343
return;
260344
}
261345

262346
const auto tableId = operation.GetTableId().GetTableId();
263347

264348
if (!TablesManager.IsReadyForWrite(tableId)) {
265349
IncCounter(COUNTER_WRITE_FAIL);
266-
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, "table not writable");
267-
ctx.Send(source, result.release());
350+
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, "table not writable");
351+
ctx.Send(source, result.release(), 0, cookie);
268352
return;
269353
}
270354

271355
auto arrowData = std::make_shared<TArrowData>(schema);
272356
if (!arrowData->Parse(operation, NEvWrite::TPayloadReader<NEvents::TDataEvents::TEvWrite>(*ev->Get()))) {
273357
IncCounter(COUNTER_WRITE_FAIL);
274-
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "parsing data error");
275-
ctx.Send(source, result.release());
358+
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "parsing data error");
359+
ctx.Send(source, result.release(), 0, cookie);
276360
}
277361

278362
auto overloadStatus = CheckOverloaded(tableId);
279363
if (overloadStatus != EOverloadStatus::None) {
280364
NEvWrite::TWriteData writeData(NEvWrite::TWriteMeta(0, tableId, source), arrowData, nullptr, nullptr);
281-
std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error");
282-
OverloadWriteFail(overloadStatus, writeData, std::move(result), ctx);
365+
std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error");
366+
OverloadWriteFail(overloadStatus, writeData, cookie, std::move(result), ctx);
283367
return;
284368
}
285369

286370
auto wg = WritesMonitor.RegisterWrite(arrowData->GetSize());
287371

288-
auto writeOperation = OperationsManager->RegisterOperation(txId);
372+
auto writeOperation = OperationsManager->RegisterOperation(lockId, cookie);
289373
Y_ABORT_UNLESS(writeOperation);
374+
writeOperation->SetBehaviour(behaviour);
290375
writeOperation->Start(*this, tableId, arrowData, source, ctx);
291376
}
292377

ydb/core/tx/columnshard/columnshard_impl.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ class TColumnShard
299299
}
300300

301301
private:
302-
void OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteData& writeData, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx);
302+
void OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteData& writeData, const ui64 cookie, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx);
303303
EOverloadStatus CheckOverloaded(const ui64 tableId) const;
304304

305305
protected:

ydb/core/tx/columnshard/columnshard_schema.h

+15-24
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ struct Schema : NIceDb::Schema {
4848
SharedBlobIdsTableId,
4949
BorrowedBlobIdsTableId,
5050
SourceSessionsTableId,
51-
DestinationSessionsTableId
51+
DestinationSessionsTableId,
52+
OperationTxIdsId
5253
};
5354

5455
enum class ETierTables: ui32 {
@@ -302,14 +303,23 @@ struct Schema : NIceDb::Schema {
302303

303304
struct Operations : NIceDb::Schema::Table<OperationsTableId> {
304305
struct WriteId : Column<1, NScheme::NTypeIds::Uint64> {};
305-
struct TxId : Column<2, NScheme::NTypeIds::Uint64> {};
306+
struct LockId : Column<2, NScheme::NTypeIds::Uint64> {};
306307
struct Status : Column<3, NScheme::NTypeIds::Uint32> {};
307308
struct CreatedAt : Column<4, NScheme::NTypeIds::Uint64> {};
308309
struct GlobalWriteId : Column<5, NScheme::NTypeIds::Uint64> {};
309310
struct Metadata : Column<6, NScheme::NTypeIds::String> {};
311+
struct Cookie : Column<7, NScheme::NTypeIds::Uint64> {};
310312

311313
using TKey = TableKey<WriteId>;
312-
using TColumns = TableColumns<TxId, WriteId, Status, CreatedAt, GlobalWriteId, Metadata>;
314+
using TColumns = TableColumns<LockId, WriteId, Status, CreatedAt, GlobalWriteId, Metadata, Cookie>;
315+
};
316+
317+
struct OperationTxIds : NIceDb::Schema::Table<OperationTxIdsId> {
318+
struct TxId : Column<1, NScheme::NTypeIds::Uint64> {};
319+
struct LockId : Column<2, NScheme::NTypeIds::Uint64> {};
320+
321+
using TKey = TableKey<TxId, LockId>;
322+
using TColumns = TableColumns<TxId, LockId>;
313323
};
314324

315325
struct TierBlobsDraft: NIceDb::Schema::Table<(ui32)ETierTables::TierBlobsDraft> {
@@ -456,7 +466,8 @@ struct Schema : NIceDb::Schema {
456466
SharedBlobIds,
457467
BorrowedBlobIds,
458468
SourceSessions,
459-
DestinationSessions
469+
DestinationSessions,
470+
OperationTxIds
460471
>;
461472

462473
//
@@ -678,26 +689,6 @@ struct Schema : NIceDb::Schema {
678689
}
679690
return true;
680691
}
681-
682-
// Operations
683-
static void Operations_Write(NIceDb::TNiceDb& db, const TWriteOperation& operation) {
684-
TString metadata;
685-
NKikimrTxColumnShard::TInternalOperationData proto;
686-
operation.ToProto(proto);
687-
Y_ABORT_UNLESS(proto.SerializeToString(&metadata));
688-
689-
db.Table<Operations>().Key((ui64)operation.GetWriteId()).Update(
690-
NIceDb::TUpdate<Operations::Status>((ui32)operation.GetStatus()),
691-
NIceDb::TUpdate<Operations::CreatedAt>(operation.GetCreatedAt().Seconds()),
692-
NIceDb::TUpdate<Operations::Metadata>(metadata),
693-
NIceDb::TUpdate<Operations::TxId>(operation.GetTxId())
694-
);
695-
}
696-
697-
static void Operations_Erase(NIceDb::TNiceDb& db, const TWriteId writeId) {
698-
db.Table<Operations>().Key((ui64)writeId).Delete();
699-
}
700-
701692
};
702693

703694
}

0 commit comments

Comments
 (0)