Skip to content

Commit f0046d9

Browse files
fixes
1 parent db5fc18 commit f0046d9

File tree

4 files changed

+48
-7
lines changed

4 files changed

+48
-7
lines changed

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
1818
Context->GetCommonContext()->GetCounters().OnSourceFinished(
1919
source->GetRecordsCount(), source->GetUsedRawBytes(), tableExt ? tableExt->num_rows() : 0);
2020

21-
if (/*(!tableExt || !tableExt->num_rows()) &&*/ Context->GetCommonContext()->GetReadMetadata()->HasLimit() && InFlightLimit < MaxInFlight) {
22-
InFlightLimit = 2 * InFlightLimit;
23-
}
2421
source->MutableStageResult().SetResultChunk(std::move(tableExt), startIndex, recordsCount);
2522
while (FetchingSources.size()) {
2623
auto frontSource = FetchingSources.front();
@@ -60,11 +57,17 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
6057
FetchingSources.pop_front();
6158
frontSource->ClearResult();
6259
if (Context->GetCommonContext()->GetReadMetadata()->HasLimit() && SortedSources.size() && frontSource->GetResultRecordsCount()) {
63-
FinishedSources.emplace(frontSource);
60+
AFL_VERIFY(FetchingInFlightSources.erase(frontSource));
61+
AFL_VERIFY(FinishedSources.emplace(frontSource).second);
6462
while (FinishedSources.size() && (*FinishedSources.begin())->GetFinish() < SortedSources.front()->GetStart()) {
6563
auto finishedSource = *FinishedSources.begin();
64+
if (!finishedSource->GetResultRecordsCount() && Context->GetCommonContext()->GetReadMetadata()->HasLimit() &&
65+
InFlightLimit < MaxInFlight) {
66+
InFlightLimit = 2 * InFlightLimit;
67+
}
6668
FetchedCount += finishedSource->GetResultRecordsCount();
6769
FinishedSources.erase(FinishedSources.begin());
70+
--IntervalsInFlightCount;
6871
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "source_finished")("source_id", finishedSource->GetSourceId())(
6972
"source_idx", finishedSource->GetSourceIdx())("limit", Context->GetCommonContext()->GetReadMetadata()->GetLimitRobust())(
7073
"fetched", finishedSource->GetResultRecordsCount());
@@ -118,14 +121,38 @@ TConclusion<bool> TScanHead::BuildNextInterval() {
118121
if (!Context->IsActive()) {
119122
return false;
120123
}
124+
if (InFlightLimit <= IntervalsInFlightCount) {
125+
return false;
126+
}
121127
bool changed = false;
122-
while (SortedSources.size() && FetchingSources.size() < InFlightLimit) {
128+
ui32 inFlightCountLocal = 0;
129+
if (SortedSources.size()) {
130+
for (auto it = FetchingInFlightSources.begin(); it != FetchingInFlightSources.end(); ++it) {
131+
if ((*it)->GetFinish() < SortedSources.front()->GetStart()) {
132+
++inFlightCountLocal;
133+
}
134+
}
135+
}
136+
AFL_VERIFY(IntervalsInFlightCount == inFlightCountLocal)("count_global", IntervalsInFlightCount)("count_local", inFlightCountLocal);
137+
while (SortedSources.size() && inFlightCountLocal < InFlightLimit) {
123138
SortedSources.front()->StartProcessing(SortedSources.front());
124139
FetchingSources.emplace_back(SortedSources.front());
125140
FetchingSourcesByIdx.emplace(SortedSources.front()->GetSourceIdx(), SortedSources.front());
141+
AFL_VERIFY(FetchingInFlightSources.emplace(SortedSources.front()).second);
126142
SortedSources.pop_front();
143+
if (SortedSources.size()) {
144+
ui32 inFlightCountLocalNew = 0;
145+
for (auto it = FetchingInFlightSources.begin(); it != FetchingInFlightSources.end(); ++it) {
146+
if ((*it)->GetFinish() < SortedSources.front()->GetStart()) {
147+
++inFlightCountLocalNew;
148+
}
149+
}
150+
AFL_VERIFY(inFlightCountLocal <= inFlightCountLocalNew);
151+
inFlightCountLocal = inFlightCountLocalNew;
152+
}
127153
changed = true;
128154
}
155+
IntervalsInFlightCount = inFlightCountLocal;
129156
return changed;
130157
}
131158

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ class TScanHead {
2929
std::deque<std::shared_ptr<IDataSource>> SortedSources;
3030
std::deque<std::shared_ptr<IDataSource>> FetchingSources;
3131
std::set<std::shared_ptr<IDataSource>, IDataSource::TCompareFinishForScanSequence> FinishedSources;
32+
std::set<std::shared_ptr<IDataSource>, IDataSource::TCompareFinishForScanSequence> FetchingInFlightSources;
33+
TPositiveControlInteger IntervalsInFlightCount;
3234
ui64 FetchedCount = 0;
3335
ui64 InFlightLimit = 1;
3436
ui64 MaxInFlight = 256;

ydb/library/accessor/positive_integer.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,10 @@ ui64 TPositiveControlInteger::Val() const {
3535
return Value;
3636
}
3737

38-
}
38+
}
39+
40+
template<>
41+
void Out<NKikimr::TPositiveControlInteger>(IOutputStream& o,
42+
typename TTypeTraits<NKikimr::TPositiveControlInteger>::TFuncParam x) {
43+
o << x.Val();
44+
}

ydb/library/accessor/positive_integer.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#pragma once
22
#include <util/system/types.h>
3+
#include <util/stream/output.h>
4+
#include <util/generic/typetraits.h>
35

46
namespace NKikimr {
57

@@ -11,6 +13,10 @@ class TPositiveControlInteger {
1113
TPositiveControlInteger(const ui64 value)
1214
: Value(value) {
1315

16+
}
17+
TPositiveControlInteger(const ui32 value)
18+
: Value(value) {
19+
1420
}
1521
TPositiveControlInteger(const i64 value);
1622
ui64 Add(const ui64 value);
@@ -37,4 +43,4 @@ class TPositiveControlInteger {
3743
}
3844
};
3945

40-
}
46+
}

0 commit comments

Comments
 (0)