Skip to content

Commit a4155b3

Browse files
fixes
1 parent 704f069 commit a4155b3

File tree

17 files changed

+73
-26
lines changed

17 files changed

+73
-26
lines changed

ydb/core/formats/arrow/common/container.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableOptional(const TTable
173173
std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableVerified(const TTableConstructionContext& context) const {
174174
auto result = BuildTableOptional(context);
175175
AFL_VERIFY(result);
176-
AFL_VERIFY(!context->GetColumnNames() || result->schema()->num_fields() == (i32)context->GetColumnNames()->size());
176+
AFL_VERIFY(!context.GetColumnNames() || result->schema()->num_fields() == (i32)context.GetColumnNames()->size());
177177
return result;
178178
}
179179

ydb/core/formats/arrow/common/container.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,11 @@ class TGeneralContainer {
7171
public:
7272
TTableConstructionContext() = default;
7373
TTableConstructionContext(std::set<std::string>&& columnNames)
74-
: ColumnNames(std::move(columnNames))
75-
{
74+
: ColumnNames(std::move(columnNames)) {
75+
}
7676

77+
TTableConstructionContext(const std::set<std::string>& columnNames)
78+
: ColumnNames(columnNames) {
7779
}
7880
};
7981

ydb/core/tx/columnshard/engines/reader/abstract/read_context.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ class TReadContext {
5353
const TActorId ReadCoordinatorActorId;
5454
const TComputeShardingPolicy ComputeShardingPolicy;
5555
TAtomic AbortFlag = 0;
56-
5756
public:
5857
template <class T>
5958
std::shared_ptr<const T> GetReadMetadataPtrVerifiedAs() const {
@@ -62,6 +61,10 @@ class TReadContext {
6261
return result;
6362
}
6463

64+
const std::shared_ptr<IScanCursor>& GetScanCursor() const {
65+
return ReadMetadata->GetScanCursor();
66+
}
67+
6568
void AbortWithError(const TString& errorMessage) {
6669
if (AtomicCas(&AbortFlag, 1, 0)) {
6770
NActors::TActivationContext::Send(

ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ struct TReadMetadataBase {
4545
std::shared_ptr<TVersionedIndex> IndexVersionsPointer;
4646
TSnapshot RequestSnapshot;
4747
std::optional<TGranuleShardingInfo> RequestShardingInfo;
48+
std::shared_ptr<IScanCursor> ScanCursor;
4849
virtual void DoOnReadFinished(NColumnShard::TColumnShard& /*owner*/) const {
4950
}
5051
virtual void DoOnBeforeStartReading(NColumnShard::TColumnShard& /*owner*/) const {
@@ -68,6 +69,10 @@ struct TReadMetadataBase {
6869
return TxId;
6970
}
7071

72+
const std::shared_ptr<IScanCursor>& GetScanCursor() const {
73+
return ScanCursor;
74+
}
75+
7176
std::optional<ui64> GetLockId() const {
7277
return LockId;
7378
}
@@ -135,12 +140,14 @@ struct TReadMetadataBase {
135140
}
136141

137142
TReadMetadataBase(const std::shared_ptr<TVersionedIndex> index, const ESorting sorting, const TProgramContainer& ssaProgram,
138-
const std::shared_ptr<ISnapshotSchema>& schema, const TSnapshot& requestSnapshot)
143+
const std::shared_ptr<ISnapshotSchema>& schema, const TSnapshot& requestSnapshot, const std::shared_ptr<IScanCursor>& scanCursor)
139144
: Sorting(sorting)
140145
, Program(ssaProgram)
141146
, IndexVersionsPointer(index)
142147
, RequestSnapshot(requestSnapshot)
143-
, ResultIndexSchema(schema) {
148+
, ScanCursor(scanCursor)
149+
, ResultIndexSchema(schema)
150+
{
144151
}
145152
virtual ~TReadMetadataBase() = default;
146153

ydb/core/tx/columnshard/engines/reader/common/description.h

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,31 @@
66
#include <ydb/library/yql/dq/actors/protos/dq_stats.pb.h>
77
namespace NKikimr::NOlap::NReader {
88

9+
class IScanCursor {
10+
public:
11+
};
12+
13+
class TSimpleScanCursor: public IScanCursor {
14+
private:
15+
YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, PrimaryKey);
16+
YDB_READONLY(ui64, PortionId, 0);
17+
YDB_READONLY(ui32, RecordIndex, 0);
18+
19+
public:
20+
TSimpleScanCursor(const std::shared_ptr<arrow::RecordBatch>& pk, const ui64 portionId, const ui32 recordIndex)
21+
: PrimaryKey(pk)
22+
, PortionId(portionId)
23+
, RecordIndex(recordIndex) {
24+
}
25+
};
26+
927
// Describes read/scan request
1028
struct TReadDescription {
1129
private:
1230
TSnapshot Snapshot;
1331
TProgramContainer Program;
32+
std::shared_ptr<IScanCursor> ScanCursor;
33+
1434
public:
1535
// Table
1636
ui64 TxId = 0;
@@ -27,7 +47,17 @@ struct TReadDescription {
2747
// List of columns
2848
std::vector<ui32> ColumnIds;
2949
std::vector<TString> ColumnNames;
30-
50+
51+
const std::shared_ptr<IScanCursor>& GetScanCursor() const {
52+
AFL_VERIFY(ScanCursor);
53+
return ScanCursor;
54+
}
55+
56+
void SetScanCursor(const std::shared_ptr<IScanCursor>& cursor) {
57+
AFL_VERIFY(!ScanCursor);
58+
ScanCursor = cursor;
59+
}
60+
3161
TReadDescription(const TSnapshot& snapshot, const bool isReverse)
3262
: Snapshot(snapshot)
3363
, PKRangesFilter(std::make_shared<NOlap::TPKRangesFilter>(isReverse)) {

ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ NKikimr::TConclusion<std::shared_ptr<TReadMetadataBase>> TIndexScannerConstructo
3535
TDataStorageAccessor dataAccessor(insertTable, index);
3636
AFL_VERIFY(read.PathId);
3737
auto readMetadata = std::make_shared<TReadMetadata>(read.PathId, index->CopyVersionedIndexPtr(), read.GetSnapshot(),
38-
IsReverse ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC, read.GetProgram());
38+
IsReverse ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC, read.GetProgram(), nullptr);
3939

4040
auto initResult = readMetadata->Init(self, read, dataAccessor);
4141
if (!initResult) {

ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,11 @@ struct TReadMetadata : public TReadMetadataBase {
115115
std::vector<TCommittedBlob> CommittedBlobs;
116116
std::shared_ptr<TReadStats> ReadStats;
117117

118-
TReadMetadata(const ui64 pathId, const std::shared_ptr<TVersionedIndex> info, const TSnapshot& snapshot, const ESorting sorting, const TProgramContainer& ssaProgram)
119-
: TBase(info, sorting, ssaProgram, info->GetSchemaVerified(snapshot), snapshot)
118+
TReadMetadata(const ui64 pathId, const std::shared_ptr<TVersionedIndex> info, const TSnapshot& snapshot, const ESorting sorting,
119+
const TProgramContainer& ssaProgram, const std::shared_ptr<IScanCursor>& scanCursor)
120+
: TBase(info, sorting, ssaProgram, info->GetSchemaVerified(snapshot), snapshot, scanCursor)
120121
, PathId(pathId)
121-
, ReadStats(std::make_shared<TReadStats>())
122-
{
122+
, ReadStats(std::make_shared<TReadStats>()) {
123123
}
124124

125125
virtual std::vector<TNameTypeInfo> GetKeyYqlSchema() const override {

ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/constructor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ NKikimr::TConclusion<std::shared_ptr<TReadMetadataBase>> TIndexScannerConstructo
3535
TDataStorageAccessor dataAccessor(insertTable, index);
3636
AFL_VERIFY(read.PathId);
3737
auto readMetadata = std::make_shared<TReadMetadata>(read.PathId, index->CopyVersionedIndexPtr(), read.GetSnapshot(),
38-
IsReverse ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC, read.GetProgram());
38+
IsReverse ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC, read.GetProgram(), read.GetScanCursor());
3939

4040
auto initResult = readMetadata->Init(self, read, dataAccessor);
4141
if (!initResult) {

ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,9 @@ struct TReadMetadata : public TReadMetadataBase {
114114
NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE;
115115
std::shared_ptr<TReadStats> ReadStats;
116116

117-
TReadMetadata(const ui64 pathId, const std::shared_ptr<TVersionedIndex> info, const TSnapshot& snapshot, const ESorting sorting, const TProgramContainer& ssaProgram)
118-
: TBase(info, sorting, ssaProgram, info->GetSchemaVerified(snapshot), snapshot)
117+
TReadMetadata(const ui64 pathId, const std::shared_ptr<TVersionedIndex> info, const TSnapshot& snapshot, const ESorting sorting,
118+
const TProgramContainer& ssaProgram, const std::shared_ptr<IScanCursor>& scanCursor)
119+
: TBase(info, sorting, ssaProgram, info->GetSchemaVerified(snapshot), snapshot, scanCursor)
119120
, PathId(pathId)
120121
, ReadStats(std::make_shared<TReadStats>())
121122
{

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c
181181
acc.AddFetchingStep(*result, *FFColumns, EStageFeaturesIndexes::Fetching);
182182
acc.AddAssembleStep(*result, *FFColumns, "LAST", EStageFeaturesIndexes::Fetching, false);
183183
}
184-
result->AddStep<TPrepareResult>();
184+
result->AddStep<TPrepareResultStep>();
185185
return result;
186186
}
187187

0 commit comments

Comments
 (0)