Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions ydb/core/formats/arrow/arrow_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -575,8 +575,10 @@ TColumnFilter TColumnFilter::CombineSequentialAnd(const TColumnFilter& extFilter
}

TColumnFilter::TIterator TColumnFilter::GetIterator(const bool reverse, const ui32 expectedSize) const {
if ((IsTotalAllowFilter() || IsTotalDenyFilter()) && !Filter.size()) {
return TIterator(reverse, expectedSize, LastValue);
if (IsTotalAllowFilter()) {
return TIterator(reverse, expectedSize, true);
} else if (IsTotalDenyFilter()) {
return TIterator(reverse, expectedSize, false);
} else {
AFL_VERIFY(expectedSize == Size())("expected", expectedSize)("size", Size())("reverse", reverse);
return TIterator(reverse, Filter, GetStartValue(reverse));
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7903,10 +7903,10 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
testHelper.ReadData("SELECT * FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=3", "[[3;\"-321\";\"-3.14\";[\"test_res_3\"]]]");
testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=3", "[[\"-3.14\"]]");
testHelper.ReadData("SELECT resource_id FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=3", "[[[\"test_res_3\"]]]");
testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest`", "[[#];[#];[\"-3.14\"]]");
testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest` ORDER BY new_column", "[[#];[#];[\"-3.14\"]]");

testHelper.RebootTablets(testTable.GetName());
testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest`", "[[#];[#];[\"-3.14\"]]");
testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest` ORDER BY new_column", "[[#];[#];[\"-3.14\"]]");
}

Y_UNIT_TEST(AddColumnErrors) {
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/protos/feature_flags.proto
Original file line number Diff line number Diff line change
Expand Up @@ -162,5 +162,6 @@ message TFeatureFlags {
optional bool EnableExternalDataSourcesOnServerless = 143 [default = true];
optional bool EnableSparsedColumns = 144 [default = false];
optional bool EnableParameterizedDecimal = 145 [default = false];
optional bool EnableImmediateWritingOnBulkUpsert = 146 [default = false];
optional bool EnableImmediateWritingOnBulkUpsert = 146 [default = false];
optional bool EnableInsertWriteIdSpecialColumnCompatibility = 147 [default = false];
}
67 changes: 34 additions & 33 deletions ydb/core/tx/columnshard/columnshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,10 @@ struct Schema : NIceDb::Schema {

struct BlobRangeOffset: Column<11, NScheme::NTypeIds::Uint64> {};
struct BlobRangeSize: Column<12, NScheme::NTypeIds::Uint64> {};
struct InsertWriteId: Column<13, NScheme::NTypeIds::Uint64> {};

using TKey = TableKey<Committed, PlanStep, WriteTxId, PathId, DedupId>;
using TColumns = TableColumns<Committed, PlanStep, WriteTxId, PathId, DedupId, BlobId, Meta, IndexPlanStep, IndexTxId, SchemaVersion, BlobRangeOffset, BlobRangeSize>;
using TColumns = TableColumns<Committed, PlanStep, WriteTxId, PathId, DedupId, BlobId, Meta, IndexPlanStep, IndexTxId, SchemaVersion, BlobRangeOffset, BlobRangeSize, InsertWriteId>;
};

struct IndexGranules : NIceDb::Schema::Table<GranulesTableId> {
Expand Down Expand Up @@ -808,6 +809,7 @@ struct Schema : NIceDb::Schema {
.Key((ui8)recType, 0, (ui64)data.GetInsertWriteId(), data.GetPathId(), "")
.Update(NIceDb::TUpdate<InsertTable::BlobId>(data.GetBlobRange().GetBlobId().ToStringLegacy()),
NIceDb::TUpdate<InsertTable::BlobRangeOffset>(data.GetBlobRange().Offset),
NIceDb::TUpdate<InsertTable::InsertWriteId>((ui64)data.GetInsertWriteId()),
NIceDb::TUpdate<InsertTable::BlobRangeSize>(data.GetBlobRange().Size),
NIceDb::TUpdate<InsertTable::Meta>(data.GetMeta().SerializeToProto().SerializeAsString()),
NIceDb::TUpdate<InsertTable::SchemaVersion>(data.GetSchemaVersion()));
Expand All @@ -818,6 +820,7 @@ struct Schema : NIceDb::Schema {
.Key((ui8)EInsertTableIds::Committed, data.GetSnapshot().GetPlanStep(), data.GetSnapshot().GetTxId(), data.GetPathId(),
data.GetDedupId())
.Update(NIceDb::TUpdate<InsertTable::BlobId>(data.GetBlobRange().GetBlobId().ToStringLegacy()),
NIceDb::TUpdate<InsertTable::InsertWriteId>((ui64)data.GetInsertWriteId()),
NIceDb::TUpdate<InsertTable::BlobRangeOffset>(data.GetBlobRange().Offset),
NIceDb::TUpdate<InsertTable::BlobRangeSize>(data.GetBlobRange().Size),
NIceDb::TUpdate<InsertTable::Meta>(data.GetMeta().SerializeToProto().SerializeAsString()),
Expand Down Expand Up @@ -982,15 +985,16 @@ class TInsertTableRecordLoadContext {
NColumnShard::Schema::EInsertTableIds RecType;
ui64 PlanStep;
ui64 WriteTxId;
TInsertWriteId InsertWriteId;
ui64 PathId;
YDB_ACCESSOR_DEF(TString, DedupId);
ui64 SchemaVersion;
TString BlobIdString;
std::optional<NOlap::TUnifiedBlobId> BlobId;
TString MetadataString;
std::optional<NKikimrTxColumnShard::TLogicalMetadata> Metadata;
std::optional<ui64> RangeOffset;
std::optional<ui64> RangeSize;
ui64 RangeOffset;
ui64 RangeSize;

void Prepare(const IBlobGroupSelector* dsGroupSelector) {
AFL_VERIFY(!PreparedFlag);
Expand All @@ -1004,7 +1008,6 @@ class TInsertTableRecordLoadContext {
AFL_VERIFY(MetadataString);
Y_ABORT_UNLESS(meta.ParseFromString(MetadataString));
Metadata = std::move(meta);
AFL_VERIFY(!!RangeOffset == !!RangeSize);
}

bool PreparedFlag = false;
Expand All @@ -1013,8 +1016,13 @@ class TInsertTableRecordLoadContext {
public:
TInsertWriteId GetInsertWriteId() const {
AFL_VERIFY(ParsedFlag);
AFL_VERIFY(RecType != NColumnShard::Schema::EInsertTableIds::Committed);
return (TInsertWriteId)WriteTxId;
return InsertWriteId;
}

// Returns the stored WriteTxId, exposed as the transaction id of the record.
// Preconditions (checked with AFL_VERIFY): ParsedFlag is set and RecType is
// EInsertTableIds::Committed.
// NOTE(review): AFL_VERIFY is presumably fatal on failure - confirm against its definition.
ui64 GetTxId() const {
// The accessor is only meaningful after the row fields have been read.
AFL_VERIFY(ParsedFlag);
// Restricts this accessor to committed records; other record types must not call it.
AFL_VERIFY(RecType == NColumnShard::Schema::EInsertTableIds::Committed);
return WriteTxId;
}

NColumnShard::Schema::EInsertTableIds GetRecType() const {
Expand All @@ -1024,6 +1032,7 @@ class TInsertTableRecordLoadContext {

// Returns the stored PlanStep of the record.
// Preconditions (checked with AFL_VERIFY): ParsedFlag is set and RecType is
// EInsertTableIds::Committed.
// NOTE(review): AFL_VERIFY is presumably fatal on failure - confirm against its definition.
ui64 GetPlanStep() const {
// The accessor is only meaningful after the row fields have been read.
AFL_VERIFY(ParsedFlag);
// Restricts this accessor to committed records; other record types must not call it.
AFL_VERIFY(RecType == NColumnShard::Schema::EInsertTableIds::Committed);
return PlanStep;
}

Expand All @@ -1035,19 +1044,12 @@ class TInsertTableRecordLoadContext {
void Upsert(NIceDb::TNiceDb& db) const {
AFL_VERIFY(ParsedFlag);
using namespace NColumnShard;
if (RangeOffset) {
db.Table<Schema::InsertTable>()
.Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId)
.Update(NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString),
NIceDb::TUpdate<Schema::InsertTable::BlobRangeOffset>(*RangeOffset),
NIceDb::TUpdate<Schema::InsertTable::BlobRangeSize>(*RangeSize), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString),
NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion));
} else {
db.Table<Schema::InsertTable>()
.Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId)
.Update(NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString),
NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion));
}
db.Table<Schema::InsertTable>()
.Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId)
.Update(NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString),
NIceDb::TUpdate<Schema::InsertTable::BlobRangeOffset>(RangeOffset),
NIceDb::TUpdate<Schema::InsertTable::BlobRangeSize>(RangeSize), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString),
NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion));
}

template <class TRowset>
Expand All @@ -1059,41 +1061,40 @@ class TInsertTableRecordLoadContext {
PlanStep = rowset.template GetValue<Schema::InsertTable::PlanStep>();
WriteTxId = rowset.template GetValueOrDefault<Schema::InsertTable::WriteTxId>();
AFL_VERIFY(WriteTxId);
InsertWriteId = (TInsertWriteId)rowset.template GetValueOrDefault<Schema::InsertTable::InsertWriteId>(WriteTxId);

PathId = rowset.template GetValue<Schema::InsertTable::PathId>();
DedupId = rowset.template GetValue<Schema::InsertTable::DedupId>();
SchemaVersion =
rowset.template HaveValue<Schema::InsertTable::SchemaVersion>() ? rowset.template GetValue<Schema::InsertTable::SchemaVersion>() : 0;
SchemaVersion = rowset.template GetValueOrDefault<Schema::InsertTable::SchemaVersion>(0);
BlobIdString = rowset.template GetValue<Schema::InsertTable::BlobId>();
MetadataString = rowset.template GetValue<Schema::InsertTable::Meta>();
if (rowset.template HaveValue<Schema::InsertTable::BlobRangeOffset>()) {
RangeOffset = rowset.template GetValue<Schema::InsertTable::BlobRangeOffset>();
}
if (rowset.template HaveValue<Schema::InsertTable::BlobRangeSize>()) {
RangeSize = rowset.template GetValue<Schema::InsertTable::BlobRangeSize>();
}
AFL_VERIFY(rowset.template HaveValue<Schema::InsertTable::BlobRangeOffset>());
AFL_VERIFY(rowset.template HaveValue<Schema::InsertTable::BlobRangeSize>());
RangeOffset = rowset.template GetValue<Schema::InsertTable::BlobRangeOffset>();
RangeSize = rowset.template GetValue<Schema::InsertTable::BlobRangeSize>();
}

NOlap::TCommittedData BuildCommitted(const IBlobGroupSelector* dsGroupSelector) {
Prepare(dsGroupSelector);
using namespace NColumnShard;
AFL_VERIFY(RecType == Schema::EInsertTableIds::Committed);
auto userData = std::make_shared<NOlap::TUserData>(PathId,
NOlap::TBlobRange(*BlobId, RangeOffset.value_or(0), RangeSize.value_or(BlobId->BlobSize())), *Metadata, SchemaVersion, std::nullopt);
auto userData = std::make_shared<NOlap::TUserData>(
PathId, NOlap::TBlobRange(*BlobId, RangeOffset, RangeSize), *Metadata, SchemaVersion, std::nullopt);
AFL_VERIFY(!!DedupId);
AFL_VERIFY(PlanStep);
return NOlap::TCommittedData(userData, PlanStep, WriteTxId, DedupId);
return NOlap::TCommittedData(userData, PlanStep, WriteTxId, InsertWriteId, DedupId);
}

NOlap::TInsertedData BuildInsertedOrAborted(const IBlobGroupSelector* dsGroupSelector) {
Prepare(dsGroupSelector);
using namespace NColumnShard;
AFL_VERIFY(InsertWriteId == (TInsertWriteId)WriteTxId)("insert", InsertWriteId)("write", WriteTxId);
AFL_VERIFY(RecType != Schema::EInsertTableIds::Committed);
auto userData = std::make_shared<NOlap::TUserData>(PathId,
NOlap::TBlobRange(*BlobId, RangeOffset.value_or(0), RangeSize.value_or(BlobId->BlobSize())), *Metadata, SchemaVersion, std::nullopt);
auto userData = std::make_shared<NOlap::TUserData>(
PathId, NOlap::TBlobRange(*BlobId, RangeOffset, RangeSize), *Metadata, SchemaVersion, std::nullopt);
AFL_VERIFY(!DedupId);
AFL_VERIFY(!PlanStep);
return NOlap::TInsertedData((TInsertWriteId)WriteTxId, userData);
return NOlap::TInsertedData(InsertWriteId, userData);
}
};

Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/common/portion.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ class TSpecialColumns {
public:
static constexpr const char* SPEC_COL_PLAN_STEP = "_yql_plan_step";
static constexpr const char* SPEC_COL_TX_ID = "_yql_tx_id";
static constexpr const char* SPEC_COL_WRITE_ID = "_yql_write_id";
static constexpr const char* SPEC_COL_DELETE_FLAG = "_yql_delete_flag";
static const ui32 SPEC_COL_PLAN_STEP_INDEX = 0xffffff00;
static const ui32 SPEC_COL_TX_ID_INDEX = SPEC_COL_PLAN_STEP_INDEX + 1;
static const ui32 SPEC_COL_DELETE_FLAG_INDEX = SPEC_COL_PLAN_STEP_INDEX + 2;
static const ui32 SPEC_COL_WRITE_ID_INDEX = SPEC_COL_PLAN_STEP_INDEX + 2;
static const ui32 SPEC_COL_DELETE_FLAG_INDEX = SPEC_COL_PLAN_STEP_INDEX + 3;
};

}
11 changes: 4 additions & 7 deletions ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ std::vector<TWritePortionInfoWithBlobsResult> TMerger::Execute(const std::shared
TMergingContext mergingContext(batchResults, Batches);

for (auto&& [columnId, columnData] : columnsData) {
if (columnId == (ui32)IIndexInfo::ESpecialColumn::WRITE_ID &&
(!HasAppData() || !AppDataVerified().FeatureFlags.GetEnableInsertWriteIdSpecialColumnCompatibility())) {
continue;
}
const TString& columnName = resultFiltered->GetIndexInfo().GetColumnName(columnId);
NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("field_name", columnName));
auto columnInfo = stats->GetColumnInfo(columnId);
Expand Down Expand Up @@ -125,13 +129,6 @@ std::vector<TWritePortionInfoWithBlobsResult> TMerger::Execute(const std::shared
AFL_VERIFY(i.second.size() == columnChunks.begin()->second.size())("first", columnChunks.begin()->second.size())(
"current", i.second.size())("first_name", columnChunks.begin()->first)("current_name", i.first);
}
auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP);
auto columnSnapshotTxIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID);
Y_ABORT_UNLESS(columnSnapshotPlanStepIdx);
Y_ABORT_UNLESS(columnSnapshotTxIdx);
Y_ABORT_UNLESS(columnSnapshotPlanStepIdx->type_id() == arrow::UInt64Type::type_id);
Y_ABORT_UNLESS(columnSnapshotTxIdx->type_id() == arrow::UInt64Type::type_id);

std::vector<TGeneralSerializedSlice> batchSlices;
std::shared_ptr<TDefaultSchemaDetails> schemaDetails(new TDefaultSchemaDetails(resultFiltered, stats));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(
if (dataColumnIds.contains((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG)) {
pkColumnIds.emplace((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG);
}
dataColumnIds.emplace((ui32)IIndexInfo::ESpecialColumn::WRITE_ID);
}
resultFiltered = std::make_shared<TFilteredSnapshotSchema>(resultSchema, dataColumnIds);
{
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/changes/indexation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
batch = std::make_shared<NArrow::TGeneralContainer>(NArrow::DeserializeBatch(blobData, batchSchema));
blobSchema->AdaptBatchToSchema(*batch, resultSchema);
}
IIndexInfo::AddSnapshotColumns(*batch, inserted.GetSnapshot());
IIndexInfo::AddSnapshotColumns(*batch, inserted.GetSnapshot(), (ui64)inserted.GetInsertWriteId());

auto& pathInfo = pathBatches.GetPathInfo(inserted.GetPathId());

Expand Down
Loading