Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions ydb/core/tx/datashard/datashard__engine_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -585,16 +585,7 @@ class TDataShardEngineHost final
ui64 GetTableSchemaVersion(const TTableId& tableId) const override {
if (TSysTables::IsSystemTable(tableId))
return 0;
const auto& userTables = Self->GetUserTables();
auto it = userTables.find(tableId.PathId.LocalPathId);
if (it == userTables.end()) {
Y_FAIL_S("DatshardEngineHost (tablet id: " << Self->TabletID()
<< " state: " << Self->GetState()
<< ") unables to find given table with id: " << tableId);
return 0;
} else {
return it->second->GetTableSchemaVersion();
}
return GetKeyValidator().GetTableSchemaVersion(tableId);
}

ui64 GetWriteTxId(const TTableId& tableId) const override {
Expand Down Expand Up @@ -997,6 +988,13 @@ class TDataShardEngineHost final
return static_cast<const TDataShardSysTables *>(Self->GetDataShardSysTables())->Get(tableId);
}

TKeyValidator& GetKeyValidator() {
return EngineBay.GetKeyValidator();
}
const TKeyValidator& GetKeyValidator() const {
return EngineBay.GetKeyValidator();
}

TDataShard* Self;
TEngineBay& EngineBay;
NTable::TDatabase& DB;
Expand All @@ -1023,6 +1021,7 @@ class TDataShardEngineHost final
TEngineBay::TEngineBay(TDataShard * self, TTransactionContext& txc, const TActorContext& ctx,
std::pair<ui64, ui64> stepTxId)
: StepTxId(stepTxId)
, KeyValidator(*self, txc.DB)
, LockTxId(0)
, LockNodeId(0)
{
Expand Down
17 changes: 5 additions & 12 deletions ydb/core/tx/datashard/datashard_write_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,18 +192,11 @@ void TValidatedWriteTx::SetTxKeys(const ::google::protobuf::RepeatedField<::NPro

ui32 TValidatedWriteTx::ExtractKeys(bool allowErrors)
{
using EResult = NMiniKQL::IEngineFlat::EResult;
SetTxKeys(RecordOperation().GetColumnIds());

bool isValid = ReValidateKeys();
Y_ABORT_UNLESS(allowErrors || isValid, "Validation errors: %s", ErrStr.data());

EResult result = EngineBay.Validate();
if (allowErrors) {
if (result != EResult::Ok) {
ErrStr = EngineBay.GetEngine()->GetErrors();
ErrCode = ConvertErrCode(result);
return 0;
}
} else {
Y_ABORT_UNLESS(result == EResult::Ok, "Engine errors: %s", EngineBay.GetEngine()->GetErrors().data());
}
return KeysCount();
}

Expand All @@ -212,7 +205,7 @@ bool TValidatedWriteTx::ReValidateKeys()
using EResult = NMiniKQL::IEngineFlat::EResult;


auto [result, error] = EngineBay.GetKqpComputeCtx().ValidateKeys(EngineBay.TxInfo());
auto [result, error] = GetKeyValidator().ValidateKeys();
if (result != EResult::Ok) {
ErrStr = std::move(error);
ErrCode = ConvertErrCode(result);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_write_operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class TValidatedWriteTx: TNonCopyable {
}

bool ParseRecord(const TDataShard::TTableInfos& tableInfos);
void SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnIds);
void SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnIds);

ui32 ExtractKeys(bool allowErrors);
bool ReValidateKeys();
Expand Down
67 changes: 65 additions & 2 deletions ydb/core/tx/datashard/key_validator.cpp
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
#include "key_validator.h"
#include "ydb/core/base/appdata_fwd.h"
#include "datashard_impl.h"
#include "range_ops.h"


#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/log.h>
#include <ydb/library/services/services.pb.h>
#include <ydb/core/tx/datashard/range_ops.h>



using namespace NKikimr;
using namespace NKikimr::NDataShard;

TKeyValidator::TKeyValidator(const TDataShard& self, const NTable::TDatabase& db)
: Self(self)
, Db(db)
{

}

void TKeyValidator::AddReadRange(const TTableId& tableId, const TVector<NTable::TColumn>& columns, const TTableRange& range, const TVector<NScheme::TTypeInfo>& keyTypes, ui64 itemsLimit, bool reverse)
{
TVector<TKeyDesc::TColumnOp> columnOps;
Expand Down Expand Up @@ -56,6 +65,60 @@ void TKeyValidator::AddWriteRange(const TTableId& tableId, const TTableRange& ra
Info.SetLoaded();
}

bool TKeyValidator::IsValidKey(TKeyDesc& key) const {
ui64 localTableId = Self.GetLocalTableId(key.TableId);
return NMiniKQL::IsValidKey(Db.GetScheme(), localTableId, key);
}

ui64 TKeyValidator::GetTableSchemaVersion(const TTableId& tableId) const {
if (TSysTables::IsSystemTable(tableId))
return 0;

const auto& userTables = Self.GetUserTables();
auto it = userTables.find(tableId.PathId.LocalPathId);
if (it == userTables.end()) {
Y_FAIL_S("TKeyValidator (tablet id: " << Self.TabletID() << " state: " << Self.GetState() << ") unable to find given table with id: " << tableId);
return 0;
} else {
return it->second->GetTableSchemaVersion();
}
}

std::tuple<NMiniKQL::IEngineFlat::EResult, TString> TKeyValidator::ValidateKeys() const {
using EResult = NMiniKQL::IEngineFlat::EResult;

for (const auto& validKey : Info.Keys) {
TKeyDesc* key = validKey.Key.get();

bool valid = IsValidKey(*key);

if (valid) {
auto curSchemaVersion = GetTableSchemaVersion(key->TableId);
if (key->TableId.SchemaVersion && curSchemaVersion && curSchemaVersion != key->TableId.SchemaVersion) {
auto error = TStringBuilder()
<< "Schema version mismatch for table id: " << key->TableId
<< " key table version: " << key->TableId.SchemaVersion
<< " current table version: " << curSchemaVersion;
return {EResult::SchemeChanged, std::move(error)};
}
} else {
switch (key->Status) {
case TKeyDesc::EStatus::SnapshotNotExist:
return {EResult::SnapshotNotExist, ""};
case TKeyDesc::EStatus::SnapshotNotReady:
key->Status = TKeyDesc::EStatus::Ok;
return {EResult::SnapshotNotReady, ""};
default:
auto error = TStringBuilder()
<< "Validate (" << __LINE__ << "): Key validation status: " << (ui32)key->Status;
return {EResult::KeyError, std::move(error)};
}
}
}

return {EResult::Ok, ""};
}

NMiniKQL::IEngineFlat::TValidationInfo& TKeyValidator::GetInfo() {
return Info;
}
Expand Down
12 changes: 12 additions & 0 deletions ydb/core/tx/datashard/key_validator.h
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
#pragma once

#include <ydb/core/tablet_flat/flat_database.h>

#include <ydb/core/engine/mkql_engine_flat.h>
#include <ydb/core/scheme_types/scheme_type_registry.h>
#include <ydb/core/tablet_flat/flat_table_column.h>

namespace NKikimr::NDataShard {

class TDataShard;

class TKeyValidator {
public:
TKeyValidator(const TDataShard& self, const NTable::TDatabase& db);

struct TColumnWriteMeta {
NTable::TColumn Column;
Expand All @@ -17,10 +21,18 @@ class TKeyValidator {

void AddReadRange(const TTableId& tableId, const TVector<NTable::TColumn>& columns, const TTableRange& range, const TVector<NScheme::TTypeInfo>& keyTypes, ui64 itemsLimit = 0, bool reverse = false);
void AddWriteRange(const TTableId& tableId, const TTableRange& range, const TVector<NScheme::TTypeInfo>& keyTypes, const TVector<TColumnWriteMeta>& columns, bool isPureEraseOp);

bool IsValidKey(TKeyDesc& key) const;
std::tuple<NMiniKQL::IEngineFlat::EResult, TString> ValidateKeys() const;

ui64 GetTableSchemaVersion(const TTableId& tableId) const;

NMiniKQL::IEngineFlat::TValidationInfo& GetInfo();
const NMiniKQL::IEngineFlat::TValidationInfo& GetInfo() const;
private:
const TDataShard& Self;
const NTable::TDatabase& Db;

NMiniKQL::IEngineFlat::TValidationInfo Info;
};

Expand Down
16 changes: 16 additions & 0 deletions ydb/core/tx/datashard/write_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,22 @@ class TWriteUnit : public TExecutionUnit {
TDataShardLocksDb locksDb(DataShard, txc);
TSetupSysLocks guardLocks(op, DataShard, &locksDb);

const TValidatedWriteTx::TPtr& writeTx = writeOp->GetWriteTx();

if (op->IsImmediate() && !writeOp->ReValidateKeys()) {
// Immediate transactions may be reordered with schema changes and become invalid
Y_ABORT_UNLESS(!writeTx->Ready());
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, writeTx->GetErrStr());
return EExecutionStatus::Executed;
}

if (writeTx->CheckCancelled()) {
writeOp->ReleaseTxData(txc, ctx);
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_CANCELLED, "Tx was cancelled");
DataShard.IncCounter(COUNTER_WRITE_CANCELLED);
return EExecutionStatus::Executed;
}

try {
DoExecute(&DataShard, writeOp, txc, ctx);
} catch (const TNeedGlobalTxId&) {
Expand Down