Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/base/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ struct TKikimrEvents : TEvents {
ES_GRAPH,
ES_REPLICATION_SERVICE,
ES_CHANGE_EXCHANGE,
ES_LIMITER
};
};

Expand Down
1 change: 1 addition & 0 deletions ydb/core/driver_lib/run/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ union TBasicKikimrServicesMask {

bool EnableDatabaseMetadataCache:1;
bool EnableGraphService:1;
bool EnableCompDiskLimiter:1;
};

struct {
Expand Down
25 changes: 24 additions & 1 deletion ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,12 @@
#include <ydb/services/metadata/ds_table/service.h>
#include <ydb/services/metadata/service.h>

#include <ydb/core/tx/conveyor/usage/config.h>
#include <ydb/core/tx/conveyor/service/service.h>
#include <ydb/core/tx/conveyor/usage/config.h>
#include <ydb/core/tx/conveyor/usage/service.h>
#include <ydb/core/tx/limiter/service/service.h>
#include <ydb/core/tx/limiter/usage/config.h>
#include <ydb/core/tx/limiter/usage/service.h>

#include <ydb/services/bg_tasks/ds_table/executor.h>
#include <ydb/services/bg_tasks/service.h>
Expand Down Expand Up @@ -2145,6 +2148,26 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
}
}

// Forwards the run config to the IKikimrServicesInitializer base; no state of its own.
TCompDiskLimiterInitializer::TCompDiskLimiterInitializer(const TKikimrRunConfig& runConfig)
    : IKikimrServicesInitializer(runConfig)
{
}

// Adds the NLimiter::TCompDiskOperator service to setup->LocalServices when the
// limiter config reports itself as enabled; otherwise leaves `setup` untouched.
// The process is aborted if the limiter config cannot be deserialized.
void TCompDiskLimiterInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
    NLimiter::TConfig serviceConfig;
    Y_ABORT_UNLESS(serviceConfig.DeserializeFromProto<NLimiter::TCompDiskLimiterPolicy>(Config.GetCompDiskLimiterConfig()));

    if (!serviceConfig.IsEnabled()) {
        return;
    }

    // Counters live under the "tablets" service group, subgroup type=TX_COMP_DISK_LIMITER.
    TIntrusivePtr<::NMonitoring::TDynamicCounters> limiterCounters =
        GetServiceCounters(appData->Counters, "tablets")->GetSubgroup("type", "TX_COMP_DISK_LIMITER");

    auto limiterActor = NLimiter::TCompDiskOperator::CreateService(serviceConfig, limiterCounters);
    const auto limiterServiceId = NLimiter::TCompDiskOperator::MakeServiceId(NodeId);

    setup->LocalServices.push_back(std::make_pair(
        limiterServiceId,
        TActorSetupCmd(limiterActor, TMailboxType::HTSwap, appData->UserPoolId)));
}

TCompConveyorInitializer::TCompConveyorInitializer(const TKikimrRunConfig& runConfig)
: IKikimrServicesInitializer(runConfig) {
}
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/driver_lib/run/kikimr_services_initializers.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,12 @@ class TKqpServiceInitializer : public IKikimrServicesInitializer {
IGlobalObjectStorage& GlobalObjects;
};

// Services initializer for the comp-disk limiter.
// Its InitializeServices override is the hook through which the limiter service
// is added to the actor system setup.
class TCompDiskLimiterInitializer: public IKikimrServicesInitializer {
public:
// Passes runConfig on to the IKikimrServicesInitializer base.
TCompDiskLimiterInitializer(const TKikimrRunConfig& runConfig);
// Called with the actor system setup being assembled and the application data.
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};

class TCompConveyorInitializer: public IKikimrServicesInitializer {
public:
TCompConveyorInitializer(const TKikimrRunConfig& runConfig);
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/driver_lib/run/run.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1516,6 +1516,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
sil->AddServiceInitializer(new TExternalIndexInitializer(runConfig));
}

if (serviceMask.EnableCompDiskLimiter) {
sil->AddServiceInitializer(new TCompDiskLimiterInitializer(runConfig));
}

if (serviceMask.EnableScanConveyor) {
sil->AddServiceInitializer(new TScanConveyorInitializer(runConfig));
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/driver_lib/run/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ PEERDIR(
ydb/core/tx/columnshard
ydb/core/tx/coordinator
ydb/core/tx/conveyor/service
ydb/core/tx/limiter/service
ydb/core/tx/datashard
ydb/core/tx/long_tx_service
ydb/core/tx/long_tx_service/public
Expand Down
37 changes: 30 additions & 7 deletions ydb/core/formats/arrow/common/accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,27 @@ std::shared_ptr<arrow::Array> IChunkedArray::TReader::CopyRecord(const ui64 reco
return NArrow::CopyRecords(address.GetArray(), {address.GetPosition()});
}

std::shared_ptr<arrow::ChunkedArray> IChunkedArray::TReader::Slice(const ui32 offset, const ui32 count) const {
std::shared_ptr<arrow::ChunkedArray> IChunkedArray::Slice(const ui32 offset, const ui32 count) const {
AFL_VERIFY(offset + count <= (ui64)GetRecordsCount())("offset", offset)("count", count)("length", GetRecordsCount());
ui32 currentOffset = offset;
ui32 countLeast = count;
std::vector<std::shared_ptr<arrow::Array>> chunks;
auto address = GetChunk({}, offset);
while (countLeast) {
auto address = GetReadChunk(currentOffset);
if (address.GetPosition() + countLeast <= (ui64)address.GetArray()->length()) {
chunks.emplace_back(address.GetArray()->Slice(address.GetPosition(), countLeast));
address = GetChunk(address, currentOffset);
const ui64 internalPos = currentOffset - address.GetStartPosition();
if (internalPos + countLeast <= (ui64)address.GetArray()->length()) {
chunks.emplace_back(address.GetArray()->Slice(internalPos, countLeast));
break;
} else {
const ui32 deltaCount = address.GetArray()->length() - address.GetPosition();
chunks.emplace_back(address.GetArray()->Slice(address.GetPosition(), deltaCount));
const ui32 deltaCount = address.GetArray()->length() - internalPos;
chunks.emplace_back(address.GetArray()->Slice(internalPos, deltaCount));
AFL_VERIFY(countLeast >= deltaCount);
countLeast -= deltaCount;
currentOffset += deltaCount;
}
}
return std::make_shared<arrow::ChunkedArray>(chunks, ChunkedArray->DataType);
return std::make_shared<arrow::ChunkedArray>(chunks, DataType);
}

TString IChunkedArray::TReader::DebugString(const ui32 position) const {
Expand Down Expand Up @@ -89,6 +91,27 @@ class TChunkAccessor {
return ChunkedArray->chunk(idx);
}
};

}

// Compares the record at `position` of this chunk with the record at
// `itemPosition` of `item`.
// Positions use the same coordinates as StartPosition/FinishPosition; each one
// must fall into [StartPosition, FinishPosition) of its own chunk (verified).
std::partial_ordering IChunkedArray::TCurrentChunkAddress::Compare(const ui64 position, const TCurrentChunkAddress& item, const ui64 itemPosition) const {
    AFL_VERIFY(StartPosition <= position);
    AFL_VERIFY(position < FinishPosition);
    AFL_VERIFY(item.StartPosition <= itemPosition);
    AFL_VERIFY(itemPosition < item.FinishPosition);

    // Indices relative to the start of each chunk's array.
    const ui64 ownIndex = position - StartPosition;
    const ui64 otherIndex = itemPosition - item.StartPosition;
    return TComparator::TypedCompare<true>(*Array, ownIndex, *item.Array, otherIndex);
}

std::shared_ptr<arrow::Array> IChunkedArray::TCurrentChunkAddress::CopyRecord(const ui64 recordIndex) const {
AFL_VERIFY(StartPosition <= recordIndex);
AFL_VERIFY(recordIndex < FinishPosition);
return NArrow::CopyRecords(Array, { recordIndex - StartPosition });
}

// Returns NArrow::DebugString for the record at `position`.
// `position` must fall into [StartPosition, FinishPosition) (verified).
TString IChunkedArray::TCurrentChunkAddress::DebugString(const ui64 position) const {
    AFL_VERIFY(position < FinishPosition);
    AFL_VERIFY(StartPosition <= position);

    // Index of the record inside this chunk's array.
    const ui64 localIndex = position - StartPosition;
    return NArrow::DebugString(Array, localIndex);
}

IChunkedArray::TCurrentChunkAddress TTrivialChunkedArray::DoGetChunk(const std::optional<TCurrentChunkAddress>& chunkCurrent, const ui64 position) const {
Expand Down
19 changes: 18 additions & 1 deletion ydb/core/formats/arrow/common/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,31 @@ class IChunkedArray {
private:
YDB_READONLY_DEF(std::shared_ptr<arrow::Array>, Array);
YDB_READONLY(ui64, StartPosition, 0);
YDB_READONLY(ui64, FinishPosition, 0);
YDB_READONLY(ui64, ChunkIndex, 0);
public:
TString DebugString(const ui64 position) const;

// Number of elements in this chunk, as reported by Array->length().
ui64 GetLength() const {
return Array->length();
}

// True when `position` lies in the half-open range [StartPosition, FinishPosition).
bool Contains(const ui64 position) const {
    return StartPosition <= position && position < FinishPosition;
}

std::shared_ptr<arrow::Array> CopyRecord(const ui64 recordIndex) const;

std::partial_ordering Compare(const ui64 position, const TCurrentChunkAddress& item, const ui64 itemPosition) const;

// Describes the chunk `arr` that starts at position `pos` and has index `chunkIdx`.
// `arr` must be non-null and non-empty (both verified).
// FinishPosition is derived as pos + arr->length().
TCurrentChunkAddress(const std::shared_ptr<arrow::Array>& arr, const ui64 pos, const ui32 chunkIdx)
: Array(arr)
, StartPosition(pos)
, ChunkIndex(chunkIdx)
{
AFL_VERIFY(arr);
AFL_VERIFY(arr->length());
// Assigned here rather than in the initializer list: it dereferences arr,
// which is only known to be non-null after the checks above.
FinishPosition = StartPosition + arr->length();
}

TString DebugString() const {
Expand Down Expand Up @@ -141,7 +153,6 @@ class IChunkedArray {
static std::partial_ordering CompareColumns(const std::vector<TReader>& l, const ui64 lPosition, const std::vector<TReader>& r, const ui64 rPosition);
void AppendPositionTo(arrow::ArrayBuilder& builder, const ui64 position, ui64* recordSize) const;
std::shared_ptr<arrow::Array> CopyRecord(const ui64 recordIndex) const;
std::shared_ptr<arrow::ChunkedArray> Slice(const ui32 offset, const ui32 count) const;
TString DebugString(const ui32 position) const;
};

Expand All @@ -150,6 +161,12 @@ class IChunkedArray {
}
virtual ~IChunkedArray() = default;

std::shared_ptr<arrow::ChunkedArray> Slice(const ui32 offset, const ui32 count) const;

// Returns the chunk address for `position`; forwards both arguments unchanged
// to DoGetChunk. `chunkCurrent` may be empty.
TCurrentChunkAddress GetChunk(const std::optional<TCurrentChunkAddress>& chunkCurrent, const ui64 position) const {
return DoGetChunk(chunkCurrent, position);
}

IChunkedArray(const ui64 recordsCount, const EType type, const std::shared_ptr<arrow::DataType>& dataType)
: DataType(dataType)
, RecordsCount(recordsCount)
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/formats/arrow/reader/batch_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ namespace NKikimr::NArrow::NMerger {
class TBatchIterator {
private:
bool ControlPointFlag;
TSortableBatchPosition KeyColumns;
TSortableBatchPosition VersionColumns;
TRWSortableBatchPosition KeyColumns;
TRWSortableBatchPosition VersionColumns;
i64 RecordsCount;
int ReverseSortKff;

Expand All @@ -34,17 +34,17 @@ class TBatchIterator {
return ControlPointFlag;
}

const TSortableBatchPosition& GetKeyColumns() const {
const TRWSortableBatchPosition& GetKeyColumns() const {
return KeyColumns;
}

const TSortableBatchPosition& GetVersionColumns() const {
const TRWSortableBatchPosition& GetVersionColumns() const {
return VersionColumns;
}

TBatchIterator(const TSortableBatchPosition& keyColumns)
TBatchIterator(TRWSortableBatchPosition&& keyColumns)
: ControlPointFlag(true)
, KeyColumns(keyColumns) {
, KeyColumns(std::move(keyColumns)) {

}

Expand Down
9 changes: 8 additions & 1 deletion ydb/core/formats/arrow/reader/heap.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ class TRecordBatchBuilder;

template <class TSortCursor>
class TSortingHeap {
private:
std::deque<TSortCursor> FinishedCursors;
public:
TSortingHeap() = default;

Expand Down Expand Up @@ -40,14 +42,19 @@ class TSortingHeap {
}
}

// Drops every cursor accumulated in FinishedCursors.
void CleanFinished() {
FinishedCursors.clear();
}

// Removes the top element from the heap.
// The removed cursor is not destroyed here: it is moved into FinishedCursors
// and stays there until CleanFinished() clears that container.
void RemoveTop() {
// pop_heap moves the top element to Queue.back() and restores the heap on the rest.
std::pop_heap(Queue.begin(), Queue.end());
FinishedCursors.emplace_back(std::move(Queue.back()));
Queue.pop_back();
NextIdx = 0;
}

// Adds `cursor` to the heap, taking it over by move, and resets NextIdx to 0.
void Push(TSortCursor&& cursor) {
    // Exactly one insertion: the cursor is moved in (not copied), then sifted
    // into its place by push_heap.
    Queue.emplace_back(std::move(cursor));
    std::push_heap(Queue.begin(), Queue.end());
    NextIdx = 0;
}
Expand Down
Loading