Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 69 additions & 6 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1256,12 +1256,72 @@ void TColumnShard::Handle(NOlap::NDataSharing::NEvents::TEvFinishedFromSource::T
}
};

class TPortionConstructorV2 {
private:
NOlap::TPortionInfo::TConstPtr PortionInfo;
std::optional<NOlap::TColumnChunkLoadContextV2> Records;
std::optional<std::vector<NOlap::TIndexChunkLoadContext>> Indexes;

public:
TPortionConstructorV2(const NOlap::TPortionInfo::TConstPtr& portionInfo)
: PortionInfo(portionInfo) {
}

void SetRecords(NOlap::TColumnChunkLoadContextV2&& records) {
AFL_VERIFY(!Records);
Records = std::move(records);
}

void SetIndexes(std::vector<NOlap::TIndexChunkLoadContext>&& indexes) {
AFL_VERIFY(!Indexes);
Indexes = std::move(indexes);
}

NOlap::TPortionDataAccessor BuildAccessor() {
AFL_VERIFY(PortionInfo && Records && Indexes);
std::vector<NOlap::TColumnChunkLoadContextV1> records = Records->BuildRecordsV1();
return NOlap::TPortionAccessorConstructor::BuildForLoading(std::move(PortionInfo), std::move(records), std::move(*Indexes));
}
};

class TAccessorsParsingTask: public NConveyor::ITask {
private:
std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback> FetchCallback;
std::vector<TPortionConstructorV2> Portions;

virtual TConclusionStatus DoExecute(const std::shared_ptr<ITask>& /*taskPtr*/) override {
std::vector<NOlap::TPortionDataAccessor> accessors;
accessors.reserve(Portions.size());
for (auto&& i : Portions) {
accessors.emplace_back(i.BuildAccessor());
}
FetchCallback->OnAccessorsFetched(std::move(accessors));
return TConclusionStatus::Success();
}
virtual void DoOnCannotExecute(const TString& reason) override {
AFL_VERIFY(false)("cannot parse metadata", reason);
}

public:
virtual TString GetTaskClassIdentifier() const override {
return "ASKED_METADATA_PARSER";
}

TAccessorsParsingTask(
const std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback>& callback, std::vector<TPortionConstructorV2>&& portions)
: FetchCallback(callback)
, Portions(std::move(portions))
{

}
};

class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
private:
using TBase = TTransactionBase<TColumnShard>;
std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback> FetchCallback;
THashMap<ui64, std::vector<NOlap::TPortionInfo::TConstPtr>> PortionsByPath;
std::vector<NOlap::TPortionDataAccessor> FetchedAccessors;
std::vector<TPortionConstructorV2> FetchedAccessors;

public:
TTxAskPortionChunks(TColumnShard* self, const std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback>& fetchCallback,
Expand All @@ -1275,6 +1335,7 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {

bool Execute(TTransactionContext& txc, const TActorContext& /*ctx*/) override {
NIceDb::TNiceDb db(txc.DB);

TBlobGroupSelector selector(Self->Info());
bool reask = false;
for (auto&& i : PortionsByPath) {
Expand Down Expand Up @@ -1302,21 +1363,22 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "processing")("size", i.second.size())("path_id", i.first);
while (i.second.size()) {
auto p = i.second.back();
std::vector<NOlap::TColumnChunkLoadContextV1> records;
std::vector<NOlap::TIndexChunkLoadContext> indexes;
TPortionConstructorV2 constructor(p);
{
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV2>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
if (!rowset.IsReady()) {
return false;
}
while (!rowset.EndOfSet()) {
NOlap::TColumnChunkLoadContextV1::BuildFromDBV2(rowset, records);
NOlap::TColumnChunkLoadContextV2 info(rowset);
constructor.SetRecords(std::move(info));
if (!rowset.Next()) {
return false;
}
}
}
{
std::vector<NOlap::TIndexChunkLoadContext> indexes;
auto rowset = db.Table<NColumnShard::Schema::IndexIndexes>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
if (!rowset.IsReady()) {
return false;
Expand All @@ -1327,16 +1389,17 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
return false;
}
}
constructor.SetIndexes(std::move(indexes));
}
FetchedAccessors.emplace_back(NOlap::TPortionAccessorConstructor::BuildForLoading(p, std::move(records), std::move(indexes)));
FetchedAccessors.emplace_back(std::move(constructor));
i.second.pop_back();
}
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "finished")("size", i.second.size())(
"path_id", i.first);
}

AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "finished");
FetchCallback->OnAccessorsFetched(std::move(FetchedAccessors));
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(std::make_shared<TAccessorsParsingTask>(FetchCallback, std::move(FetchedAccessors)));
return true;
}
void Complete(const TActorContext& /*ctx*/) override {
Expand Down
45 changes: 31 additions & 14 deletions ydb/core/tx/columnshard/columnshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,10 @@ class TColumnChunkLoadContext {
YDB_READONLY(TSnapshot, MinSnapshotDeprecated, TSnapshot::Zero());

public:
TPortionAddress GetPortionAddress() const {
return TPortionAddress(PathId, PortionId);
}

const TChunkAddress& GetAddress() const {
return Address;
}
Expand Down Expand Up @@ -1012,20 +1016,6 @@ class TColumnChunkLoadContextV1 {
return TPortionAddress(PathId, PortionId);
}

template <class TSource>
static void BuildFromDBV2(const TSource& rowset, std::vector<TColumnChunkLoadContextV1>& records) {
const ui64 pathId = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::PathId>();
const ui64 portionId = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::PortionId>();
const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::Metadata>();
NKikimrTxColumnShard::TIndexPortionAccessor metaProto;
AFL_VERIFY(metaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf");
for (auto&& i : metaProto.GetChunks()) {
TColumnChunkLoadContextV1 result(pathId, portionId, TChunkAddress(i.GetSSColumnId(), i.GetChunkIdx()),
TBlobRangeLink16::BuildFromProto(i.GetBlobRangeLink()).DetachResult(), i.GetChunkMetadata());
records.emplace_back(std::move(result));
}
}

NKikimrTxColumnShard::TColumnChunkInfo SerializeToDBProto() const {
NKikimrTxColumnShard::TColumnChunkInfo proto;
proto.SetSSColumnId(Address.GetColumnId());
Expand Down Expand Up @@ -1068,6 +1058,33 @@ class TColumnChunkLoadContextV1 {
}
};

class TColumnChunkLoadContextV2 {
private:
YDB_READONLY(ui64, PathId, 0);
YDB_READONLY(ui64, PortionId, 0);
YDB_READONLY_DEF(TString, MetadataProto);

public:
template <class TSource>
TColumnChunkLoadContextV2(const TSource& rowset) {
PathId = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::PathId>();
PortionId = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::PortionId>();
MetadataProto = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::Metadata>();
}

std::vector<TColumnChunkLoadContextV1> BuildRecordsV1() const {
std::vector<TColumnChunkLoadContextV1> records;
NKikimrTxColumnShard::TIndexPortionAccessor metaProto;
AFL_VERIFY(metaProto.ParseFromArray(MetadataProto.data(), MetadataProto.size()))("event", "cannot parse metadata as protobuf");
for (auto&& i : metaProto.GetChunks()) {
TColumnChunkLoadContextV1 result(PathId, PortionId, TChunkAddress(i.GetSSColumnId(), i.GetChunkIdx()),
TBlobRangeLink16::BuildFromProto(i.GetBlobRangeLink()).DetachResult(), i.GetChunkMetadata());
records.emplace_back(std::move(result));
}
return records;
}
};

class TIndexChunkLoadContext {
private:
YDB_READONLY_DEF(std::optional<TBlobRange>, BlobRange);
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/db_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ void TDbWrapper::WritePortion(const NOlap::TPortionInfo& portion) {

void TDbWrapper::ErasePortion(const NOlap::TPortionInfo& portion) {
NIceDb::TNiceDb db(Database);
using IndexPortions = NColumnShard::Schema::IndexPortions;
db.Table<IndexPortions>().Key(portion.GetPathId(), portion.GetPortionId()).Delete();
db.Table<NColumnShard::Schema::IndexPortions>().Key(portion.GetPathId(), portion.GetPortionId()).Delete();
db.Table<NColumnShard::Schema::IndexColumnsV2>().Key(portion.GetPathId(), portion.GetPortionId()).Delete();
}

void TDbWrapper::EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/normalizer/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ enum class ENormalizerSequentialId: ui32 {
TablesCleaner,
DeprecatedPortionsMetadata,
CleanGranuleId,
EmptyPortionsCleaner,
DeprecatedEmptyPortionsCleaner,
CleanInsertionDedup,
GCCountersNormalizer,
RestorePortionFromChunks,
Expand Down
Loading
Loading