@@ -306,9 +306,10 @@ struct Schema : NIceDb::Schema {
306306
307307 struct BlobRangeOffset : Column<11 , NScheme::NTypeIds::Uint64> {};
308308 struct BlobRangeSize : Column<12 , NScheme::NTypeIds::Uint64> {};
309+ struct InsertWriteId : Column<13 , NScheme::NTypeIds::Uint64> {};
309310
310311 using TKey = TableKey<Committed, PlanStep, WriteTxId, PathId, DedupId>;
311- using TColumns = TableColumns<Committed, PlanStep, WriteTxId, PathId, DedupId, BlobId, Meta, IndexPlanStep, IndexTxId, SchemaVersion, BlobRangeOffset, BlobRangeSize>;
312+ using TColumns = TableColumns<Committed, PlanStep, WriteTxId, PathId, DedupId, BlobId, Meta, IndexPlanStep, IndexTxId, SchemaVersion, BlobRangeOffset, BlobRangeSize, InsertWriteId >;
312313 };
313314
314315 struct IndexGranules : NIceDb::Schema::Table<GranulesTableId> {
@@ -808,6 +809,7 @@ struct Schema : NIceDb::Schema {
808809 .Key ((ui8)recType, 0 , (ui64)data.GetInsertWriteId (), data.GetPathId (), " " )
809810 .Update (NIceDb::TUpdate<InsertTable::BlobId>(data.GetBlobRange ().GetBlobId ().ToStringLegacy ()),
810811 NIceDb::TUpdate<InsertTable::BlobRangeOffset>(data.GetBlobRange ().Offset ),
812+ NIceDb::TUpdate<InsertTable::InsertWriteId>((ui64)data.GetInsertWriteId ()),
811813 NIceDb::TUpdate<InsertTable::BlobRangeSize>(data.GetBlobRange ().Size ),
812814 NIceDb::TUpdate<InsertTable::Meta>(data.GetMeta ().SerializeToProto ().SerializeAsString ()),
813815 NIceDb::TUpdate<InsertTable::SchemaVersion>(data.GetSchemaVersion ()));
@@ -818,6 +820,7 @@ struct Schema : NIceDb::Schema {
818820 .Key ((ui8)EInsertTableIds::Committed, data.GetSnapshot ().GetPlanStep (), data.GetSnapshot ().GetTxId (), data.GetPathId (),
819821 data.GetDedupId ())
820822 .Update (NIceDb::TUpdate<InsertTable::BlobId>(data.GetBlobRange ().GetBlobId ().ToStringLegacy ()),
823+ NIceDb::TUpdate<InsertTable::InsertWriteId>((ui64)data.GetInsertWriteId ()),
821824 NIceDb::TUpdate<InsertTable::BlobRangeOffset>(data.GetBlobRange ().Offset ),
822825 NIceDb::TUpdate<InsertTable::BlobRangeSize>(data.GetBlobRange ().Size ),
823826 NIceDb::TUpdate<InsertTable::Meta>(data.GetMeta ().SerializeToProto ().SerializeAsString ()),
@@ -982,15 +985,16 @@ class TInsertTableRecordLoadContext {
982985 NColumnShard::Schema::EInsertTableIds RecType;
983986 ui64 PlanStep;
984987 ui64 WriteTxId;
988+ TInsertWriteId InsertWriteId;
985989 ui64 PathId;
986990 YDB_ACCESSOR_DEF (TString, DedupId);
987991 ui64 SchemaVersion;
988992 TString BlobIdString;
989993 std::optional<NOlap::TUnifiedBlobId> BlobId;
990994 TString MetadataString;
991995 std::optional<NKikimrTxColumnShard::TLogicalMetadata> Metadata;
992- std::optional< ui64> RangeOffset;
993- std::optional< ui64> RangeSize;
996+ ui64 RangeOffset;
997+ ui64 RangeSize;
994998
995999 void Prepare (const IBlobGroupSelector* dsGroupSelector) {
9961000 AFL_VERIFY (!PreparedFlag);
@@ -1004,7 +1008,6 @@ class TInsertTableRecordLoadContext {
10041008 AFL_VERIFY (MetadataString);
10051009 Y_ABORT_UNLESS (meta.ParseFromString (MetadataString));
10061010 Metadata = std::move (meta);
1007- AFL_VERIFY (!!RangeOffset == !!RangeSize);
10081011 }
10091012
10101013 bool PreparedFlag = false ;
@@ -1013,8 +1016,13 @@ class TInsertTableRecordLoadContext {
10131016public:
10141017 TInsertWriteId GetInsertWriteId () const {
10151018 AFL_VERIFY (ParsedFlag);
1016- AFL_VERIFY (RecType != NColumnShard::Schema::EInsertTableIds::Committed);
1017- return (TInsertWriteId)WriteTxId;
1019+ return InsertWriteId;
1020+ }
1021+
1022+ ui64 GetTxId () const {
1023+ AFL_VERIFY (ParsedFlag);
1024+ AFL_VERIFY (RecType == NColumnShard::Schema::EInsertTableIds::Committed);
1025+ return WriteTxId;
10181026 }
10191027
10201028 NColumnShard::Schema::EInsertTableIds GetRecType () const {
@@ -1024,6 +1032,7 @@ class TInsertTableRecordLoadContext {
10241032
10251033 ui64 GetPlanStep () const {
10261034 AFL_VERIFY (ParsedFlag);
1035+ AFL_VERIFY (RecType == NColumnShard::Schema::EInsertTableIds::Committed);
10271036 return PlanStep;
10281037 }
10291038
@@ -1035,19 +1044,12 @@ class TInsertTableRecordLoadContext {
10351044 void Upsert (NIceDb::TNiceDb& db) const {
10361045 AFL_VERIFY (ParsedFlag);
10371046 using namespace NColumnShard ;
1038- if (RangeOffset) {
1039- db.Table <Schema::InsertTable>()
1040- .Key ((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId)
1041- .Update (NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString),
1042- NIceDb::TUpdate<Schema::InsertTable::BlobRangeOffset>(*RangeOffset),
1043- NIceDb::TUpdate<Schema::InsertTable::BlobRangeSize>(*RangeSize), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString),
1044- NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion));
1045- } else {
1046- db.Table <Schema::InsertTable>()
1047- .Key ((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId)
1048- .Update (NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString),
1049- NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion));
1050- }
1047+ db.Table <Schema::InsertTable>()
1048+ .Key ((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId)
1049+ .Update (NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString),
1050+ NIceDb::TUpdate<Schema::InsertTable::BlobRangeOffset>(RangeOffset),
1051+ NIceDb::TUpdate<Schema::InsertTable::BlobRangeSize>(RangeSize), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString),
1052+ NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion));
10511053 }
10521054
10531055 template <class TRowset >
@@ -1059,41 +1061,40 @@ class TInsertTableRecordLoadContext {
10591061 PlanStep = rowset.template GetValue <Schema::InsertTable::PlanStep>();
10601062 WriteTxId = rowset.template GetValueOrDefault <Schema::InsertTable::WriteTxId>();
10611063 AFL_VERIFY (WriteTxId);
1064+ InsertWriteId = (TInsertWriteId)rowset.template GetValueOrDefault <Schema::InsertTable::InsertWriteId>(WriteTxId);
10621065
10631066 PathId = rowset.template GetValue <Schema::InsertTable::PathId>();
10641067 DedupId = rowset.template GetValue <Schema::InsertTable::DedupId>();
1065- SchemaVersion =
1066- rowset.template HaveValue <Schema::InsertTable::SchemaVersion>() ? rowset.template GetValue <Schema::InsertTable::SchemaVersion>() : 0 ;
1068+ SchemaVersion = rowset.template GetValueOrDefault <Schema::InsertTable::SchemaVersion>(0 );
10671069 BlobIdString = rowset.template GetValue <Schema::InsertTable::BlobId>();
10681070 MetadataString = rowset.template GetValue <Schema::InsertTable::Meta>();
1069- if (rowset.template HaveValue <Schema::InsertTable::BlobRangeOffset>()) {
1070- RangeOffset = rowset.template GetValue <Schema::InsertTable::BlobRangeOffset>();
1071- }
1072- if (rowset.template HaveValue <Schema::InsertTable::BlobRangeSize>()) {
1073- RangeSize = rowset.template GetValue <Schema::InsertTable::BlobRangeSize>();
1074- }
1071+ AFL_VERIFY (rowset.template HaveValue <Schema::InsertTable::BlobRangeOffset>());
1072+ AFL_VERIFY (rowset.template HaveValue <Schema::InsertTable::BlobRangeSize>());
1073+ RangeOffset = rowset.template GetValue <Schema::InsertTable::BlobRangeOffset>();
1074+ RangeSize = rowset.template GetValue <Schema::InsertTable::BlobRangeSize>();
10751075 }
10761076
10771077 NOlap::TCommittedData BuildCommitted (const IBlobGroupSelector* dsGroupSelector) {
10781078 Prepare (dsGroupSelector);
10791079 using namespace NColumnShard ;
10801080 AFL_VERIFY (RecType == Schema::EInsertTableIds::Committed);
1081- auto userData = std::make_shared<NOlap::TUserData>(PathId,
1082- NOlap::TBlobRange (*BlobId, RangeOffset. value_or ( 0 ) , RangeSize. value_or (BlobId-> BlobSize ()) ), *Metadata, SchemaVersion, std::nullopt );
1081+ auto userData = std::make_shared<NOlap::TUserData>(
1082+ PathId, NOlap::TBlobRange (*BlobId, RangeOffset, RangeSize), *Metadata, SchemaVersion, std::nullopt );
10831083 AFL_VERIFY (!!DedupId);
10841084 AFL_VERIFY (PlanStep);
1085- return NOlap::TCommittedData (userData, PlanStep, WriteTxId, DedupId);
1085+ return NOlap::TCommittedData (userData, PlanStep, WriteTxId, InsertWriteId, DedupId);
10861086 }
10871087
10881088 NOlap::TInsertedData BuildInsertedOrAborted (const IBlobGroupSelector* dsGroupSelector) {
10891089 Prepare (dsGroupSelector);
10901090 using namespace NColumnShard ;
1091+ AFL_VERIFY (InsertWriteId == (TInsertWriteId)WriteTxId)(" insert" , InsertWriteId)(" write" , WriteTxId);
10911092 AFL_VERIFY (RecType != Schema::EInsertTableIds::Committed);
1092- auto userData = std::make_shared<NOlap::TUserData>(PathId,
1093- NOlap::TBlobRange (*BlobId, RangeOffset. value_or ( 0 ) , RangeSize. value_or (BlobId-> BlobSize ()) ), *Metadata, SchemaVersion, std::nullopt );
1093+ auto userData = std::make_shared<NOlap::TUserData>(
1094+ PathId, NOlap::TBlobRange (*BlobId, RangeOffset, RangeSize), *Metadata, SchemaVersion, std::nullopt );
10941095 AFL_VERIFY (!DedupId);
10951096 AFL_VERIFY (!PlanStep);
1096- return NOlap::TInsertedData ((TInsertWriteId)WriteTxId , userData);
1097+ return NOlap::TInsertedData (InsertWriteId , userData);
10971098 }
10981099};
10991100
0 commit comments