Skip to content

Commit 977805d

Browse files
Merge 5eced34 into 1dd2b99
2 parents 1dd2b99 + 5eced34 commit 977805d

File tree

79 files changed

+4213
-249
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

79 files changed

+4213
-249
lines changed

ydb/core/formats/arrow/common/container.cpp

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -148,27 +148,32 @@ std::shared_ptr<NKikimr::NArrow::TGeneralContainer> TGeneralContainer::BuildEmpt
148148
return std::make_shared<TGeneralContainer>(Schema, std::move(columns));
149149
}
150150

151-
std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableOptional(const std::optional<std::set<std::string>>& columnNames /*= {}*/) const {
151+
std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableOptional(const TTableConstructionContext& context) const {
152152
std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
153153
std::vector<std::shared_ptr<arrow::Field>> fields;
154154
for (i32 i = 0; i < Schema->num_fields(); ++i) {
155-
if (columnNames && !columnNames->contains(Schema->field(i)->name())) {
155+
if (context.GetColumnNames() && !context.GetColumnNames()->contains(Schema->field(i)->name())) {
156156
continue;
157157
}
158-
columns.emplace_back(Columns[i]->GetChunkedArray());
158+
if (context.GetRecordsCount() || context.GetStartIndex()) {
159+
columns.emplace_back(Columns[i]->Slice(context.GetStartIndex().value_or(0),
160+
context.GetRecordsCount().value_or(GetRecordsCount() - context.GetStartIndex().value_or(0))));
161+
} else {
162+
columns.emplace_back(Columns[i]->GetChunkedArray());
163+
}
159164
fields.emplace_back(Schema->field(i));
160165
}
161166
if (fields.empty()) {
162167
return nullptr;
163168
}
164169
AFL_VERIFY(RecordsCount);
165-
return arrow::Table::Make(std::make_shared<arrow::Schema>(fields), columns, *RecordsCount);
170+
return arrow::Table::Make(std::make_shared<arrow::Schema>(fields), columns, context.GetRecordsCount().value_or(*RecordsCount));
166171
}
167172

168-
std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableVerified(const std::optional<std::set<std::string>>& columnNames /*= {}*/) const {
169-
auto result = BuildTableOptional(columnNames);
173+
std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableVerified(const TTableConstructionContext& context) const {
174+
auto result = BuildTableOptional(context);
170175
AFL_VERIFY(result);
171-
AFL_VERIFY(!columnNames || result->schema()->num_fields() == (i32)columnNames->size());
176+
AFL_VERIFY(!context.GetColumnNames() || result->schema()->num_fields() == (i32)context.GetColumnNames()->size());
172177
return result;
173178
}
174179

ydb/core/formats/arrow/common/container.h

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,29 @@ class TGeneralContainer {
6262
return Columns[idx];
6363
}
6464

65-
std::shared_ptr<arrow::Table> BuildTableVerified(const std::optional<std::set<std::string>>& columnNames = {}) const;
66-
std::shared_ptr<arrow::Table> BuildTableOptional(const std::optional<std::set<std::string>>& columnNames = {}) const;
65+
class TTableConstructionContext {
66+
private:
67+
YDB_ACCESSOR_DEF(std::optional<std::set<std::string>>, ColumnNames);
68+
YDB_ACCESSOR_DEF(std::optional<ui32>, StartIndex);
69+
YDB_ACCESSOR_DEF(std::optional<ui32>, RecordsCount);
70+
71+
public:
72+
TTableConstructionContext() = default;
73+
TTableConstructionContext(std::set<std::string>&& columnNames)
74+
: ColumnNames(std::move(columnNames)) {
75+
}
76+
77+
TTableConstructionContext(const std::set<std::string>& columnNames)
78+
: ColumnNames(columnNames) {
79+
}
80+
81+
void SetColumnNames(const std::vector<TString>& names) {
82+
ColumnNames = std::set<std::string>(names.begin(), names.end());
83+
}
84+
};
85+
86+
std::shared_ptr<arrow::Table> BuildTableVerified(const TTableConstructionContext& context = Default<TTableConstructionContext>()) const;
87+
std::shared_ptr<arrow::Table> BuildTableOptional(const TTableConstructionContext& context = Default<TTableConstructionContext>()) const;
6788

6889
std::shared_ptr<TGeneralContainer> BuildEmptySame() const;
6990

ydb/core/kqp/compute_actor/kqp_compute_events.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ struct TEvScanData: public NActors::TEventLocal<TEvScanData, TKqpComputeEvents::
5353
std::vector<std::vector<ui32>> SplittedBatches;
5454

5555
TOwnedCellVec LastKey;
56+
NKikimrKqp::TEvKqpScanCursor LastCursorProto;
5657
TDuration CpuTime;
5758
TDuration WaitTime;
5859
ui32 PageFaults = 0; // number of page faults occurred when filling in this message
@@ -120,6 +121,7 @@ struct TEvScanData: public NActors::TEventLocal<TEvScanData, TKqpComputeEvents::
120121
ev->Finished = pbEv->Record.GetFinished();
121122
ev->RequestedBytesLimitReached = pbEv->Record.GetRequestedBytesLimitReached();
122123
ev->LastKey = TOwnedCellVec(TSerializedCellVec(pbEv->Record.GetLastKey()).GetCells());
124+
ev->LastCursorProto = pbEv->Record.GetLastCursor();
123125
if (pbEv->Record.HasAvailablePacks()) {
124126
ev->AvailablePacks = pbEv->Record.GetAvailablePacks();
125127
}
@@ -153,6 +155,7 @@ struct TEvScanData: public NActors::TEventLocal<TEvScanData, TKqpComputeEvents::
153155
Remote->Record.SetPageFaults(PageFaults);
154156
Remote->Record.SetPageFault(PageFault);
155157
Remote->Record.SetLastKey(TSerializedCellVec::Serialize(LastKey));
158+
*Remote->Record.MutableLastCursor() = LastCursorProto;
156159
if (AvailablePacks) {
157160
Remote->Record.SetAvailablePacks(*AvailablePacks);
158161
}

ydb/core/kqp/compute_actor/kqp_compute_state.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ struct TShardState: public TCommonRetriesState {
3838
bool SubscribedOnTablet = false;
3939
TActorId ActorId;
4040
TOwnedCellVec LastKey;
41+
std::optional<NKikimrKqp::TEvKqpScanCursor> LastCursorProto;
4142
std::optional<ui32> AvailablePacks;
4243

4344
TString PrintLastKey(TConstArrayRef<NScheme::TTypeInfo> keyTypes) const;

ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ namespace NKikimr::NKqp::NScanPrivate {
1414

1515
class IExternalObjectsProvider {
1616
public:
17-
virtual std::unique_ptr<TEvDataShard::TEvKqpScan> BuildEvKqpScan(const ui32 scanId, const ui32 gen, const TSmallVec<TSerializedTableRange>& ranges) const = 0;
17+
virtual std::unique_ptr<TEvDataShard::TEvKqpScan> BuildEvKqpScan(const ui32 scanId, const ui32 gen, const TSmallVec<TSerializedTableRange>& ranges,
18+
const std::optional<NKikimrKqp::TEvKqpScanCursor>& cursor) const = 0;
1819
virtual const TVector<NScheme::TTypeInfo>& GetKeyColumnTypes() const = 0;
1920
};
2021

@@ -61,7 +62,7 @@ class TShardScannerInfo {
6162

6263
const auto& keyColumnTypes = externalObjectsProvider.GetKeyColumnTypes();
6364
auto ranges = state.GetScanRanges(keyColumnTypes);
64-
auto ev = externalObjectsProvider.BuildEvKqpScan(ScanId, Generation, ranges);
65+
auto ev = externalObjectsProvider.BuildEvKqpScan(ScanId, Generation, ranges, state.LastCursorProto);
6566

6667
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "start_scanner")("tablet_id", TabletId)("generation", Generation)
6768
("info", state.ToString(keyColumnTypes))("range", DebugPrintRanges(keyColumnTypes, ranges, *AppData()->TypeRegistry))

ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,7 @@ TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snaps
3737
, ShardsScanningPolicy(shardsScanningPolicy)
3838
, Counters(counters)
3939
, InFlightShards(ScanId, *this)
40-
, InFlightComputes(ComputeActorIds)
41-
{
40+
, InFlightComputes(ComputeActorIds) {
4241
Y_UNUSED(traceId);
4342
AFL_ENSURE(!Meta.GetReads().empty());
4443
AFL_ENSURE(Meta.GetTable().GetTableKind() != (ui32)ETableKind::SysView);
@@ -47,7 +46,7 @@ TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snaps
4746
for (size_t i = 0; i < Meta.KeyColumnTypesSize(); i++) {
4847
NScheme::TTypeId typeId = Meta.GetKeyColumnTypes().at(i);
4948
NScheme::TTypeInfo typeInfo = NScheme::NTypeIds::IsParametrizedType(typeId) ?
50-
NScheme::TypeInfoFromProto(typeId,Meta.GetKeyColumnTypeInfos().at(i)) :
49+
NScheme::TypeInfoFromProto(typeId, Meta.GetKeyColumnTypeInfos().at(i)) :
5150
NScheme::TTypeInfo(typeId);
5251
KeyColumnTypes.push_back(typeInfo);
5352
}
@@ -127,19 +126,19 @@ void TKqpScanFetcherActor::HandleExecute(TEvKqpCompute::TEvScanData::TPtr& ev) {
127126
("ScanId", ev->Get()->ScanId)
128127
("Finished", ev->Get()->Finished)
129128
("Lock", [&]() {
130-
TStringBuilder builder;
131-
for (const auto& lock : ev->Get()->LocksInfo.Locks) {
132-
builder << lock.ShortDebugString();
133-
}
134-
return builder;
135-
}())
129+
TStringBuilder builder;
130+
for (const auto& lock : ev->Get()->LocksInfo.Locks) {
131+
builder << lock.ShortDebugString();
132+
}
133+
return builder;
134+
}())
136135
("BrokenLocks", [&]() {
137-
TStringBuilder builder;
138-
for (const auto& lock : ev->Get()->LocksInfo.BrokenLocks) {
139-
builder << lock.ShortDebugString();
140-
}
141-
return builder;
142-
}());
136+
TStringBuilder builder;
137+
for (const auto& lock : ev->Get()->LocksInfo.BrokenLocks) {
138+
builder << lock.ShortDebugString();
139+
}
140+
return builder;
141+
}());
143142

144143
TInstant startTime = TActivationContext::Now();
145144
if (ev->Get()->Finished) {
@@ -347,11 +346,12 @@ void TKqpScanFetcherActor::HandleExecute(TEvTxProxySchemeCache::TEvResolveKeySet
347346

348347
if (!state.LastKey.empty()) {
349348
PendingShards.front().LastKey = std::move(state.LastKey);
350-
while(!PendingShards.empty() && PendingShards.front().GetScanRanges(KeyColumnTypes).empty()) {
349+
while (!PendingShards.empty() && PendingShards.front().GetScanRanges(KeyColumnTypes).empty()) {
351350
CA_LOG_D("Nothing to read " << PendingShards.front().ToString(KeyColumnTypes));
352351
auto readShard = std::move(PendingShards.front());
353352
PendingShards.pop_front();
354353
PendingShards.front().LastKey = std::move(readShard.LastKey);
354+
PendingShards.front().LastCursorProto = std::move(readShard.LastCursorProto);
355355
}
356356

357357
AFL_ENSURE(!PendingShards.empty());
@@ -409,7 +409,8 @@ bool TKqpScanFetcherActor::SendScanFinished() {
409409
return true;
410410
}
411411

412-
std::unique_ptr<NKikimr::TEvDataShard::TEvKqpScan> TKqpScanFetcherActor::BuildEvKqpScan(const ui32 scanId, const ui32 gen, const TSmallVec<TSerializedTableRange>& ranges) const {
412+
std::unique_ptr<NKikimr::TEvDataShard::TEvKqpScan> TKqpScanFetcherActor::BuildEvKqpScan(const ui32 scanId, const ui32 gen,
413+
const TSmallVec<TSerializedTableRange>& ranges, const std::optional<NKikimrKqp::TEvKqpScanCursor>& cursor) const {
413414
auto ev = std::make_unique<TEvDataShard::TEvKqpScan>();
414415
ev->Record.SetLocalPathId(ScanDataMeta.TableId.PathId.LocalPathId);
415416
for (auto& column : ScanDataMeta.GetColumns()) {
@@ -423,6 +424,9 @@ std::unique_ptr<NKikimr::TEvDataShard::TEvKqpScan> TKqpScanFetcherActor::BuildEv
423424
}
424425
}
425426
ev->Record.MutableSkipNullKeys()->CopyFrom(Meta.GetSkipNullKeys());
427+
if (cursor) {
428+
*ev->Record.MutableScanCursor() = *cursor;
429+
}
426430

427431
auto protoRanges = ev->Record.MutableRanges();
428432
protoRanges->Reserve(ranges.size());
@@ -489,10 +493,11 @@ void TKqpScanFetcherActor::ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData
489493
AFL_ENSURE(state->ActorId == ev->Sender)("expected", state->ActorId)("got", ev->Sender);
490494

491495
state->LastKey = std::move(msg.LastKey);
496+
state->LastCursorProto = std::move(msg.LastCursorProto);
492497
const ui64 rowsCount = msg.GetRowsCount();
493498
AFL_ENSURE(!LockTxId || !msg.LocksInfo.Locks.empty() || !msg.LocksInfo.BrokenLocks.empty());
494499
AFL_ENSURE(LockTxId || (msg.LocksInfo.Locks.empty() && msg.LocksInfo.BrokenLocks.empty()));
495-
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("action","got EvScanData")("rows", rowsCount)("finished", msg.Finished)("exceeded", msg.RequestedBytesLimitReached)
500+
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("action", "got EvScanData")("rows", rowsCount)("finished", msg.Finished)("exceeded", msg.RequestedBytesLimitReached)
496501
("scan", ScanId)("packs_to_send", InFlightComputes.GetPacksToSendCount())
497502
("from", ev->Sender)("shards remain", PendingShards.size())
498503
("in flight scans", InFlightShards.GetScansCount())

ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ class TKqpScanFetcherActor: public NActors::TActorBootstrapped<TKqpScanFetcherAc
108108

109109
bool SendScanFinished();
110110

111-
virtual std::unique_ptr<NKikimr::TEvDataShard::TEvKqpScan> BuildEvKqpScan(const ui32 scanId, const ui32 gen, const TSmallVec<TSerializedTableRange>& ranges) const override;
111+
virtual std::unique_ptr<NKikimr::TEvDataShard::TEvKqpScan> BuildEvKqpScan(const ui32 scanId, const ui32 gen,
112+
const TSmallVec<TSerializedTableRange>& ranges, const std::optional<NKikimrKqp::TEvKqpScanCursor>& cursor) const override;
112113
virtual const TVector<NScheme::TTypeInfo>& GetKeyColumnTypes() const override {
113114
return KeyColumnTypes;
114115
}

ydb/core/protos/config.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1699,6 +1699,7 @@ message TColumnShardConfig {
16991699
optional bool ColumnChunksV0Usage = 25 [default = true];
17001700
optional bool ColumnChunksV1Usage = 26 [default = true];
17011701
optional uint64 MemoryLimitScanPortion = 27 [default = 100000000];
1702+
optional string ReaderClassName = 28;
17021703
}
17031704

17041705
message TSchemeShardConfig {

ydb/core/protos/kqp.proto

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,19 @@ message TEvScanError {
642642
optional uint64 TabletId = 4;
643643
}
644644

645+
message TEvKqpScanCursor {
646+
message TColumnShardScanPlain {
647+
}
648+
message TColumnShardScanSimple {
649+
optional uint64 SourceId = 1;
650+
optional uint32 StartRecordIndex = 2;
651+
}
652+
oneof Implementation {
653+
TColumnShardScanPlain ColumnShardPlain = 10;
654+
TColumnShardScanSimple ColumnShardSimple = 11;
655+
}
656+
}
657+
645658
message TEvRemoteScanData {
646659
optional uint32 ScanId = 1;
647660
optional uint64 CpuTimeUs = 2;
@@ -665,6 +678,7 @@ message TEvRemoteScanData {
665678

666679
optional bool RequestedBytesLimitReached = 11 [default = false];
667680
optional uint32 AvailablePacks = 12;
681+
optional TEvKqpScanCursor LastCursor = 13;
668682
}
669683

670684
message TEvRemoteScanDataAck {

ydb/core/protos/tx_datashard.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1700,6 +1700,8 @@ message TEvKqpScan {
17001700
optional TComputeShardingPolicy ComputeShardingPolicy = 23;
17011701
optional uint64 LockTxId = 24;
17021702
optional uint32 LockNodeId = 25;
1703+
optional string CSScanPolicy = 26;
1704+
optional NKikimrKqp.TEvKqpScanCursor ScanCursor = 27;
17031705
}
17041706

17051707
message TEvCompactTable {

0 commit comments

Comments
 (0)