Skip to content

Commit 1660046

Browse files
Merge a4155b3 into 198242d
2 parents 198242d + a4155b3 commit 1660046

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+3696
-58
lines changed

ydb/core/formats/arrow/common/container.cpp

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -148,14 +148,19 @@ std::shared_ptr<NKikimr::NArrow::TGeneralContainer> TGeneralContainer::BuildEmpt
148148
return std::make_shared<TGeneralContainer>(Schema, std::move(columns));
149149
}
150150

151-
std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableOptional(const std::optional<std::set<std::string>>& columnNames /*= {}*/) const {
151+
std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableOptional(const TTableConstructionContext& context) const {
152152
std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
153153
std::vector<std::shared_ptr<arrow::Field>> fields;
154154
for (i32 i = 0; i < Schema->num_fields(); ++i) {
155-
if (columnNames && !columnNames->contains(Schema->field(i)->name())) {
155+
if (context.GetColumnNames() && !context.GetColumnNames()->contains(Schema->field(i)->name())) {
156156
continue;
157157
}
158-
columns.emplace_back(Columns[i]->GetChunkedArray());
158+
if (context.GetRecordsCount() || context.GetStartIndex()) {
159+
columns.emplace_back(Columns[i]->Slice(context.GetStartIndex().value_or(0),
160+
context.GetRecordsCount().value_or(GetRecordsCount() - context.GetStartIndex().value_or(0))));
161+
} else {
162+
columns.emplace_back(Columns[i]->GetChunkedArray());
163+
}
159164
fields.emplace_back(Schema->field(i));
160165
}
161166
if (fields.empty()) {
@@ -165,10 +170,10 @@ std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableOptional(const std::o
165170
return arrow::Table::Make(std::make_shared<arrow::Schema>(fields), columns, *RecordsCount);
166171
}
167172

168-
std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableVerified(const std::optional<std::set<std::string>>& columnNames /*= {}*/) const {
169-
auto result = BuildTableOptional(columnNames);
173+
std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableVerified(const TTableConstructionContext& context) const {
174+
auto result = BuildTableOptional(context);
170175
AFL_VERIFY(result);
171-
AFL_VERIFY(!columnNames || result->schema()->num_fields() == (i32)columnNames->size());
176+
AFL_VERIFY(!context.GetColumnNames() || result->schema()->num_fields() == (i32)context.GetColumnNames()->size());
172177
return result;
173178
}
174179

ydb/core/formats/arrow/common/container.h

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,25 @@ class TGeneralContainer {
6262
return Columns[idx];
6363
}
6464

65-
std::shared_ptr<arrow::Table> BuildTableVerified(const std::optional<std::set<std::string>>& columnNames = {}) const;
66-
std::shared_ptr<arrow::Table> BuildTableOptional(const std::optional<std::set<std::string>>& columnNames = {}) const;
65+
class TTableConstructionContext {
66+
private:
67+
YDB_ACCESSOR_DEF(std::optional<std::set<std::string>>, ColumnNames);
68+
YDB_ACCESSOR_DEF(std::optional<ui32>, StartIndex);
69+
YDB_ACCESSOR_DEF(std::optional<ui32>, RecordsCount);
70+
71+
public:
72+
TTableConstructionContext() = default;
73+
TTableConstructionContext(std::set<std::string>&& columnNames)
74+
: ColumnNames(std::move(columnNames)) {
75+
}
76+
77+
TTableConstructionContext(const std::set<std::string>& columnNames)
78+
: ColumnNames(columnNames) {
79+
}
80+
};
81+
82+
std::shared_ptr<arrow::Table> BuildTableVerified(const TTableConstructionContext& context = Default<TTableConstructionContext>()) const;
83+
std::shared_ptr<arrow::Table> BuildTableOptional(const TTableConstructionContext& context = Default<TTableConstructionContext>()) const;
6784

6885
std::shared_ptr<TGeneralContainer> BuildEmptySame() const;
6986

ydb/core/protos/config.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1699,6 +1699,7 @@ message TColumnShardConfig {
16991699
optional bool ColumnChunksV0Usage = 25 [default = true];
17001700
optional bool ColumnChunksV1Usage = 26 [default = true];
17011701
optional uint64 MemoryLimitScanPortion = 27 [default = 100000000];
1702+
optional string ReaderClassName = 28;
17021703
}
17031704

17041705
message TSchemeShardConfig {

ydb/core/tx/columnshard/columnshard_impl.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ class TTxInternalScan;
5959
namespace NPlain {
6060
class TIndexScannerConstructor;
6161
}
62+
namespace NSimple {
63+
class TIndexScannerConstructor;
64+
}
6265
} // namespace NReader
6366

6467
namespace NDataSharing {
@@ -109,7 +112,7 @@ class TSharingSessionsInitializer;
109112
class TInFlightReadsInitializer;
110113
class TSpecialValuesInitializer;
111114
class TTablesManagerInitializer;
112-
}
115+
} // namespace NLoading
113116

114117
extern bool gAllowLogBatchingDefaultValue;
115118

@@ -198,6 +201,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
198201
friend class NOlap::NReader::TTxScan;
199202
friend class NOlap::NReader::TTxInternalScan;
200203
friend class NOlap::NReader::NPlain::TIndexScannerConstructor;
204+
friend class NOlap::NReader::NSimple::TIndexScannerConstructor;
201205

202206
class TStoragesManager;
203207
friend class TTxController;
@@ -246,7 +250,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
246250
void Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActorContext& ctx);
247251
void Handle(TEvPrivate::TEvStartCompaction::TPtr& ev, const TActorContext& ctx);
248252
void Handle(TEvPrivate::TEvMetadataAccessorsInfo::TPtr& ev, const TActorContext& ctx);
249-
253+
250254
void Handle(NPrivateEvents::NWrite::TEvWritePortionResult::TPtr& ev, const TActorContext& ctx);
251255

252256
void Handle(TEvPrivate::TEvScanStats::TPtr& ev, const TActorContext& ctx);

ydb/core/tx/columnshard/engines/portions/data_accessor.cpp

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,106 @@ std::vector<const TColumnRecord*> TPortionDataAccessor::GetColumnChunksPointers(
350350
return result;
351351
}
352352

353+
std::vector<TPortionDataAccessor::TReadPage> TPortionDataAccessor::BuildReadPages(const ui64 memoryLimit, const std::set<ui32>& entityIds) const {
354+
class TEntityDelimiter {
355+
private:
356+
YDB_READONLY(ui32, IndexStart, 0);
357+
YDB_READONLY(ui32, EntityId, 0);
358+
YDB_READONLY(ui32, ChunkIdx, 0);
359+
YDB_READONLY(ui64, MemoryStartChunk, 0);
360+
YDB_READONLY(ui64, MemoryFinishChunk, 0);
361+
362+
public:
363+
TEntityDelimiter(const ui32 indexStart, const ui32 entityId, const ui32 chunkIdx, const ui64 memStartChunk, const ui64 memFinishChunk)
364+
: IndexStart(indexStart)
365+
, EntityId(entityId)
366+
, ChunkIdx(chunkIdx)
367+
, MemoryStartChunk(memStartChunk)
368+
, MemoryFinishChunk(memFinishChunk) {
369+
}
370+
371+
bool operator<(const TEntityDelimiter& item) const {
372+
return std::tie(IndexStart, EntityId, ChunkIdx) < std::tie(item.IndexStart, item.EntityId, item.ChunkIdx);
373+
}
374+
};
375+
376+
class TGlobalDelimiter {
377+
private:
378+
YDB_READONLY(ui32, IndexStart, 0);
379+
YDB_ACCESSOR(ui64, UsedMemory, 0);
380+
YDB_ACCESSOR(ui64, WholeChunksMemory, 0);
381+
382+
public:
383+
TGlobalDelimiter(const ui32 indexStart)
384+
: IndexStart(indexStart) {
385+
}
386+
};
387+
388+
std::vector<TEntityDelimiter> delimiters;
389+
390+
ui32 lastAppliedId = 0;
391+
ui32 currentRecordIdx = 0;
392+
bool needOne = false;
393+
const TColumnRecord* lastRecord = nullptr;
394+
for (auto&& i : GetRecordsVerified()) {
395+
if (lastAppliedId != i.GetEntityId()) {
396+
if (delimiters.size()) {
397+
AFL_VERIFY(delimiters.back().GetIndexStart() == PortionInfo->GetRecordsCount());
398+
}
399+
needOne = entityIds.contains(i.GetEntityId());
400+
currentRecordIdx = 0;
401+
lastAppliedId = i.GetEntityId();
402+
lastRecord = nullptr;
403+
}
404+
if (!needOne) {
405+
continue;
406+
}
407+
delimiters.emplace_back(
408+
currentRecordIdx, i.GetEntityId(), i.GetChunkIdx(), i.GetMeta().GetRawBytes(), lastRecord ? lastRecord->GetMeta().GetRawBytes() : 0);
409+
currentRecordIdx += i.GetMeta().GetRecordsCount();
410+
if (currentRecordIdx == PortionInfo->GetRecordsCount()) {
411+
delimiters.emplace_back(currentRecordIdx, i.GetEntityId(), i.GetChunkIdx() + 1, 0, i.GetMeta().GetRawBytes());
412+
}
413+
lastRecord = &i;
414+
}
415+
std::sort(delimiters.begin(), delimiters.end());
416+
std::vector<TGlobalDelimiter> sumDelimiters;
417+
for (auto&& i : delimiters) {
418+
if (sumDelimiters.empty()) {
419+
sumDelimiters.emplace_back(i.GetIndexStart());
420+
} else if (sumDelimiters.back().GetIndexStart() != i.GetIndexStart()) {
421+
AFL_VERIFY(sumDelimiters.back().GetIndexStart() < i.GetIndexStart());
422+
TGlobalDelimiter backDelimiter(i.GetIndexStart());
423+
backDelimiter.MutableWholeChunksMemory() = sumDelimiters.back().GetWholeChunksMemory();
424+
backDelimiter.MutableUsedMemory() = sumDelimiters.back().GetUsedMemory();
425+
sumDelimiters.emplace_back(std::move(backDelimiter));
426+
}
427+
sumDelimiters.back().MutableWholeChunksMemory() += i.GetMemoryFinishChunk();
428+
sumDelimiters.back().MutableUsedMemory() += i.GetMemoryStartChunk();
429+
}
430+
std::vector<ui32> recordIdx = { 0 };
431+
std::vector<ui64> packMemorySize;
432+
const TGlobalDelimiter* lastBorder = &sumDelimiters.front();
433+
for (auto&& i : sumDelimiters) {
434+
const i64 sumMemory = (i64)i.GetUsedMemory() - (i64)lastBorder->GetWholeChunksMemory();
435+
AFL_VERIFY(sumMemory > 0);
436+
if (((ui64)sumMemory >= memoryLimit || i.GetIndexStart() == PortionInfo->GetRecordsCount()) && i.GetIndexStart()) {
437+
AFL_VERIFY(lastBorder->GetIndexStart() < i.GetIndexStart());
438+
recordIdx.emplace_back(i.GetIndexStart());
439+
packMemorySize.emplace_back(sumMemory);
440+
lastBorder = &i;
441+
}
442+
}
443+
AFL_VERIFY(recordIdx.front() == 0);
444+
AFL_VERIFY(recordIdx.back() == PortionInfo->GetRecordsCount());
445+
AFL_VERIFY(recordIdx.size() == packMemorySize.size() + 1);
446+
std::vector<TReadPage> pages;
447+
for (ui32 i = 0; i < packMemorySize.size(); ++i) {
448+
pages.emplace_back(recordIdx[i], recordIdx[i + 1] - recordIdx[i], packMemorySize[i]);
449+
}
450+
return pages;
451+
}
452+
353453
std::vector<TPortionDataAccessor::TPage> TPortionDataAccessor::BuildPages() const {
354454
std::vector<TPage> pages;
355455
struct TPart {

ydb/core/tx/columnshard/engines/portions/data_accessor.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,23 @@ class TPortionDataAccessor {
436436

437437
std::vector<TPage> BuildPages() const;
438438
ui64 GetMinMemoryForReadColumns(const std::optional<std::set<ui32>>& columnIds) const;
439+
440+
class TReadPage {
441+
private:
442+
YDB_READONLY(ui32, IndexStart, 0);
443+
YDB_READONLY(ui32, RecordsCount, 0);
444+
YDB_READONLY(ui64, MemoryUsage, 0);
445+
446+
public:
447+
TReadPage(const ui32 indexStart, const ui32 recordsCount, const ui64 memoryUsage)
448+
: IndexStart(indexStart)
449+
, RecordsCount(recordsCount)
450+
, MemoryUsage(memoryUsage) {
451+
AFL_VERIFY(RecordsCount);
452+
}
453+
};
454+
455+
std::vector<TReadPage> BuildReadPages(const ui64 memoryLimit, const std::set<ui32>& entityIds) const;
439456
};
440457

441458
} // namespace NKikimr::NOlap

ydb/core/tx/columnshard/engines/reader/abstract/constructor.cpp

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
#include "constructor.h"
2+
3+
#include <ydb/core/protos/kqp.pb.h>
24
#include <ydb/core/tx/columnshard/engines/reader/sys_view/abstract/policy.h>
35
#include <ydb/core/tx/program/program.h>
46

57
namespace NKikimr::NOlap::NReader {
68

7-
NKikimr::TConclusionStatus IScannerConstructor::ParseProgram(const TVersionedIndex* vIndex,
8-
const NKikimrSchemeOp::EOlapProgramType programType, const TString& serializedProgram, TReadDescription& read, const IColumnResolver& columnResolver) const {
9+
NKikimr::TConclusionStatus IScannerConstructor::ParseProgram(const TVersionedIndex* vIndex, const NKikimrSchemeOp::EOlapProgramType programType,
10+
const TString& serializedProgram, TReadDescription& read, const IColumnResolver& columnResolver) const {
911
AFL_VERIFY(!read.ColumnIds.size() || !read.ColumnNames.size());
1012
std::vector<TString> names;
1113
std::set<TString> namesChecker;
@@ -47,7 +49,8 @@ NKikimr::TConclusionStatus IScannerConstructor::ParseProgram(const TVersionedInd
4749
}
4850

4951
const auto getDiffColumnsMessage = [&]() {
50-
return TStringBuilder() << "ssa program has different columns with kqp request: kqp_columns=" << JoinSeq(",", namesChecker) << " vs program_columns=" << JoinSeq(",", programColumns);
52+
return TStringBuilder() << "ssa program has different columns with kqp request: kqp_columns=" << JoinSeq(",", namesChecker)
53+
<< " vs program_columns=" << JoinSeq(",", programColumns);
5154
};
5255

5356
if (namesChecker.size() != programColumns.size()) {
@@ -66,7 +69,8 @@ NKikimr::TConclusionStatus IScannerConstructor::ParseProgram(const TVersionedInd
6669
}
6770
}
6871

69-
NKikimr::TConclusion<std::shared_ptr<TReadMetadataBase>> IScannerConstructor::BuildReadMetadata(const NColumnShard::TColumnShard* self, const TReadDescription& read) const {
72+
NKikimr::TConclusion<std::shared_ptr<TReadMetadataBase>> IScannerConstructor::BuildReadMetadata(
73+
const NColumnShard::TColumnShard* self, const TReadDescription& read) const {
7074
TConclusion<std::shared_ptr<TReadMetadataBase>> result = DoBuildReadMetadata(self, read);
7175
if (result.IsFail()) {
7276
return result;
@@ -78,4 +82,4 @@ NKikimr::TConclusion<std::shared_ptr<TReadMetadataBase>> IScannerConstructor::Bu
7882
}
7983
}
8084

81-
}
85+
} // namespace NKikimr::NOlap::NReader

ydb/core/tx/columnshard/engines/reader/abstract/constructor.h

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,22 @@
88

99
namespace NKikimr::NOlap::NReader {
1010

11+
class TScannerConstructorContext {
12+
private:
13+
YDB_READONLY(TSnapshot, Snapshot, TSnapshot::Zero());
14+
YDB_READONLY(ui32, ItemsLimit, 0);
15+
YDB_READONLY(bool, Reverse, false);
16+
17+
public:
18+
TScannerConstructorContext(const TSnapshot& snapshot, const ui32 itemsLimit, const bool reverse)
19+
: Snapshot(snapshot)
20+
, ItemsLimit(itemsLimit)
21+
, Reverse(reverse)
22+
{
23+
24+
}
25+
};
26+
1127
class IScannerConstructor {
1228
protected:
1329
const TSnapshot Snapshot;
@@ -18,12 +34,13 @@ class IScannerConstructor {
1834
private:
1935
virtual TConclusion<std::shared_ptr<TReadMetadataBase>> DoBuildReadMetadata(const NColumnShard::TColumnShard* self, const TReadDescription& read) const = 0;
2036
public:
37+
using TFactory = NObjectFactory::TParametrizedObjectFactory<IScannerConstructor, TString, TScannerConstructorContext>;
2138
virtual ~IScannerConstructor() = default;
2239

23-
IScannerConstructor(const TSnapshot& snapshot, const ui64 itemsLimit, const bool reverse)
24-
: Snapshot(snapshot)
25-
, ItemsLimit(itemsLimit)
26-
, IsReverse(reverse)
40+
IScannerConstructor(const TScannerConstructorContext& context)
41+
: Snapshot(context.GetSnapshot())
42+
, ItemsLimit(context.GetItemsLimit())
43+
, IsReverse(context.GetReverse())
2744
{
2845

2946
}

ydb/core/tx/columnshard/engines/reader/abstract/read_context.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ class TReadContext {
5353
const TActorId ReadCoordinatorActorId;
5454
const TComputeShardingPolicy ComputeShardingPolicy;
5555
TAtomic AbortFlag = 0;
56-
5756
public:
5857
template <class T>
5958
std::shared_ptr<const T> GetReadMetadataPtrVerifiedAs() const {
@@ -62,6 +61,10 @@ class TReadContext {
6261
return result;
6362
}
6463

64+
const std::shared_ptr<IScanCursor>& GetScanCursor() const {
65+
return ReadMetadata->GetScanCursor();
66+
}
67+
6568
void AbortWithError(const TString& errorMessage) {
6669
if (AtomicCas(&AbortFlag, 1, 0)) {
6770
NActors::TActivationContext::Send(

ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ struct TReadMetadataBase {
4545
std::shared_ptr<TVersionedIndex> IndexVersionsPointer;
4646
TSnapshot RequestSnapshot;
4747
std::optional<TGranuleShardingInfo> RequestShardingInfo;
48+
std::shared_ptr<IScanCursor> ScanCursor;
4849
virtual void DoOnReadFinished(NColumnShard::TColumnShard& /*owner*/) const {
4950
}
5051
virtual void DoOnBeforeStartReading(NColumnShard::TColumnShard& /*owner*/) const {
@@ -68,6 +69,10 @@ struct TReadMetadataBase {
6869
return TxId;
6970
}
7071

72+
const std::shared_ptr<IScanCursor>& GetScanCursor() const {
73+
return ScanCursor;
74+
}
75+
7176
std::optional<ui64> GetLockId() const {
7277
return LockId;
7378
}
@@ -135,12 +140,14 @@ struct TReadMetadataBase {
135140
}
136141

137142
TReadMetadataBase(const std::shared_ptr<TVersionedIndex> index, const ESorting sorting, const TProgramContainer& ssaProgram,
138-
const std::shared_ptr<ISnapshotSchema>& schema, const TSnapshot& requestSnapshot)
143+
const std::shared_ptr<ISnapshotSchema>& schema, const TSnapshot& requestSnapshot, const std::shared_ptr<IScanCursor>& scanCursor)
139144
: Sorting(sorting)
140145
, Program(ssaProgram)
141146
, IndexVersionsPointer(index)
142147
, RequestSnapshot(requestSnapshot)
143-
, ResultIndexSchema(schema) {
148+
, ScanCursor(scanCursor)
149+
, ResultIndexSchema(schema)
150+
{
144151
}
145152
virtual ~TReadMetadataBase() = default;
146153

0 commit comments

Comments
 (0)