Skip to content

Commit 7ca436e

Browse files
authored
Remove TabletId from TValidatedDataTx (#683)
1 parent 59eff3a commit 7ca436e

9 files changed

+28
-29
lines changed

ydb/core/tx/datashard/build_data_tx_out_rs_unit.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ EExecutionStatus TBuildDataTxOutRSUnit::Execute(TOperation::TPtr op,
6868
try {
6969
auto &outReadSets = op->OutReadSets();
7070

71-
if (tx->GetDataTx()->CheckCancelled())
71+
if (tx->GetDataTx()->CheckCancelled(DataShard.TabletID()))
7272
engine->Cancel();
7373
else
7474
engine->SetMemoryLimit(txc.GetMemoryLimit() - tx->GetDataTx()->GetTxSize());

ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac
6666
const auto& dataTx = tx->GetDataTx();
6767
ui64 tabletId = DataShard.TabletID();
6868

69-
if (tx->GetDataTx()->CheckCancelled()) {
69+
if (tx->GetDataTx()->CheckCancelled(tabletId)) {
7070
tx->ReleaseTxData(txc, ctx);
7171
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::CANCELLED)
7272
->AddError(NKikimrTxDataShard::TError::EXECUTION_CANCELLED, "Tx was cancelled");

ydb/core/tx/datashard/datashard_active_transaction.cpp

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self,
2020
const TString &txBody,
2121
bool usesMvccSnapshot)
2222
: StepTxId_(stepTxId)
23-
, TabletId_(self->TabletID())
2423
, TxBody(txBody)
2524
, EngineBay(self, txc, ctx, stepTxId.ToPair())
2625
, ErrCode(NKikimrTxDataShard::TError::OK)
@@ -33,6 +32,8 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self,
3332
, Cancelled(false)
3433
, ReceivedAt_(receivedAt)
3534
{
35+
const ui64 tabletId = self->TabletID();
36+
3637
bool success = Tx.ParseFromArray(TxBody.data(), TxBody.size());
3738
if (!success) {
3839
ErrCode = NKikimrTxDataShard::TError::BAD_ARGUMENT;
@@ -70,7 +71,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self,
7071
}
7172
} else if (IsKqpTx()) {
7273
if (Y_UNLIKELY(!IsKqpDataTx())) {
73-
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "Unexpected KQP transaction type, shard: " << TabletId()
74+
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "Unexpected KQP transaction type, shard: " << tabletId
7475
<< ", txid: " << StepTxId_.TxId << ", tx: " << Tx.DebugString());
7576
ErrCode = NKikimrTxDataShard::TError::BAD_TX_KIND;
7677
ErrStr = TStringBuilder() << "Unexpected KQP transaction type: "
@@ -85,7 +86,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self,
8586
bool hasPersistentChannels = false;
8687
if (!KqpValidateTransaction(GetTasks(), Immediate(), StepTxId_.TxId, ctx, hasPersistentChannels)) {
8788
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "KQP transaction validation failed, datashard: "
88-
<< TabletId() << ", txid: " << StepTxId_.TxId);
89+
<< tabletId << ", txid: " << StepTxId_.TxId);
8990
ErrCode = NKikimrTxDataShard::TError::PROGRAM_ERROR;
9091
ErrStr = "Transaction validation failed.";
9192
return;
@@ -96,15 +97,15 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self,
9697
NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta;
9798
if (!task.GetMeta().UnpackTo(&meta)) {
9899
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "KQP transaction validation failed"
99-
<< ", datashard: " << TabletId()
100+
<< ", datashard: " << tabletId
100101
<< ", txid: " << StepTxId_.TxId
101102
<< ", failed to load task meta: " << task.GetMeta().value());
102103
ErrCode = NKikimrTxDataShard::TError::PROGRAM_ERROR;
103104
ErrStr = "Transaction validation failed: invalid task metadata.";
104105
return;
105106
}
106107

107-
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TxId: " << StepTxId_.TxId << ", shard " << TabletId()
108+
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TxId: " << StepTxId_.TxId << ", shard " << tabletId
108109
<< ", task: " << task.GetId() << ", meta: " << meta.ShortDebugString());
109110

110111
auto& tableMeta = meta.GetTable();
@@ -139,7 +140,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self,
139140
}
140141
}
141142

142-
KqpSetTxKeys(TabletId(), task.GetId(), tableInfo, meta, typeRegistry, ctx, EngineBay);
143+
KqpSetTxKeys(tabletId, task.GetId(), tableInfo, meta, typeRegistry, ctx, EngineBay);
143144

144145
for (auto& output : task.GetOutputs()) {
145146
for (auto& channel : output.GetChannels()) {
@@ -166,13 +167,13 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self,
166167
tasksRunner.Prepare(DefaultKqpDataReqMemoryLimits(), *execCtx);
167168
} catch (const TMemoryLimitExceededException&) {
168169
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "Not enough memory to create tasks runner, datashard: "
169-
<< TabletId() << ", txid: " << StepTxId_.TxId);
170+
<< tabletId << ", txid: " << StepTxId_.TxId);
170171
ErrCode = NKikimrTxDataShard::TError::PROGRAM_ERROR;
171172
ErrStr = TStringBuilder() << "Transaction validation failed: not enough memory.";
172173
return;
173174
} catch (const yexception& e) {
174175
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "Exception while validating KQP transaction, datashard: "
175-
<< TabletId() << ", txid: " << StepTxId_.TxId << ", error: " << e.what());
176+
<< tabletId << ", txid: " << StepTxId_.TxId << ", error: " << e.what());
176177
ErrCode = NKikimrTxDataShard::TError::PROGRAM_ERROR;
177178
ErrStr = TStringBuilder() << "Transaction validation failed: " << e.what() << ".";
178179
return;
@@ -191,7 +192,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self,
191192
IsReadOnly = IsReadOnly && Tx.GetReadOnly();
192193

193194
auto engine = EngineBay.GetEngine();
194-
auto result = engine->AddProgram(TabletId_, Tx.GetMiniKQL(), Tx.GetReadOnly());
195+
auto result = engine->AddProgram(tabletId, Tx.GetMiniKQL(), Tx.GetReadOnly());
195196

196197
ErrStr = engine->GetErrors();
197198
ErrCode = ConvertErrCode(result);
@@ -258,7 +259,7 @@ bool TValidatedDataTx::CanCancel() {
258259
return true;
259260
}
260261

261-
bool TValidatedDataTx::CheckCancelled() {
262+
bool TValidatedDataTx::CheckCancelled(ui64 tabletId) {
262263
if (Cancelled) {
263264
return true;
264265
}
@@ -270,11 +271,11 @@ bool TValidatedDataTx::CheckCancelled() {
270271
TInstant now = AppData()->TimeProvider->Now();
271272
Cancelled = (now >= Deadline());
272273

273-
Cancelled = Cancelled || gCancelTxFailPoint.Check(TabletId(), TxId());
274+
Cancelled = Cancelled || gCancelTxFailPoint.Check(tabletId, TxId());
274275

275276
if (Cancelled) {
276277
LOG_NOTICE_S(*TlsActivationContext->ExecutorThread.ActorSystem, NKikimrServices::TX_DATASHARD,
277-
"CANCELLED TxId " << TxId() << " at " << TabletId());
278+
"CANCELLED TxId " << TxId() << " at " << tabletId);
278279
}
279280
return Cancelled;
280281
}
@@ -547,7 +548,7 @@ void TActiveTransaction::ReleaseTxData(NTabletFlatExecutor::TTxMemoryProviderBas
547548
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "tx " << GetTxId() << " released its data");
548549
}
549550

550-
void TActiveTransaction::DbStoreLocksAccessLog(TDataShard * self,
551+
void TActiveTransaction::DbStoreLocksAccessLog(ui64 tabletId,
551552
TTransactionContext &txc,
552553
const TActorContext &ctx)
553554
{
@@ -570,10 +571,10 @@ void TActiveTransaction::DbStoreLocksAccessLog(TDataShard * self,
570571

571572
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD,
572573
"Storing " << vec.size() << " locks for txid=" << GetTxId()
573-
<< " in " << self->TabletID());
574+
<< " in " << tabletId);
574575
}
575576

576-
void TActiveTransaction::DbStoreArtifactFlags(TDataShard * self,
577+
void TActiveTransaction::DbStoreArtifactFlags(ui64 tabletId,
577578
TTransactionContext &txc,
578579
const TActorContext &ctx)
579580
{
@@ -585,7 +586,7 @@ void TActiveTransaction::DbStoreArtifactFlags(TDataShard * self,
585586

586587
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD,
587588
"Storing artifactflags=" << ArtifactFlags << " for txid=" << GetTxId()
588-
<< " in " << self->TabletID());
589+
<< " in " << tabletId);
589590
}
590591

591592
ui64 TActiveTransaction::GetMemoryConsumption() const {

ydb/core/tx/datashard/datashard_active_transaction.h

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,6 @@ class TValidatedDataTx : TNonCopyable {
134134

135135
TStepOrder StepTxId() const { return StepTxId_; }
136136
ui64 TxId() const { return StepTxId_.TxId; }
137-
ui64 TabletId() const { return TabletId_; }
138137
const TString& Body() const { return TxBody; }
139138

140139
ui64 LockTxId() const { return Tx.GetLockTxId(); }
@@ -175,7 +174,7 @@ class TValidatedDataTx : TNonCopyable {
175174
void ResetCounters() { EngineBay.ResetCounters(); }
176175

177176
bool CanCancel();
178-
bool CheckCancelled();
177+
bool CheckCancelled(ui64 tabletId);
179178

180179
void SetWriteVersion(TRowVersion writeVersion) { EngineBay.SetWriteVersion(writeVersion); }
181180
void SetReadVersion(TRowVersion readVersion) { EngineBay.SetReadVersion(readVersion); }
@@ -291,7 +290,6 @@ class TValidatedDataTx : TNonCopyable {
291290

292291
private:
293292
TStepOrder StepTxId_;
294-
ui64 TabletId_;
295293
TString TxBody;
296294
TActorId Source_;
297295
TEngineBay EngineBay;
@@ -516,10 +514,10 @@ class TActiveTransaction : public TOperation {
516514
return ArtifactFlags & LOCKS_STORED;
517515
}
518516

519-
void DbStoreLocksAccessLog(TDataShard * self,
517+
void DbStoreLocksAccessLog(ui64 tabletId,
520518
TTransactionContext &txc,
521519
const TActorContext &ctx);
522-
void DbStoreArtifactFlags(TDataShard * self,
520+
void DbStoreArtifactFlags(ui64 tabletId,
523521
TTransactionContext &txc,
524522
const TActorContext &ctx);
525523

ydb/core/tx/datashard/execute_data_tx_unit.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op,
115115
}
116116

117117
// TODO: cancel tx in special execution unit.
118-
if (tx->GetDataTx()->CheckCancelled())
118+
if (tx->GetDataTx()->CheckCancelled(DataShard.TabletID()))
119119
engine->Cancel();
120120
else {
121121
ui64 consumed = tx->GetDataTx()->GetTxSize() + engine->GetMemoryAllocated();

ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
115115
return EExecutionStatus::Executed;
116116
}
117117

118-
if (dataTx->CheckCancelled()) {
118+
if (dataTx->CheckCancelled(DataShard.TabletID())) {
119119
tx->ReleaseTxData(txc, ctx);
120120
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::CANCELLED)
121121
->AddError(NKikimrTxDataShard::TError::EXECUTION_CANCELLED, "Tx was cancelled");

ydb/core/tx/datashard/prepare_data_tx_in_rs_unit.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ EExecutionStatus TPrepareDataTxInRSUnit::Execute(TOperation::TPtr op,
6060
Y_VERIFY_S(engine, "missing engine for " << *op << " at " << DataShard.TabletID());
6161

6262
// TODO: cancel tx in special execution unit.
63-
if (tx->GetDataTx()->CheckCancelled())
63+
if (tx->GetDataTx()->CheckCancelled(DataShard.TabletID()))
6464
engine->Cancel();
6565

6666
try {

ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ EExecutionStatus TPrepareKqpDataTxInRSUnit::Execute(TOperation::TPtr op, TTransa
4545
}
4646
}
4747

48-
if (tx->GetDataTx()->CheckCancelled()) {
48+
if (tx->GetDataTx()->CheckCancelled(DataShard.TabletID())) {
4949
tx->ReleaseTxData(txc, ctx);
5050
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::CANCELLED)
5151
->AddError(NKikimrTxDataShard::TError::EXECUTION_CANCELLED, "Tx was cancelled");

ydb/core/tx/datashard/store_and_send_out_rs_unit.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ EExecutionStatus TStoreAndSendOutRSUnit::Execute(TOperation::TPtr op,
5656
if (!tx->IsLocksStored() && !tx->LocksAccessLog().Locks.empty()) {
5757
// N.B. we copy access log to locks cache, so that future lock access is repeatable
5858
tx->LocksCache().Locks = tx->LocksAccessLog().Locks;
59-
tx->DbStoreLocksAccessLog(&DataShard, txc, ctx);
59+
tx->DbStoreLocksAccessLog(DataShard.TabletID(), txc, ctx);
6060
// Freeze persistent locks that we have cached
6161
for (auto& pr : tx->LocksCache().Locks) {
6262
ui64 lockId = pr.first;
@@ -69,7 +69,7 @@ EExecutionStatus TStoreAndSendOutRSUnit::Execute(TOperation::TPtr op,
6969
newArtifact = true;
7070
}
7171
if (newArtifact)
72-
tx->DbStoreArtifactFlags(&DataShard, txc, ctx);
72+
tx->DbStoreArtifactFlags(DataShard.TabletID(), txc, ctx);
7373

7474
bool hadWrites = false;
7575
if (tx->IsOutRSStored() || tx->IsLocksStored()) {

0 commit comments

Comments
 (0)