Skip to content

Commit acb4df3

Browse files
compaction speedup (#10323)
1 parent 0ea658a commit acb4df3

File tree

136 files changed

+3140
-376
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

136 files changed

+3140
-376
lines changed

.github/config/muted_ya.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,15 @@ ydb/core/kqp/ut/olap KqpOlapBlobsSharing.TableReshardingConsistency64
2424
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.TableReshardingModuloN
2525
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.UpsertWhileSplitTest
2626
ydb/core/kqp/ut/olap KqpOlapStatistics.StatsUsageWithTTL
27+
ydb/core/kqp/ut/olap KqpOlapSysView.StatsSysViewBytesDictStatActualization
28+
ydb/core/kqp/ut/olap KqpOlapAggregations.Aggregation_SumL_GroupL_OrderL
29+
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesActualization
30+
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesInBS
31+
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesInLocalMetadata
32+
ydb/core/tx/columnshard/ut_rw Normalizers.CleanEmptyPortionsNormalizer
33+
ydb/core/kqp/ut/pg KqpPg.CreateIndex
34+
ydb/core/kqp/ut/query KqpLimits.QueryReplySize
35+
ydb/core/kqp/ut/query KqpQuery.QueryTimeout
2736
ydb/core/kqp/ut/query KqpLimits.ComputeActorMemoryAllocationFailureQueryService
2837
ydb/core/kqp/ut/query KqpLimits.QueryExecTimeoutCancel
2938
ydb/core/kqp/ut/query KqpStats.SysViewClientLost

ydb/core/base/events.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ struct TKikimrEvents : TEvents {
183183
ES_GROUPED_ALLOCATIONS_MANAGER = 4260,
184184
ES_INCREMENTAL_RESTORE_SCAN = 4261,
185185
ES_FEATURE_FLAGS = 4262,
186+
ES_PRIORITY_QUEUE = 4263,
186187
};
187188
};
188189

ydb/core/driver_lib/run/kikimr_services_initializers.cpp

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,8 @@
185185
#include <ydb/core/tx/conveyor/service/service.h>
186186
#include <ydb/core/tx/conveyor/usage/config.h>
187187
#include <ydb/core/tx/conveyor/usage/service.h>
188+
#include <ydb/core/tx/priorities/usage/config.h>
189+
#include <ydb/core/tx/priorities/usage/service.h>
188190
#include <ydb/core/tx/limiter/service/service.h>
189191
#include <ydb/core/tx/limiter/usage/config.h>
190192
#include <ydb/core/tx/limiter/usage/service.h>
@@ -2199,6 +2201,28 @@ void TCompDiskLimiterInitializer::InitializeServices(NActors::TActorSystemSetup*
21992201
}
22002202
}
22012203

2204+
TCompPrioritiesInitializer::TCompPrioritiesInitializer(const TKikimrRunConfig& runConfig)
2205+
: IKikimrServicesInitializer(runConfig) {
2206+
}
2207+
2208+
void TCompPrioritiesInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
2209+
NPrioritiesQueue::TConfig serviceConfig;
2210+
if (Config.HasCompPrioritiesConfig()) {
2211+
Y_ABORT_UNLESS(serviceConfig.DeserializeFromProto(Config.GetCompPrioritiesConfig()));
2212+
}
2213+
2214+
if (serviceConfig.IsEnabled()) {
2215+
TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters(appData->Counters, "tablets");
2216+
TIntrusivePtr<::NMonitoring::TDynamicCounters> conveyorGroup = tabletGroup->GetSubgroup("type", "TX_COMP_PRIORITIES");
2217+
2218+
auto service = NPrioritiesQueue::TCompServiceOperator::CreateService(serviceConfig, conveyorGroup);
2219+
2220+
setup->LocalServices.push_back(std::make_pair(
2221+
NPrioritiesQueue::TCompServiceOperator::MakeServiceId(NodeId),
2222+
TActorSetupCmd(service, TMailboxType::HTSwap, appData->UserPoolId)));
2223+
}
2224+
}
2225+
22022226
TCompConveyorInitializer::TCompConveyorInitializer(const TKikimrRunConfig& runConfig)
22032227
: IKikimrServicesInitializer(runConfig) {
22042228
}

ydb/core/driver_lib/run/kikimr_services_initializers.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,12 @@ class TGroupedMemoryLimiterInitializer: public IKikimrServicesInitializer {
410410
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
411411
};
412412

413+
class TCompPrioritiesInitializer: public IKikimrServicesInitializer {
414+
public:
415+
TCompPrioritiesInitializer(const TKikimrRunConfig& runConfig);
416+
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
417+
};
418+
413419
class TCompConveyorInitializer: public IKikimrServicesInitializer {
414420
public:
415421
TCompConveyorInitializer(const TKikimrRunConfig& runConfig);

ydb/core/driver_lib/run/run.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1613,6 +1613,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
16131613
sil->AddServiceInitializer(new TScanConveyorInitializer(runConfig));
16141614
}
16151615

1616+
if (serviceMask.EnableCompPriorities) {
1617+
sil->AddServiceInitializer(new TCompPrioritiesInitializer(runConfig));
1618+
}
1619+
16161620
if (serviceMask.EnableCompConveyor) {
16171621
sil->AddServiceInitializer(new TCompConveyorInitializer(runConfig));
16181622
}

ydb/core/driver_lib/run/service_mask.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ union TBasicKikimrServicesMask {
8080
bool EnableCompDiskLimiter:1;
8181
bool EnableGroupedMemoryLimiter:1;
8282
bool EnableAwsService:1;
83+
bool EnableCompPriorities : 1;
8384
};
8485

8586
struct {

ydb/core/formats/arrow/accessor/plain/accessor.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ class TTrivialArray: public IChunkedArray {
3232
}
3333

3434
public:
35+
const std::shared_ptr<arrow::Array>& GetArray() const {
36+
return Array;
37+
}
38+
3539
TTrivialArray(const std::shared_ptr<arrow::Array>& data)
3640
: TBase(data->length(), EType::Array, data->type())
3741
, Array(data) {

ydb/core/formats/arrow/accessor/plain/constructor.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,15 @@ std::shared_ptr<arrow::Schema> TConstructor::DoGetExpectedSchema(const std::shar
3535

3636
std::shared_ptr<arrow::RecordBatch> TConstructor::DoConstruct(
3737
const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const {
38-
auto chunked = columnData->GetChunkedArray();
3938
auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector({ std::make_shared<arrow::Field>("val", externalInfo.GetColumnType()) }));
40-
auto table = arrow::Table::Make(schema, { chunked }, columnData->GetRecordsCount());
41-
return NArrow::ToBatch(table, true);
39+
if (columnData->GetType() == IChunkedArray::EType::Array) {
40+
const auto* arr = static_cast<const TTrivialArray*>(columnData.get());
41+
return arrow::RecordBatch::Make(schema, columnData->GetRecordsCount(), { arr->GetArray() });
42+
} else {
43+
auto chunked = columnData->GetChunkedArray();
44+
auto table = arrow::Table::Make(schema, { chunked }, columnData->GetRecordsCount());
45+
return NArrow::ToBatch(table, chunked->num_chunks() > 1);
46+
}
4247
}
4348

4449
} // namespace NKikimr::NArrow::NAccessor::NPlain

ydb/core/formats/arrow/program.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -555,7 +555,7 @@ arrow::Status TDatumBatch::AddColumn(const std::string& name, arrow::Datum&& col
555555

556556
auto field = arrow::field(name, column.type());
557557
if (!field || !field->type()->Equals(column.type())) {
558-
return arrow::Status::Invalid("Cannot create field.");
558+
return arrow::Status::Invalid("Cannot create field " + name + ". type:" + field->type()->ToString() + " vs " + column.type()->ToString());
559559
}
560560
if (!column.is_scalar() && column.length() != Rows) {
561561
return arrow::Status::Invalid("Wrong column length.");

ydb/core/formats/arrow/reader/position.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,15 @@ TSortableScanData::TSortableScanData(
133133
BuildPosition(position);
134134
}
135135

136+
TSortableScanData::TSortableScanData(
137+
const ui64 position, const std::shared_ptr<arrow::RecordBatch>& batch) {
138+
for (auto&& c : batch->columns()) {
139+
Columns.emplace_back(std::make_shared<NAccessor::TTrivialArray>(c));
140+
}
141+
Fields = batch->schema()->fields();
142+
BuildPosition(position);
143+
}
144+
136145
TSortableScanData::TSortableScanData(const ui64 position, const std::shared_ptr<arrow::Table>& batch, const std::vector<std::string>& columns) {
137146
for (auto&& i : columns) {
138147
auto c = batch->GetColumnByName(i);

0 commit comments

Comments
 (0)