Skip to content

Commit 904084e

Browse files
committed
Remove TabletId from TWriteOperation
1 parent b95ccbf commit 904084e

File tree

5 files changed

+28
-34
lines changed

5 files changed

+28
-34
lines changed

ydb/core/tx/datashard/check_write_unit.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
6666

6767
DataShard.IncCounter(COUNTER_PREPARE_OUT_OF_SPACE);
6868

69-
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err);
69+
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err, DataShard.TabletID());
7070
op->Abort(EExecutionUnitKind::FinishProposeWrite);
7171

7272
LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckWriteUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);
@@ -87,7 +87,7 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
8787
<< " bytes which exceeds limit " << NLimits::MaxWriteKeySize
8888
<< " bytes at " << DataShard.TabletID();
8989

90-
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, err);
90+
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, err, DataShard.TabletID());
9191
op->Abort(EExecutionUnitKind::FinishProposeWrite);
9292

9393
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
@@ -103,7 +103,7 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
103103
<< "Transaction write column value of " << col.ImmediateUpdateSize
104104
<< " bytes is larger than the allowed threshold";
105105

106-
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, err);
106+
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, err, DataShard.TabletID());
107107
op->Abort(EExecutionUnitKind::FinishProposeWrite);
108108

109109
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
@@ -126,7 +126,7 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
126126

127127
DataShard.IncCounter(COUNTER_PREPARE_OUT_OF_SPACE);
128128

129-
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err);
129+
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err, DataShard.TabletID());
130130
op->Abort(EExecutionUnitKind::FinishProposeWrite);
131131

132132
LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckWriteUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);
@@ -143,7 +143,7 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
143143
if (!Pipeline.AssignPlanInterval(op)) {
144144
TString err = TStringBuilder() << "Can't propose tx " << op->GetTxId() << " at blocked shard " << DataShard.TabletID();
145145

146-
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err);
146+
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err, DataShard.TabletID());
147147
op->Abort(EExecutionUnitKind::FinishProposeWrite);
148148

149149
LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, err);

ydb/core/tx/datashard/datashard_pipeline.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1568,7 +1568,7 @@ TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr&
15681568
op->OperationSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(traceId), "WriteOperation", NWilson::EFlags::AUTO_END);
15691569

15701570
auto badRequest = [&](const TString& error) {
1571-
op->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, error);
1571+
op->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, error, Self->TabletID());
15721572
LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, error);
15731573
};
15741574

ydb/core/tx/datashard/datashard_write_operation.cpp

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc,
2121
: Ev(ev)
2222
, EngineBay(self, txc, ctx, stepTxId.ToPair())
2323
, StepTxId(stepTxId)
24-
, TabletId(self->TabletID())
2524
, ReceivedAt(receivedAt)
2625
, TxSize(0)
2726
, ErrCode(NKikimrTxDataShard::TError::OK)
@@ -40,12 +39,12 @@ TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc,
4039

4140
NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta;
4241

43-
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Parsing write transaction for " << StepTxId << " at " << TabletId << ", record: " << GetRecord().ShortDebugString());
42+
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Parsing write transaction for " << StepTxId << " at " << self->TabletID() << ", record: " << GetRecord().ShortDebugString());
4443

4544
if (!ParseRecord(self->TableInfos))
4645
return;
4746

48-
SetTxKeys(RecordOperation().GetColumnIds(), typeRegistry, ctx);
47+
SetTxKeys(RecordOperation().GetColumnIds(), typeRegistry, self->TabletID(), ctx);
4948

5049
KqpSetTxLocksKeys(GetKqpLocks(), self->SysLocksTable(), EngineBay);
5150
EngineBay.MarkTxLoaded();
@@ -147,15 +146,15 @@ TVector<TEngineBay::TColumnWriteMeta> GetColumnWrites(const ::google::protobuf::
147146
return writeColumns;
148147
}
149148

150-
void TValidatedWriteTx::SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnTags, const NScheme::TTypeRegistry& typeRegistry, const TActorContext& ctx)
149+
void TValidatedWriteTx::SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnTags, const NScheme::TTypeRegistry& typeRegistry, ui64 tabletId, const TActorContext& ctx)
151150
{
152151
TVector<TCell> keyCells;
153152
for (ui32 rowIdx = 0; rowIdx <Matrix.GetRowCount(); ++rowIdx)
154153
{
155-
Matrix.GetSubmatrix(rowIdx, rowIdx, 0, TableInfo->KeyColumnIds.size() - 1, keyCells);
154+
Matrix.GetSubmatrix(rowIdx, rowIdx, 0, TableInfo->KeyColumnIds.size() - 1, keyCells);
156155

157-
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Table " << TableInfo->Path << ", shard: " << TabletId << ", "
158-
<< "write point " << DebugPrintPoint(TableInfo->KeyColumnTypes, keyCells, typeRegistry));
156+
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Table " << TableInfo->Path << ", shard: " << tabletId << ", "
157+
<< "write point " << DebugPrintPoint(TableInfo->KeyColumnTypes, keyCells, typeRegistry));
159158
TTableRange tableRange(keyCells);
160159
EngineBay.AddWriteRange(TableId, tableRange, TableInfo->KeyColumnTypes, GetColumnWrites(columnTags), false);
161160
}
@@ -316,7 +315,7 @@ void TWriteOperation::ReleaseTxData(NTabletFlatExecutor::TTxMemoryProviderBase&
316315
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "tx " << GetTxId() << " released its data");
317316
}
318317

319-
void TWriteOperation::DbStoreLocksAccessLog(TTransactionContext& txc, const TActorContext& ctx)
318+
void TWriteOperation::DbStoreLocksAccessLog(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx)
320319
{
321320
using Schema = TDataShard::Schema;
322321

@@ -334,17 +333,17 @@ void TWriteOperation::DbStoreLocksAccessLog(TTransactionContext& txc, const TAct
334333
TStringBuf vecData(vecDataStart, vecDataSize);
335334
db.Table<Schema::TxArtifacts>().Key(GetTxId()).Update(NIceDb::TUpdate<Schema::TxArtifacts::Locks>(vecData));
336335

337-
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Storing " << vec.size() << " locks for txid=" << GetTxId() << " in " << GetTabletId());
336+
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Storing " << vec.size() << " locks for txid=" << GetTxId() << " in " << self->TabletID());
338337
}
339338

340-
void TWriteOperation::DbStoreArtifactFlags(TTransactionContext& txc, const TActorContext& ctx)
339+
void TWriteOperation::DbStoreArtifactFlags(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx)
341340
{
342341
using Schema = TDataShard::Schema;
343342

344343
NIceDb::TNiceDb db(txc.DB);
345344
db.Table<Schema::TxArtifacts>().Key(GetTxId()).Update<Schema::TxArtifacts::Flags>(ArtifactFlags);
346345

347-
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Storing artifactflags=" << ArtifactFlags << " for txid=" << GetTxId() << " in " << GetTabletId());
346+
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Storing artifactflags=" << ArtifactFlags << " for txid=" << GetTxId() << " in " << self->TabletID());
348347
}
349348

350349
ui64 TWriteOperation::GetMemoryConsumption() const {
@@ -488,9 +487,9 @@ void TWriteOperation::UntrackMemory() const {
488487
NActors::NMemory::TLabel<MemoryLabelActiveTransactionBody>::Sub(GetRecord().SpaceUsed());
489488
}
490489

491-
void TWriteOperation::SetError(const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg) {
490+
void TWriteOperation::SetError(const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg, ui64 tabletId) {
492491
SetAbortedFlag();
493-
WriteResult = NEvents::TDataEvents::TEvWriteResult::BuildError(GetTabletId(), GetTxId(), status, errorMsg);
492+
WriteResult = NEvents::TDataEvents::TEvWriteResult::BuildError(tabletId, GetTxId(), status, errorMsg);
494493
}
495494

496495
void TWriteOperation::SetWriteResult(std::unique_ptr<NEvents::TDataEvents::TEvWriteResult>&& writeResult) {

ydb/core/tx/datashard/datashard_write_operation.h

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ class TValidatedWriteTx: TNonCopyable {
148148
}
149149

150150
bool ParseRecord(const TDataShard::TTableInfos& tableInfos);
151-
void SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnIds, const NScheme::TTypeRegistry& typeRegistry, const TActorContext& ctx);
151+
void SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnIds, const NScheme::TTypeRegistry& typeRegistry, ui64 tabletId, const TActorContext& ctx);
152152

153153
ui32 ExtractKeys(bool allowErrors);
154154
bool ReValidateKeys();
@@ -181,7 +181,6 @@ class TValidatedWriteTx: TNonCopyable {
181181
YDB_ACCESSOR_DEF(TActorId, Source);
182182

183183
YDB_READONLY(TStepOrder, StepTxId, TStepOrder(0, 0));
184-
YDB_READONLY(ui64, TabletId, 0);
185184
YDB_READONLY_DEF(TTableId, TableId);
186185
YDB_READONLY_DEF(TSerializedCellMatrix, Matrix);
187186
YDB_READONLY_DEF(TInstant, ReceivedAt);
@@ -222,10 +221,6 @@ class TWriteOperation : public TOperation {
222221
TrackMemory();
223222
}
224223

225-
ui64 GetTabletId() const {
226-
return WriteTx->GetTabletId();
227-
}
228-
229224
void Deactivate() override {
230225
ClearEv();
231226

@@ -268,8 +263,8 @@ class TWriteOperation : public TOperation {
268263
return ArtifactFlags & LOCKS_STORED;
269264
}
270265

271-
void DbStoreLocksAccessLog(TTransactionContext& txc, const TActorContext& ctx);
272-
void DbStoreArtifactFlags(TTransactionContext& txc, const TActorContext& ctx);
266+
void DbStoreLocksAccessLog(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx);
267+
void DbStoreArtifactFlags(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx);
273268

274269
ui64 GetMemoryConsumption() const;
275270

@@ -338,7 +333,7 @@ class TWriteOperation : public TOperation {
338333
return std::move(WriteResult);
339334
}
340335

341-
void SetError(const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg);
336+
void SetError(const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg, ui64 tabletId);
342337
void SetWriteResult(std::unique_ptr<NEvents::TDataEvents::TEvWriteResult>&& writeResult);
343338

344339
private:

ydb/core/tx/datashard/write_unit.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class TWriteUnit : public TExecutionUnit {
4242
const TTableId fullTableId(self->GetPathOwnerId(), tableId);
4343
const ui64 localTableId = self->GetLocalTableId(fullTableId);
4444
if (localTableId == 0) {
45-
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, TStringBuilder() << "Unknown table id " << tableId);
45+
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, TStringBuilder() << "Unknown table id " << tableId, self->TabletID());
4646
return;
4747
}
4848
const ui64 shadowTableId = self->GetShadowTableId(fullTableId);
@@ -73,7 +73,7 @@ class TWriteUnit : public TExecutionUnit {
7373
const auto& cellType = TableInfo_.KeyColumnTypes[keyColIdx];
7474
const TCell& cell = matrix.GetCell(rowIdx, keyColIdx);
7575
if (cellType.GetTypeId() == NScheme::NTypeIds::Uint8 && !cell.IsNull() && cell.AsValue<ui8>() > 127) {
76-
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "Keys with Uint8 column values >127 are currently prohibited");
76+
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "Keys with Uint8 column values >127 are currently prohibited", self->TabletID());
7777
return;
7878
}
7979

@@ -83,7 +83,7 @@ class TWriteUnit : public TExecutionUnit {
8383
}
8484

8585
if (keyBytes > NLimits::MaxWriteKeySize) {
86-
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Row key size of " << keyBytes << " bytes is larger than the allowed threshold " << NLimits::MaxWriteKeySize);
86+
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Row key size of " << keyBytes << " bytes is larger than the allowed threshold " << NLimits::MaxWriteKeySize, self->TabletID());
8787
return;
8888
}
8989

@@ -92,7 +92,7 @@ class TWriteUnit : public TExecutionUnit {
9292
ui32 columnTag = writeTx->RecordOperation().GetColumnIds(valueColIdx);
9393
const TCell& cell = matrix.GetCell(rowIdx, valueColIdx);
9494
if (cell.Size() > NLimits::MaxWriteValueSize) {
95-
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Row cell size of " << cell.Size() << " bytes is larger than the allowed threshold " << NLimits::MaxWriteValueSize);
95+
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Row cell size of " << cell.Size() << " bytes is larger than the allowed threshold " << NLimits::MaxWriteValueSize, self->TabletID());
9696
return;
9797
}
9898

@@ -136,7 +136,7 @@ class TWriteUnit : public TExecutionUnit {
136136
}
137137
else {
138138
//TODO: Prepared
139-
writeOp->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(writeOp->GetTabletId(), op->GetTxId(), {0, 0, {}}));
139+
writeOp->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(DataShard.TabletID(), op->GetTxId(), {0, 0, {}}));
140140
return EExecutionStatus::DelayCompleteNoMoreRestarts;
141141
}
142142

@@ -187,7 +187,7 @@ class TWriteUnit : public TExecutionUnit {
187187
Y_ABORT_UNLESS(writeOp != nullptr);
188188

189189
const auto& status = writeOp->GetWriteResult()->Record.status();
190-
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Completed write operation for " << *op << " at " << writeOp->GetTabletId() << ", status " << status);
190+
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Completed write operation for " << *op << " at " << DataShard.TabletID() << ", status " << status);
191191

192192
//TODO: Counters
193193
// if (WriteResult->Record.status() == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED || WriteResult->Record.status() == NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED) {

0 commit comments

Comments
 (0)