Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ydb/core/tx/datashard/check_write_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,

DataShard.IncCounter(COUNTER_WRITE_OUT_OF_SPACE);

writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err, DataShard.TabletID());
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err);
op->Abort(EExecutionUnitKind::FinishProposeWrite);

LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckWriteUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);
Expand All @@ -88,7 +88,7 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,

DataShard.IncCounter(COUNTER_WRITE_OUT_OF_SPACE);

writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err, DataShard.TabletID());
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err);
op->Abort(EExecutionUnitKind::FinishProposeWrite);

LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckWriteUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);
Expand All @@ -105,7 +105,7 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
if (!Pipeline.AssignPlanInterval(op)) {
TString err = TStringBuilder() << "Can't propose tx " << op->GetTxId() << " at blocked shard " << DataShard.TabletID();

writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err, DataShard.TabletID());
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err);
op->Abort(EExecutionUnitKind::FinishProposeWrite);

LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, err);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1584,7 +1584,7 @@ TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr&
Y_ABORT_UNLESS(writeTx);

auto badRequest = [&](const TString& error) {
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << error << " at tablet# " << Self->TabletID(), Self->TabletID());
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << error << " at tablet# " << Self->TabletID());
LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, error);
};

Expand Down
36 changes: 19 additions & 17 deletions ydb/core/tx/datashard/datashard_write_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ namespace NDataShard {
TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx, const TStepOrder& stepTxId, TInstant receivedAt, const NEvents::TDataEvents::TEvWrite::TPtr& ev)
: Ev(ev)
, EngineBay(self, txc, ctx, stepTxId.ToPair())
, TabletId(self->TabletID())
, Ctx(ctx)
, StepTxId(stepTxId)
, ReceivedAt(receivedAt)
, TxSize(0)
Expand All @@ -35,16 +37,14 @@ TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc,
if (Immediate())
EngineBay.SetIsImmediateTx();

auto& typeRegistry = *AppData()->TypeRegistry;

NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta;

LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Parsing write transaction for " << StepTxId << " at " << self->TabletID() << ", record: " << GetRecord().ShortDebugString());
LOG_TRACE_S(Ctx, NKikimrServices::TX_DATASHARD, "Parsing write transaction for " << StepTxId << " at " << TabletId << ", record: " << GetRecord().ShortDebugString());

if (!ParseRecord(self->TableInfos))
return;

SetTxKeys(RecordOperation().GetColumnIds(), typeRegistry, self->TabletID(), ctx);
SetTxKeys(RecordOperation().GetColumnIds());

KqpSetTxLocksKeys(GetKqpLocks(), self->SysLocksTable(), EngineBay);
EngineBay.MarkTxLoaded();
Expand Down Expand Up @@ -176,15 +176,15 @@ TVector<TKeyValidator::TColumnWriteMeta> GetColumnWrites(const ::google::protobu
return writeColumns;
}

void TValidatedWriteTx::SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnTags, const NScheme::TTypeRegistry& typeRegistry, ui64 tabletId, const TActorContext& ctx)
void TValidatedWriteTx::SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnTags)
{
TVector<TCell> keyCells;
for (ui32 rowIdx = 0; rowIdx <Matrix.GetRowCount(); ++rowIdx)
{
Matrix.GetSubmatrix(rowIdx, rowIdx, 0, TableInfo->KeyColumnIds.size() - 1, keyCells);

LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Table " << TableInfo->Path << ", shard: " << tabletId << ", "
<< "write point " << DebugPrintPoint(TableInfo->KeyColumnTypes, keyCells, typeRegistry));
LOG_TRACE_S(Ctx, NKikimrServices::TX_DATASHARD, "Table " << TableInfo->Path << ", shard: " << TabletId << ", "
<< "write point " << DebugPrintPoint(TableInfo->KeyColumnTypes, keyCells, *AppData()->TypeRegistry));
TTableRange tableRange(keyCells);
EngineBay.GetKeyValidator().AddWriteRange(TableId, tableRange, TableInfo->KeyColumnTypes, GetColumnWrites(columnTags), false);
}
Expand Down Expand Up @@ -254,6 +254,8 @@ TWriteOperation* TWriteOperation::CastWriteOperation(TOperation::TPtr op)
TWriteOperation::TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::TEvWrite::TPtr ev, TDataShard* self, TTransactionContext& txc, const TActorContext& ctx)
: TOperation(op)
, Ev(ev)
, TabletId(self->TabletID())
, Ctx(ctx)
, ArtifactFlags(0)
, TxCacheUsage(0)
, ReleasedTxDataSize(0)
Expand Down Expand Up @@ -350,11 +352,11 @@ void TWriteOperation::ReleaseTxData(NTabletFlatExecutor::TTxMemoryProviderBase&
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "tx " << GetTxId() << " released its data");
}

void TWriteOperation::DbStoreLocksAccessLog(ui64 tabletId, TTransactionContext& txc, const TActorContext& ctx)
void TWriteOperation::DbStoreLocksAccessLog(NTable::TDatabase& txcDb)
{
using Schema = TDataShard::Schema;

NIceDb::TNiceDb db(txc.DB);
NIceDb::TNiceDb db(txcDb);

using TLocksVector = TVector<TSysTables::TLocksTable::TPersistentLock>;
TLocksVector vec;
Expand All @@ -368,17 +370,17 @@ void TWriteOperation::DbStoreLocksAccessLog(ui64 tabletId, TTransactionContext&
TStringBuf vecData(vecDataStart, vecDataSize);
db.Table<Schema::TxArtifacts>().Key(GetTxId()).Update(NIceDb::TUpdate<Schema::TxArtifacts::Locks>(vecData));

LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Storing " << vec.size() << " locks for txid=" << GetTxId() << " in " << tabletId);
LOG_TRACE_S(Ctx, NKikimrServices::TX_DATASHARD, "Storing " << vec.size() << " locks for txid=" << GetTxId() << " in " << TabletId);
}

void TWriteOperation::DbStoreArtifactFlags(ui64 tabletId, TTransactionContext& txc, const TActorContext& ctx)
void TWriteOperation::DbStoreArtifactFlags(NTable::TDatabase& txcDb)
{
using Schema = TDataShard::Schema;

NIceDb::TNiceDb db(txc.DB);
NIceDb::TNiceDb db(txcDb);
db.Table<Schema::TxArtifacts>().Key(GetTxId()).Update<Schema::TxArtifacts::Flags>(ArtifactFlags);

LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Storing artifactflags=" << ArtifactFlags << " for txid=" << GetTxId() << " in " << tabletId);
LOG_TRACE_S(Ctx, NKikimrServices::TX_DATASHARD, "Storing artifactflags=" << ArtifactFlags << " for txid=" << GetTxId() << " in " << TabletId);
}

ui64 TWriteOperation::GetMemoryConsumption() const {
Expand Down Expand Up @@ -418,7 +420,7 @@ ERestoreDataStatus TWriteOperation::RestoreTxData(

TVector<TSysTables::TLocksTable::TLock> locks;
if (!IsImmediate() && !HasVolatilePrepareFlag()) {
NIceDb::TNiceDb db(txc.DB);ExtractKeys
NIceDb::TNiceDb db(txc.DB);
bool ok = self->TransQueue.LoadTxDetails(db, GetTxId(), Target, Ev, locks, ArtifactFlags);
if (!ok) {
Ev.Reset();
Expand All @@ -437,7 +439,7 @@ ERestoreDataStatus TWriteOperation::RestoreTxData(
bool extractKeys = WriteTx->IsTxInfoLoaded();
WriteTx = std::make_shared<TValidatedWriteTx>(self, txc, ctx, GetStepOrder(), GetReceivedAt(), Ev);
if (WriteTx->Ready() && extractKeys) {
WriteTx->ExtractKeys(true);
WriteTx->ExtractKeys();
}

if (!WriteTx->Ready()) {
Expand Down Expand Up @@ -521,9 +523,9 @@ void TWriteOperation::UntrackMemory() const {
NActors::NMemory::TLabel<MemoryLabelActiveTransactionBody>::Sub(GetRecord().SpaceUsed());
}

void TWriteOperation::SetError(const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg, ui64 tabletId) {
void TWriteOperation::SetError(const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg) {
SetAbortedFlag();
WriteResult = NEvents::TDataEvents::TEvWriteResult::BuildError(tabletId, GetTxId(), status, errorMsg);
WriteResult = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletId, GetTxId(), status, errorMsg);
}

void TWriteOperation::SetWriteResult(std::unique_ptr<NEvents::TDataEvents::TEvWriteResult>&& writeResult) {
Expand Down
27 changes: 18 additions & 9 deletions ydb/core/tx/datashard/datashard_write_operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,6 @@ class TValidatedWriteTx: TNonCopyable {
return TxInfo().DynKeysCount != 0;
}

// TODO: It's an expensive operation (Precharge() inside). We need avoid it.
TEngineBay::TSizes CalcReadSizes(bool needsTotalKeysSize) const {
return EngineBay.CalcSizes(needsTotalKeysSize);
}

ui64 GetMemoryAllocated() const {
return EngineBay.GetEngine() ? EngineBay.GetEngine()->GetMemoryAllocated() : 0;
}
Expand All @@ -92,6 +87,14 @@ class TValidatedWriteTx: TNonCopyable {
void DestroyEngine() {
EngineBay.DestroyEngine();
}

TKeyValidator& GetKeyValidator() {
return EngineBay.GetKeyValidator();
}
const TKeyValidator& GetKeyValidator() const {
return EngineBay.GetKeyValidator();
}

const NMiniKQL::TEngineHostCounters& GetCounters() {
return EngineBay.GetCounters();
}
Expand Down Expand Up @@ -145,7 +148,7 @@ class TValidatedWriteTx: TNonCopyable {
}

bool ParseRecord(const TDataShard::TTableInfos& tableInfos);
void SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnIds, const NScheme::TTypeRegistry& typeRegistry, ui64 tabletId, const TActorContext& ctx);
void SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnIds);

ui32 ExtractKeys(bool allowErrors);
bool ReValidateKeys();
Expand Down Expand Up @@ -175,6 +178,9 @@ class TValidatedWriteTx: TNonCopyable {
const NEvents::TDataEvents::TEvWrite::TPtr& Ev;
TEngineBay EngineBay;

const ui64 TabletId;
const TActorContext& Ctx;

YDB_ACCESSOR_DEF(TActorId, Source);

YDB_READONLY(TStepOrder, StepTxId, TStepOrder(0, 0));
Expand Down Expand Up @@ -262,8 +268,8 @@ class TWriteOperation : public TOperation {
return ArtifactFlags & LOCKS_STORED;
}

void DbStoreLocksAccessLog(ui64 tabletId, TTransactionContext& txc, const TActorContext& ctx);
void DbStoreArtifactFlags(ui64 tabletId, TTransactionContext& txc, const TActorContext& ctx);
void DbStoreLocksAccessLog(NTable::TDatabase& txcDb);
void DbStoreArtifactFlags(NTable::TDatabase& txcDb);

ui64 GetMemoryConsumption() const;

Expand Down Expand Up @@ -335,7 +341,7 @@ class TWriteOperation : public TOperation {
return std::move(WriteResult);
}

void SetError(const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg, ui64 tabletId);
void SetError(const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg);
void SetWriteResult(std::unique_ptr<NEvents::TDataEvents::TEvWriteResult>&& writeResult);

private:
Expand All @@ -347,6 +353,9 @@ class TWriteOperation : public TOperation {
TValidatedWriteTx::TPtr WriteTx;
std::unique_ptr<NEvents::TDataEvents::TEvWriteResult> WriteResult;

const ui64 TabletId;
const TActorContext& Ctx;

YDB_READONLY_DEF(ui64, ArtifactFlags);
YDB_ACCESSOR_DEF(ui64, TxCacheUsage);
YDB_ACCESSOR_DEF(ui64, ReleasedTxDataSize);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/write_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class TWriteUnit : public TExecutionUnit {
const TTableId fullTableId(self->GetPathOwnerId(), tableId);
const ui64 localTableId = self->GetLocalTableId(fullTableId);
if (localTableId == 0) {
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, TStringBuilder() << "Unknown table id " << tableId, self->TabletID());
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, TStringBuilder() << "Unknown table id " << tableId);
return;
}
const ui64 shadowTableId = self->GetShadowTableId(fullTableId);
Expand Down