Skip to content

Commit 93b6ebc

Browse files
Merge bdb2514 into 398fb41
2 parents 398fb41 + bdb2514 commit 93b6ebc

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+4008
-92
lines changed

ydb/core/formats/arrow/common/container.cpp

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -148,27 +148,32 @@ std::shared_ptr<NKikimr::NArrow::TGeneralContainer> TGeneralContainer::BuildEmpt
148148
return std::make_shared<TGeneralContainer>(Schema, std::move(columns));
149149
}
150150

151-
std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableOptional(const std::optional<std::set<std::string>>& columnNames /*= {}*/) const {
151+
std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableOptional(const TTableConstructionContext& context) const {
152152
std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
153153
std::vector<std::shared_ptr<arrow::Field>> fields;
154154
for (i32 i = 0; i < Schema->num_fields(); ++i) {
155-
if (columnNames && !columnNames->contains(Schema->field(i)->name())) {
155+
if (context.GetColumnNames() && !context.GetColumnNames()->contains(Schema->field(i)->name())) {
156156
continue;
157157
}
158-
columns.emplace_back(Columns[i]->GetChunkedArray());
158+
if (context.GetRecordsCount() || context.GetStartIndex()) {
159+
columns.emplace_back(Columns[i]->Slice(context.GetStartIndex().value_or(0),
160+
context.GetRecordsCount().value_or(GetRecordsCount() - context.GetStartIndex().value_or(0))));
161+
} else {
162+
columns.emplace_back(Columns[i]->GetChunkedArray());
163+
}
159164
fields.emplace_back(Schema->field(i));
160165
}
161166
if (fields.empty()) {
162167
return nullptr;
163168
}
164169
AFL_VERIFY(RecordsCount);
165-
return arrow::Table::Make(std::make_shared<arrow::Schema>(fields), columns, *RecordsCount);
170+
return arrow::Table::Make(std::make_shared<arrow::Schema>(fields), columns, context.GetRecordsCount().value_or(*RecordsCount));
166171
}
167172

168-
std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableVerified(const std::optional<std::set<std::string>>& columnNames /*= {}*/) const {
169-
auto result = BuildTableOptional(columnNames);
173+
std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableVerified(const TTableConstructionContext& context) const {
174+
auto result = BuildTableOptional(context);
170175
AFL_VERIFY(result);
171-
AFL_VERIFY(!columnNames || result->schema()->num_fields() == (i32)columnNames->size());
176+
AFL_VERIFY(!context.GetColumnNames() || result->schema()->num_fields() == (i32)context.GetColumnNames()->size());
172177
return result;
173178
}
174179

ydb/core/formats/arrow/common/container.h

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,29 @@ class TGeneralContainer {
6262
return Columns[idx];
6363
}
6464

65-
std::shared_ptr<arrow::Table> BuildTableVerified(const std::optional<std::set<std::string>>& columnNames = {}) const;
66-
std::shared_ptr<arrow::Table> BuildTableOptional(const std::optional<std::set<std::string>>& columnNames = {}) const;
65+
class TTableConstructionContext {
66+
private:
67+
YDB_ACCESSOR_DEF(std::optional<std::set<std::string>>, ColumnNames);
68+
YDB_ACCESSOR_DEF(std::optional<ui32>, StartIndex);
69+
YDB_ACCESSOR_DEF(std::optional<ui32>, RecordsCount);
70+
71+
public:
72+
TTableConstructionContext() = default;
73+
TTableConstructionContext(std::set<std::string>&& columnNames)
74+
: ColumnNames(std::move(columnNames)) {
75+
}
76+
77+
TTableConstructionContext(const std::set<std::string>& columnNames)
78+
: ColumnNames(columnNames) {
79+
}
80+
81+
void SetColumnNames(const std::vector<TString>& names) {
82+
ColumnNames = std::set<std::string>(names.begin(), names.end());
83+
}
84+
};
85+
86+
std::shared_ptr<arrow::Table> BuildTableVerified(const TTableConstructionContext& context = Default<TTableConstructionContext>()) const;
87+
std::shared_ptr<arrow::Table> BuildTableOptional(const TTableConstructionContext& context = Default<TTableConstructionContext>()) const;
6788

6889
std::shared_ptr<TGeneralContainer> BuildEmptySame() const;
6990

ydb/core/protos/config.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1699,6 +1699,7 @@ message TColumnShardConfig {
16991699
optional bool ColumnChunksV0Usage = 25 [default = true];
17001700
optional bool ColumnChunksV1Usage = 26 [default = true];
17011701
optional uint64 MemoryLimitScanPortion = 27 [default = 100000000];
1702+
optional string ReaderClassName = 28;
17021703
}
17031704

17041705
message TSchemeShardConfig {

ydb/core/protos/tx_datashard.proto

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1674,6 +1674,19 @@ message TComputeShardingPolicy {
16741674

16751675
}
16761676

1677+
message TEvKqpScanCursor {
1678+
message TColumnShardScanPlain {
1679+
}
1680+
message TColumnShardScanSimple {
1681+
optional uint64 SourceId = 1;
1682+
optional uint32 StartRecordIndex = 2;
1683+
}
1684+
oneof Implementation {
1685+
TColumnShardScanPlain ColumnShardPlain = 10;
1686+
TColumnShardScanSimple ColumnShardSimple = 11;
1687+
}
1688+
}
1689+
16771690
message TEvKqpScan {
16781691
optional uint64 TxId = 1;
16791692
optional uint64 ScanId = 2;
@@ -1700,6 +1713,7 @@ message TEvKqpScan {
17001713
optional TComputeShardingPolicy ComputeShardingPolicy = 23;
17011714
optional uint64 LockTxId = 24;
17021715
optional uint32 LockNodeId = 25;
1716+
optional TEvKqpScanCursor ScanCursor = 26;
17031717
}
17041718

17051719
message TEvCompactTable {

ydb/core/tx/columnshard/columnshard_impl.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ class TTxInternalScan;
5959
namespace NPlain {
6060
class TIndexScannerConstructor;
6161
}
62+
namespace NSimple {
63+
class TIndexScannerConstructor;
64+
}
6265
} // namespace NReader
6366

6467
namespace NDataSharing {
@@ -109,7 +112,7 @@ class TSharingSessionsInitializer;
109112
class TInFlightReadsInitializer;
110113
class TSpecialValuesInitializer;
111114
class TTablesManagerInitializer;
112-
}
115+
} // namespace NLoading
113116

114117
extern bool gAllowLogBatchingDefaultValue;
115118

@@ -198,6 +201,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
198201
friend class NOlap::NReader::TTxScan;
199202
friend class NOlap::NReader::TTxInternalScan;
200203
friend class NOlap::NReader::NPlain::TIndexScannerConstructor;
204+
friend class NOlap::NReader::NSimple::TIndexScannerConstructor;
201205

202206
class TStoragesManager;
203207
friend class TTxController;
@@ -246,7 +250,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
246250
void Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActorContext& ctx);
247251
void Handle(TEvPrivate::TEvStartCompaction::TPtr& ev, const TActorContext& ctx);
248252
void Handle(TEvPrivate::TEvMetadataAccessorsInfo::TPtr& ev, const TActorContext& ctx);
249-
253+
250254
void Handle(NPrivateEvents::NWrite::TEvWritePortionResult::TPtr& ev, const TActorContext& ctx);
251255

252256
void Handle(TEvPrivate::TEvScanStats::TPtr& ev, const TActorContext& ctx);

ydb/core/tx/columnshard/counters/scan.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,13 +295,19 @@ class TConcreteScanCounters: public TScanCounters {
295295
std::shared_ptr<TAtomicCounter> AssembleTasksCount;
296296
std::shared_ptr<TAtomicCounter> ReadTasksCount;
297297
std::shared_ptr<TAtomicCounter> ResourcesAllocationTasksCount;
298+
std::shared_ptr<TAtomicCounter> ResultsForSourceCount;
299+
298300
public:
299301
TScanAggregations Aggregations;
300302

301303
TCounterGuard GetFetcherAcessorsGuard() const {
302304
return TCounterGuard(FetchAccessorsCount);
303305
}
304306

307+
TCounterGuard GetResultsForSourceGuard() const {
308+
return TCounterGuard(ResultsForSourceCount);
309+
}
310+
305311
TCounterGuard GetMergeTasksGuard() const {
306312
return TCounterGuard(MergeTasksCount);
307313
}
@@ -320,7 +326,7 @@ class TConcreteScanCounters: public TScanCounters {
320326

321327
bool InWaiting() const {
322328
return MergeTasksCount->Val() || AssembleTasksCount->Val() || ReadTasksCount->Val() || ResourcesAllocationTasksCount->Val() ||
323-
FetchAccessorsCount->Val();
329+
FetchAccessorsCount->Val() || ResultsForSourceCount->Val();
324330
}
325331

326332
void OnBlobsWaitDuration(const TDuration d, const TDuration fullScanDuration) const {
@@ -335,6 +341,7 @@ class TConcreteScanCounters: public TScanCounters {
335341
, AssembleTasksCount(std::make_shared<TAtomicCounter>())
336342
, ReadTasksCount(std::make_shared<TAtomicCounter>())
337343
, ResourcesAllocationTasksCount(std::make_shared<TAtomicCounter>())
344+
, ResultsForSourceCount(std::make_shared<TAtomicCounter>())
338345
, Aggregations(TBase::BuildAggregations())
339346
{
340347

ydb/core/tx/columnshard/engines/portions/data_accessor.cpp

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,106 @@ std::vector<const TColumnRecord*> TPortionDataAccessor::GetColumnChunksPointers(
350350
return result;
351351
}
352352

353+
std::vector<TPortionDataAccessor::TReadPage> TPortionDataAccessor::BuildReadPages(const ui64 memoryLimit, const std::set<ui32>& entityIds) const {
354+
class TEntityDelimiter {
355+
private:
356+
YDB_READONLY(ui32, IndexStart, 0);
357+
YDB_READONLY(ui32, EntityId, 0);
358+
YDB_READONLY(ui32, ChunkIdx, 0);
359+
YDB_READONLY(ui64, MemoryStartChunk, 0);
360+
YDB_READONLY(ui64, MemoryFinishChunk, 0);
361+
362+
public:
363+
TEntityDelimiter(const ui32 indexStart, const ui32 entityId, const ui32 chunkIdx, const ui64 memStartChunk, const ui64 memFinishChunk)
364+
: IndexStart(indexStart)
365+
, EntityId(entityId)
366+
, ChunkIdx(chunkIdx)
367+
, MemoryStartChunk(memStartChunk)
368+
, MemoryFinishChunk(memFinishChunk) {
369+
}
370+
371+
bool operator<(const TEntityDelimiter& item) const {
372+
return std::tie(IndexStart, EntityId, ChunkIdx) < std::tie(item.IndexStart, item.EntityId, item.ChunkIdx);
373+
}
374+
};
375+
376+
class TGlobalDelimiter {
377+
private:
378+
YDB_READONLY(ui32, IndexStart, 0);
379+
YDB_ACCESSOR(ui64, UsedMemory, 0);
380+
YDB_ACCESSOR(ui64, WholeChunksMemory, 0);
381+
382+
public:
383+
TGlobalDelimiter(const ui32 indexStart)
384+
: IndexStart(indexStart) {
385+
}
386+
};
387+
388+
std::vector<TEntityDelimiter> delimiters;
389+
390+
ui32 lastAppliedId = 0;
391+
ui32 currentRecordIdx = 0;
392+
bool needOne = false;
393+
const TColumnRecord* lastRecord = nullptr;
394+
for (auto&& i : GetRecordsVerified()) {
395+
if (lastAppliedId != i.GetEntityId()) {
396+
if (delimiters.size()) {
397+
AFL_VERIFY(delimiters.back().GetIndexStart() == PortionInfo->GetRecordsCount());
398+
}
399+
needOne = entityIds.contains(i.GetEntityId());
400+
currentRecordIdx = 0;
401+
lastAppliedId = i.GetEntityId();
402+
lastRecord = nullptr;
403+
}
404+
if (!needOne) {
405+
continue;
406+
}
407+
delimiters.emplace_back(
408+
currentRecordIdx, i.GetEntityId(), i.GetChunkIdx(), i.GetMeta().GetRawBytes(), lastRecord ? lastRecord->GetMeta().GetRawBytes() : 0);
409+
currentRecordIdx += i.GetMeta().GetRecordsCount();
410+
if (currentRecordIdx == PortionInfo->GetRecordsCount()) {
411+
delimiters.emplace_back(currentRecordIdx, i.GetEntityId(), i.GetChunkIdx() + 1, 0, i.GetMeta().GetRawBytes());
412+
}
413+
lastRecord = &i;
414+
}
415+
std::sort(delimiters.begin(), delimiters.end());
416+
std::vector<TGlobalDelimiter> sumDelimiters;
417+
for (auto&& i : delimiters) {
418+
if (sumDelimiters.empty()) {
419+
sumDelimiters.emplace_back(i.GetIndexStart());
420+
} else if (sumDelimiters.back().GetIndexStart() != i.GetIndexStart()) {
421+
AFL_VERIFY(sumDelimiters.back().GetIndexStart() < i.GetIndexStart());
422+
TGlobalDelimiter backDelimiter(i.GetIndexStart());
423+
backDelimiter.MutableWholeChunksMemory() = sumDelimiters.back().GetWholeChunksMemory();
424+
backDelimiter.MutableUsedMemory() = sumDelimiters.back().GetUsedMemory();
425+
sumDelimiters.emplace_back(std::move(backDelimiter));
426+
}
427+
sumDelimiters.back().MutableWholeChunksMemory() += i.GetMemoryFinishChunk();
428+
sumDelimiters.back().MutableUsedMemory() += i.GetMemoryStartChunk();
429+
}
430+
std::vector<ui32> recordIdx = { 0 };
431+
std::vector<ui64> packMemorySize;
432+
const TGlobalDelimiter* lastBorder = &sumDelimiters.front();
433+
for (auto&& i : sumDelimiters) {
434+
const i64 sumMemory = (i64)i.GetUsedMemory() - (i64)lastBorder->GetWholeChunksMemory();
435+
AFL_VERIFY(sumMemory > 0);
436+
if (((ui64)sumMemory >= memoryLimit || i.GetIndexStart() == PortionInfo->GetRecordsCount()) && i.GetIndexStart()) {
437+
AFL_VERIFY(lastBorder->GetIndexStart() < i.GetIndexStart());
438+
recordIdx.emplace_back(i.GetIndexStart());
439+
packMemorySize.emplace_back(sumMemory);
440+
lastBorder = &i;
441+
}
442+
}
443+
AFL_VERIFY(recordIdx.front() == 0);
444+
AFL_VERIFY(recordIdx.back() == PortionInfo->GetRecordsCount());
445+
AFL_VERIFY(recordIdx.size() == packMemorySize.size() + 1);
446+
std::vector<TReadPage> pages;
447+
for (ui32 i = 0; i < packMemorySize.size(); ++i) {
448+
pages.emplace_back(recordIdx[i], recordIdx[i + 1] - recordIdx[i], packMemorySize[i]);
449+
}
450+
return pages;
451+
}
452+
353453
std::vector<TPortionDataAccessor::TPage> TPortionDataAccessor::BuildPages() const {
354454
std::vector<TPage> pages;
355455
struct TPart {

ydb/core/tx/columnshard/engines/portions/data_accessor.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,23 @@ class TPortionDataAccessor {
436436

437437
std::vector<TPage> BuildPages() const;
438438
ui64 GetMinMemoryForReadColumns(const std::optional<std::set<ui32>>& columnIds) const;
439+
440+
class TReadPage {
441+
private:
442+
YDB_READONLY(ui32, IndexStart, 0);
443+
YDB_READONLY(ui32, RecordsCount, 0);
444+
YDB_READONLY(ui64, MemoryUsage, 0);
445+
446+
public:
447+
TReadPage(const ui32 indexStart, const ui32 recordsCount, const ui64 memoryUsage)
448+
: IndexStart(indexStart)
449+
, RecordsCount(recordsCount)
450+
, MemoryUsage(memoryUsage) {
451+
AFL_VERIFY(RecordsCount);
452+
}
453+
};
454+
455+
std::vector<TReadPage> BuildReadPages(const ui64 memoryLimit, const std::set<ui32>& entityIds) const;
439456
};
440457

441458
} // namespace NKikimr::NOlap

0 commit comments

Comments
 (0)