Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ class TTxUpdateSchema: public TTransactionBase<TColumnShard> {
};

bool TTxUpdateSchema::Execute(TTransactionContext& txc, const TActorContext&) {
NActors::TLogContextGuard gLogging =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "initialize_shard");
NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())(
"process", "TTxUpdateSchema::Execute");
ACFL_INFO("step", "TTxUpdateSchema.Execute_Start")("details", Self->NormalizerController.DebugString());

while (!Self->NormalizerController.IsNormalizationFinished()) {
Expand All @@ -153,6 +153,8 @@ bool TTxUpdateSchema::Execute(TTransactionContext& txc, const TActorContext&) {
}

void TTxUpdateSchema::Complete(const TActorContext& ctx) {
NActors::TLogContextGuard gLogging =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("process", "TTxUpdateSchema::Complete");
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxUpdateSchema.Complete");
Self->Counters.GetCSCounters().Initialization.OnTxUpdateSchemaFinished(TMonotonic::Now() - StartInstant);
if (NormalizerTasks.empty()) {
Expand Down Expand Up @@ -185,32 +187,34 @@ class TTxApplyNormalizer: public TTransactionBase<TColumnShard> {
}

private:
bool NormalizerFinished = false;
NOlap::INormalizerChanges::TPtr Changes;
};

bool TTxApplyNormalizer::Execute(TTransactionContext& txc, const TActorContext&) {
NActors::TLogContextGuard gLogging =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "initialize_shard");
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "TTxApplyNormalizer::Execute");
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxApplyNormalizer.Execute")("details", Self->NormalizerController.DebugString());
if (!Changes->ApplyOnExecute(txc, Self->NormalizerController)) {
return false;
}

if (Self->NormalizerController.GetNormalizer()->GetActiveTasksCount() == 1) {
if (Self->NormalizerController.GetNormalizer()->DecActiveCounters() == 0) {
NormalizerFinished = true;
NIceDb::TNiceDb db(txc.DB);
Self->NormalizerController.OnNormalizerFinished(db);
}
return true;
}

void TTxApplyNormalizer::Complete(const TActorContext& ctx) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxApplyNormalizer.Complete")("tablet_id", Self->TabletID())("event", "initialize_shard");
NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())(
"event", "TTxApplyNormalizer::Complete");
AFL_VERIFY(!Self->NormalizerController.IsNormalizationFinished())("details", Self->NormalizerController.DebugString());
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "apply_normalizer_changes")(
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "apply_normalizer_changes")(
"details", Self->NormalizerController.DebugString())("size", Changes->GetSize());
Changes->ApplyOnComplete(Self->NormalizerController);
Self->NormalizerController.GetNormalizer()->OnResultReady();
if (Self->NormalizerController.GetNormalizer()->HasActiveTasks()) {
if (!NormalizerFinished) {
return;
}

Expand Down Expand Up @@ -240,6 +244,8 @@ class TTxInitSchema: public TTransactionBase<TColumnShard> {
};

bool TTxInitSchema::Execute(TTransactionContext& txc, const TActorContext&) {
NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())(
"process", "TTxInitSchema::Execute");
LOG_S_DEBUG("TxInitSchema.Execute at tablet " << Self->TabletID());

const bool isFirstRun = txc.DB.GetScheme().IsEmpty();
Expand Down Expand Up @@ -286,6 +292,8 @@ bool TTxInitSchema::Execute(TTransactionContext& txc, const TActorContext&) {
}

void TTxInitSchema::Complete(const TActorContext& ctx) {
NActors::TLogContextGuard gLogging =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("process", "TTxInitSchema::Complete");
Self->Counters.GetCSCounters().Initialization.OnTxInitSchemaFinished(TMonotonic::Now() - StartInstant);
LOG_S_DEBUG("TxInitSchema.Complete at tablet " << Self->TabletID(););
Self->Execute(new TTxUpdateSchema(Self), ctx);
Expand Down
25 changes: 20 additions & 5 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1278,10 +1278,19 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
TBlobGroupSelector selector(Self->Info());
bool reask = false;
for (auto&& i : PortionsByPath) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("size", i.second.size())("path_id", i.first);
for (auto&& p : i.second) {
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV1>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
if (!rowset.IsReady()) {
reask = true;
{
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV2>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
if (!rowset.IsReady()) {
reask = true;
}
}
{
auto rowset = db.Table<NColumnShard::Schema::IndexIndexes>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
if (!rowset.IsReady()) {
reask = true;
}
}
}
}
Expand All @@ -1290,17 +1299,18 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
}

for (auto&& i : PortionsByPath) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "processing")("size", i.second.size())("path_id", i.first);
while (i.second.size()) {
auto p = i.second.back();
std::vector<NOlap::TColumnChunkLoadContextV1> records;
std::vector<NOlap::TIndexChunkLoadContext> indexes;
{
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV1>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV2>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
if (!rowset.IsReady()) {
return false;
}
while (!rowset.EndOfSet()) {
records.emplace_back(NOlap::TColumnChunkLoadContextV1(rowset));
NOlap::TColumnChunkLoadContextV1::BuildFromDBV2(rowset, records);
if (!rowset.Next()) {
return false;
}
Expand All @@ -1321,8 +1331,11 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
FetchedAccessors.emplace_back(NOlap::TPortionAccessorConstructor::BuildForLoading(p, std::move(records), std::move(indexes)));
i.second.pop_back();
}
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "finished")("size", i.second.size())(
"path_id", i.first);
}

AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "finished");
FetchCallback->OnAccessorsFetched(std::move(FetchedAccessors));
return true;
}
Expand Down Expand Up @@ -1439,7 +1452,9 @@ void TColumnShard::Enqueue(STFUNC_SIG) {
switch (ev->GetTypeRewrite()) {
HFunc(TEvPrivate::TEvTieringModified, Handle);
HFunc(TEvPrivate::TEvNormalizerResult, Handle);
HFunc(NOlap::NDataAccessorControl::TEvAskTabletDataAccessors, Handle);
default:
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "unexpected event in enqueue");
return NTabletFlatExecutor::TTabletExecutedFlat::Enqueue(ev);
}
}
Expand Down
42 changes: 40 additions & 2 deletions ydb/core/tx/columnshard/columnshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ struct Schema : NIceDb::Schema {
RepairsTableId,
NormalizersTableId,
NormalizerEventsTableId,
ColumnsV1TableId
ColumnsV1TableId,
ColumnsV2TableId
};

enum class ETierTables: ui32 {
Expand Down Expand Up @@ -569,6 +570,15 @@ struct Schema : NIceDb::Schema {
using TColumns = TableColumns<PathId, PortionId, SSColumnId, ChunkIdx, Metadata, BlobIdx, Offset, Size>;
};

struct IndexColumnsV2: Table<ColumnsV2TableId> {
struct PathId: Column<1, NScheme::NTypeIds::Uint64> {};
struct PortionId: Column<2, NScheme::NTypeIds::Uint64> {};
struct Metadata: Column<3, NScheme::NTypeIds::String> {};

using TKey = TableKey<PathId, PortionId>;
using TColumns = TableColumns<PathId, PortionId, Metadata>;
};

using TTables = SchemaTables<
Value,
TxInfo,
Expand Down Expand Up @@ -607,7 +617,8 @@ struct Schema : NIceDb::Schema {
TxDependencies,
TxStates,
TxEvents,
IndexColumnsV1
IndexColumnsV1,
IndexColumnsV2
>;

//
Expand Down Expand Up @@ -997,6 +1008,33 @@ class TColumnChunkLoadContextV1 {
YDB_READONLY_DEF(NKikimrTxColumnShard::TIndexColumnMeta, MetaProto);

public:
TPortionAddress GetPortionAddress() const {
return TPortionAddress(PathId, PortionId);
}

template <class TSource>
static void BuildFromDBV2(const TSource& rowset, std::vector<TColumnChunkLoadContextV1>& records) {
const ui64 pathId = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::PathId>();
const ui64 portionId = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::PortionId>();
const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::Metadata>();
NKikimrTxColumnShard::TIndexPortionAccessor metaProto;
AFL_VERIFY(metaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf");
for (auto&& i : metaProto.GetChunks()) {
TColumnChunkLoadContextV1 result(pathId, portionId, TChunkAddress(i.GetSSColumnId(), i.GetChunkIdx()),
TBlobRangeLink16::BuildFromProto(i.GetBlobRangeLink()).DetachResult(), i.GetChunkMetadata());
records.emplace_back(std::move(result));
}
}

NKikimrTxColumnShard::TColumnChunkInfo SerializeToDBProto() const {
NKikimrTxColumnShard::TColumnChunkInfo proto;
proto.SetSSColumnId(Address.GetColumnId());
proto.SetChunkIdx(Address.GetChunkIdx());
*proto.MutableChunkMetadata() = MetaProto;
*proto.MutableBlobRangeLink() = BlobRange.SerializeToProto();
return proto;
}

TFullChunkAddress GetFullChunkAddress() const {
return TFullChunkAddress(PathId, PortionId, Address.GetEntityId(), Address.GetChunkIdx());
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/data_accessor/actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class TActor: public TActorBootstrapped<TActor> {
void Bootstrap();

STFUNC(StateWait) {
const NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("self_id", SelfId())("tablet_id", TabletId)("parent", Parent);
switch (ev->GetTypeRewrite()) {
cFunc(NActors::TEvents::TEvPoison::EventType, StartStopping);
hFunc(TEvRegisterController, Handle);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/data_accessor/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class TLocalManager: public IDataAccessorsManager {
const std::shared_ptr<IAccessorCallback> AccessorCallback;

virtual void DoAskData(const std::shared_ptr<TDataAccessorsRequest>& request) override {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "ask_data")("request", request->DebugString());
for (auto&& i : request->GetPathIds()) {
auto it = Managers.find(i);
if (it == Managers.end()) {
Expand Down
21 changes: 18 additions & 3 deletions ydb/core/tx/columnshard/data_accessor/request.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once
#include <ydb/core/tx/columnshard/counters/common/object_counter.h>
#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>

Expand Down Expand Up @@ -61,7 +62,7 @@ class TDataAccessorsResult {
}
};

class IDataAccessorRequestsSubscriber {
class IDataAccessorRequestsSubscriber: public NColumnShard::TMonitoringObjectsCounter<IDataAccessorRequestsSubscriber> {
private:
THashSet<ui64> RequestIds;

Expand Down Expand Up @@ -95,7 +96,6 @@ class IDataAccessorRequestsSubscriber {
class TFakeDataAccessorsSubscriber: public IDataAccessorRequestsSubscriber {
private:
virtual void DoOnRequestsFinished(TDataAccessorsResult&& /*result*/) override {

}
};

Expand All @@ -117,6 +117,12 @@ class TPathFetchingState {
THashMap<ui64, TPortionDataAccessor> PortionAccessors;

public:
TString DebugString() const {
TStringBuilder sb;
sb << "portions_count=" << Portions.size();
return sb;
}

TPathFetchingState(const ui64 pathId)
: PathId(pathId) {
}
Expand Down Expand Up @@ -161,7 +167,7 @@ class TPathFetchingState {
}
};

class TDataAccessorsRequest {
class TDataAccessorsRequest: public NColumnShard::TMonitoringObjectsCounter<TDataAccessorsRequest> {
private:
static inline TAtomicCounter Counter = 0;
ui32 FetchStage = 0;
Expand Down Expand Up @@ -191,6 +197,15 @@ class TDataAccessorsRequest {
}

public:
TString DebugString() const {
TStringBuilder sb;
sb << "request_id=" << RequestId << ";";
for (auto&& i : PathIdStatus) {
sb << i.first << "={" << i.second.DebugString() << "};";
}
return sb;
}

TDataAccessorsRequest() = default;

bool HasSubscriber() const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
namespace NKikimr::NOlap::NActualizer {

TTieringProcessContext::TTieringProcessContext(const ui64 memoryUsageLimit, const TSaverContext& saverContext,
const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const NColumnShard::TEngineLogsCounters& counters, const std::shared_ptr<TController>& controller)
: MemoryUsageLimit(memoryUsageLimit)
const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const TVersionedIndex& versionedIndex,
const NColumnShard::TEngineLogsCounters& counters, const std::shared_ptr<TController>& controller)
: VersionedIndex(versionedIndex)
, MemoryUsageLimit(memoryUsageLimit)
, SaverContext(saverContext)
, Counters(counters)
, Controller(controller)
Expand All @@ -31,10 +33,10 @@ bool TTieringProcessContext::AddPortion(
};
auto it = Tasks.find(features.GetRWAddress());
if (it == Tasks.end()) {
std::vector<TTaskConstructor> tasks = {buildNewTask()};
std::vector<TTaskConstructor> tasks = { buildNewTask() };
it = Tasks.emplace(features.GetRWAddress(), std::move(tasks)).first;
}
if (it->second.back().GetTxWriteVolume() + info->GetTxVolume() > TGlobalLimits::TxWriteLimitBytes / 2 && it->second.back().GetTxWriteVolume()) {
if (!it->second.back().CanTakePortionInTx(info, VersionedIndex)) {
if (Controller->IsNewTaskAvailable(it->first, it->second.size())) {
it->second.emplace_back(buildNewTask());
} else {
Expand All @@ -53,7 +55,7 @@ bool TTieringProcessContext::AddPortion(
}
it->second.back().MutableMemoryUsage() = it->second.back().GetMemoryPredictor()->AddPortion(info);
}
it->second.back().MutableTxWriteVolume() += info->GetTxVolume();
it->second.back().TakePortionInTx(info, VersionedIndex);
if (features.GetTargetTierName() == NTiering::NCommon::DeleteTierName) {
AFL_VERIFY(dWait);
Counters.OnPortionToDrop(info->GetTotalBlobBytes(), *dWait);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,34 @@ class TTaskConstructor {
YDB_READONLY_DEF(std::shared_ptr<TColumnEngineChanges::IMemoryPredictor>, MemoryPredictor);
YDB_READONLY_DEF(std::shared_ptr<TTTLColumnEngineChanges>, Task);
YDB_ACCESSOR(ui64, MemoryUsage, 0);
YDB_ACCESSOR(ui64, TxWriteVolume, 0);
YDB_READONLY(ui64, PortionsCount, 0);
YDB_READONLY(ui64, ChunksCount, 0);

public:
TTaskConstructor(const std::shared_ptr<TColumnEngineChanges::IMemoryPredictor>& predictor, const std::shared_ptr<TTTLColumnEngineChanges>& task)
: MemoryPredictor(predictor)
, Task(task) {

}

bool CanTakePortionInTx(const TPortionInfo::TConstPtr& portion, const TVersionedIndex& index) {
if (!PortionsCount) {
return true;
}
return
(PortionsCount + 1 < 1000) &&
(ChunksCount + portion->GetApproxChunksCount(portion->GetSchema(index)->GetColumnsCount()) < 100000);
}

void TakePortionInTx(const TPortionInfo::TConstPtr& portion, const TVersionedIndex& index) {
++PortionsCount;
ChunksCount += portion->GetApproxChunksCount(portion->GetSchema(index)->GetColumnsCount());
}
};

class TTieringProcessContext {
private:
const TVersionedIndex& VersionedIndex;
THashSet<TPortionAddress> UsedPortions;
const ui64 MemoryUsageLimit;
TSaverContext SaverContext;
Expand Down Expand Up @@ -63,7 +80,8 @@ class TTieringProcessContext {
}
}

TTieringProcessContext(const ui64 memoryUsageLimit, const TSaverContext& saverContext, const std::shared_ptr<NDataLocks::TManager>& dataLocksManager,
TTieringProcessContext(const ui64 memoryUsageLimit, const TSaverContext& saverContext,
const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const TVersionedIndex& versionedIndex,
const NColumnShard::TEngineLogsCounters& counters, const std::shared_ptr<TController>& controller);
};

Expand Down
Loading
Loading