Skip to content

Commit fa9addb

Browse files
Merge c5888fe into e89b8d6
2 parents e89b8d6 + c5888fe commit fa9addb

File tree

16 files changed

+101
-51
lines changed

16 files changed

+101
-51
lines changed

ydb/core/kqp/ut/common/kqp_ut_common.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,7 @@ void TKikimrRunner::Initialize(const TKikimrSettings& settings) {
526526
SetupLogLevelFromTestParam(NKikimrServices::KQP_BLOBS_STORAGE);
527527
SetupLogLevelFromTestParam(NKikimrServices::KQP_WORKLOAD_SERVICE);
528528
SetupLogLevelFromTestParam(NKikimrServices::TX_COLUMNSHARD);
529+
SetupLogLevelFromTestParam(NKikimrServices::TX_COLUMNSHARD_SCAN);
529530
SetupLogLevelFromTestParam(NKikimrServices::LOCAL_PGWIRE);
530531

531532
RunCall([this, domain = settings.DomainRoot]{

ydb/core/tx/columnshard/counters/scan.h

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,19 +291,40 @@ class TConcreteScanCounters: public TScanCounters {
291291
private:
292292
using TBase = TScanCounters;
293293
std::shared_ptr<TAtomicCounter> FetchAccessorsCount;
294+
std::shared_ptr<TAtomicCounter> FetchBlobsCount;
294295
std::shared_ptr<TAtomicCounter> MergeTasksCount;
295296
std::shared_ptr<TAtomicCounter> AssembleTasksCount;
296297
std::shared_ptr<TAtomicCounter> ReadTasksCount;
297298
std::shared_ptr<TAtomicCounter> ResourcesAllocationTasksCount;
298299
std::shared_ptr<TAtomicCounter> ResultsForSourceCount;
300+
std::shared_ptr<TAtomicCounter> ResultsForReplyGuard;
299301

300302
public:
301303
TScanAggregations Aggregations;
302304

305+
TString DebugString() const {
306+
return TStringBuilder() << "FetchAccessorsCount:" << FetchAccessorsCount->Val() << ";"
307+
<< "FetchBlobsCount:" << FetchBlobsCount->Val() << ";"
308+
<< "MergeTasksCount:" << MergeTasksCount->Val() << ";"
309+
<< "AssembleTasksCount:" << AssembleTasksCount->Val() << ";"
310+
<< "ReadTasksCount:" << ReadTasksCount->Val() << ";"
311+
<< "ResourcesAllocationTasksCount:" << ResourcesAllocationTasksCount->Val() << ";"
312+
<< "ResultsForSourceCount:" << ResultsForSourceCount->Val() << ";"
313+
<< "ResultsForReplyGuard:" << ResultsForReplyGuard->Val() << ";";
314+
}
315+
316+
TCounterGuard GetResultsForReplyGuard() const {
317+
return TCounterGuard(ResultsForReplyGuard);
318+
}
319+
303320
TCounterGuard GetFetcherAcessorsGuard() const {
304321
return TCounterGuard(FetchAccessorsCount);
305322
}
306323

324+
TCounterGuard GetFetchBlobsGuard() const {
325+
return TCounterGuard(FetchBlobsCount);
326+
}
327+
307328
TCounterGuard GetResultsForSourceGuard() const {
308329
return TCounterGuard(ResultsForSourceCount);
309330
}
@@ -326,7 +347,7 @@ class TConcreteScanCounters: public TScanCounters {
326347

327348
bool InWaiting() const {
328349
return MergeTasksCount->Val() || AssembleTasksCount->Val() || ReadTasksCount->Val() || ResourcesAllocationTasksCount->Val() ||
329-
FetchAccessorsCount->Val() || ResultsForSourceCount->Val();
350+
FetchAccessorsCount->Val() || ResultsForSourceCount->Val() || FetchBlobsCount->Val() || ResultsForReplyGuard->Val();
330351
}
331352

332353
void OnBlobsWaitDuration(const TDuration d, const TDuration fullScanDuration) const {
@@ -337,11 +358,13 @@ class TConcreteScanCounters: public TScanCounters {
337358
TConcreteScanCounters(const TScanCounters& counters)
338359
: TBase(counters)
339360
, FetchAccessorsCount(std::make_shared<TAtomicCounter>())
361+
, FetchBlobsCount(std::make_shared<TAtomicCounter>())
340362
, MergeTasksCount(std::make_shared<TAtomicCounter>())
341363
, AssembleTasksCount(std::make_shared<TAtomicCounter>())
342364
, ReadTasksCount(std::make_shared<TAtomicCounter>())
343365
, ResourcesAllocationTasksCount(std::make_shared<TAtomicCounter>())
344366
, ResultsForSourceCount(std::make_shared<TAtomicCounter>())
367+
, ResultsForReplyGuard(std::make_shared<TAtomicCounter>())
345368
, Aggregations(TBase::BuildAggregations())
346369
{
347370

ydb/core/tx/columnshard/engines/reader/abstract/read_context.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,14 @@
55
#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
66
#include <ydb/core/tx/columnshard/counters/scan.h>
77
#include <ydb/core/tx/columnshard/data_accessor/manager.h>
8-
#include <ydb/core/tx/columnshard/engines/reader/common/result.h>
98
#include <ydb/core/tx/columnshard/resource_subscriber/task.h>
109

1110
#include <ydb/library/accessor/accessor.h>
1211

1312
namespace NKikimr::NOlap::NReader {
1413

14+
class TPartialReadResult;
15+
1516
class TComputeShardingPolicy {
1617
private:
1718
YDB_READONLY(ui32, ShardsCount, 0);

ydb/core/tx/columnshard/engines/reader/actor/actor.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,8 +304,9 @@ void TColumnShardScan::ContinueProcessing() {
304304
}
305305
}
306306
}
307-
// AFL_VERIFY(!ScanIterator || !ChunksLimiter.HasMore() || ScanCountersPool.InWaiting())("scan_actor_id", ScanActorId)("tx_id", TxId)("scan_id", ScanId)(
308-
// "gen", ScanGen)("tablet", TabletId)("debug", ScanIterator->DebugString());
307+
AFL_VERIFY(!ScanIterator || !ChunksLimiter.HasMore() || ScanCountersPool.InWaiting())("scan_actor_id", ScanActorId)("tx_id", TxId)(
308+
"scan_id", ScanId)("gen", ScanGen)("tablet", TabletId)("debug",
309+
ScanIterator->DebugString())("counters", ScanCountersPool.DebugString());
309310
}
310311

311312
void TColumnShardScan::MakeResult(size_t reserveRows /*= 0*/) {

ydb/core/tx/columnshard/engines/reader/common/result.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
#include "result.h"
22

3+
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>
4+
35
namespace NKikimr::NOlap::NReader {
46

57
class TCurrentBatch {
68
private:
79
std::vector<std::shared_ptr<TPartialReadResult>> Results;
810
ui64 RecordsCount = 0;
11+
912
public:
1013
ui64 GetRecordsCount() const {
1114
return RecordsCount;
@@ -49,4 +52,18 @@ std::vector<std::shared_ptr<TPartialReadResult>> TPartialReadResult::SplitResult
4952
return result;
5053
}
5154

52-
}
55+
TPartialReadResult::TPartialReadResult(const std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>& resourceGuards,
56+
const std::shared_ptr<NGroupedMemoryManager::TGroupGuard>& gGuard, const NArrow::TShardedRecordBatch& batch,
57+
const std::shared_ptr<IScanCursor>& scanCursor, const std::shared_ptr<TReadContext>& context,
58+
const std::optional<ui32> notFinishedIntervalIdx)
59+
: ResourceGuards(resourceGuards)
60+
, GroupGuard(gGuard)
61+
, ResultBatch(batch)
62+
, ScanCursor(scanCursor)
63+
, NotFinishedIntervalIdx(notFinishedIntervalIdx)
64+
, Guard(TValidator::CheckNotNull(context)->GetCounters().GetResultsForReplyGuard()) {
65+
Y_ABORT_UNLESS(ResultBatch.GetRecordsCount());
66+
Y_ABORT_UNLESS(ScanCursor);
67+
}
68+
69+
} // namespace NKikimr::NOlap::NReader

ydb/core/tx/columnshard/engines/reader/common/result.h

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@
77
#include <ydb/core/tx/program/program.h>
88

99
#include <ydb/library/yql/dq/actors/protos/dq_stats.pb.h>
10+
1011
namespace NKikimr::NOlap::NReader {
1112

13+
class TReadContext;
14+
1215
// Represents a batch of rows produced by ASC or DESC scan with applied filters and partial aggregation
1316
class TPartialReadResult: public TNonCopyable {
1417
private:
@@ -20,6 +23,7 @@ class TPartialReadResult: public TNonCopyable {
2023
// NOTE: it might be different from the Key of last row in ResulBatch in case of filtering/aggregation/limit
2124
std::shared_ptr<IScanCursor> ScanCursor;
2225
YDB_READONLY_DEF(std::optional<ui32>, NotFinishedIntervalIdx);
26+
const NColumnShard::TCounterGuard Guard;
2327

2428
public:
2529
void Cut(const ui32 limit) {
@@ -56,19 +60,12 @@ class TPartialReadResult: public TNonCopyable {
5660

5761
explicit TPartialReadResult(const std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>& resourceGuards,
5862
const std::shared_ptr<NGroupedMemoryManager::TGroupGuard>& gGuard, const NArrow::TShardedRecordBatch& batch,
59-
const std::shared_ptr<IScanCursor>& scanCursor, const std::optional<ui32> notFinishedIntervalIdx)
60-
: ResourceGuards(resourceGuards)
61-
, GroupGuard(gGuard)
62-
, ResultBatch(batch)
63-
, ScanCursor(scanCursor)
64-
, NotFinishedIntervalIdx(notFinishedIntervalIdx) {
65-
Y_ABORT_UNLESS(ResultBatch.GetRecordsCount());
66-
Y_ABORT_UNLESS(ScanCursor);
67-
}
63+
const std::shared_ptr<IScanCursor>& scanCursor, const std::shared_ptr<TReadContext>& context,
64+
const std::optional<ui32> notFinishedIntervalIdx);
6865

6966
explicit TPartialReadResult(const NArrow::TShardedRecordBatch& batch, const std::shared_ptr<IScanCursor>& scanCursor,
70-
const std::optional<ui32> notFinishedIntervalIdx)
71-
: TPartialReadResult({}, nullptr, batch, scanCursor, notFinishedIntervalIdx) {
67+
const std::shared_ptr<TReadContext>& context, const std::optional<ui32> notFinishedIntervalIdx)
68+
: TPartialReadResult({}, nullptr, batch, scanCursor, context, notFinishedIntervalIdx) {
7269
}
7370
};
7471

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
#include "plain_read_data.h"
22

3+
#include <ydb/core/tx/columnshard/engines/reader/common/result.h>
4+
35
namespace NKikimr::NOlap::NReader::NPlain {
46

57
TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
68
: TBase(context)
7-
, SpecialReadContext(std::make_shared<TSpecialReadContext>(context))
8-
{
9+
, SpecialReadContext(std::make_shared<TSpecialReadContext>(context)) {
910
ui32 sourceIdx = 0;
1011
std::deque<std::shared_ptr<IDataSource>> sources;
1112
const auto& portions = GetReadMetadata()->SelectInfo->PortionsOrderedPK;
@@ -28,8 +29,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
2829
if (GetReadMetadata()->IsMyUncommitted(i.GetInsertWriteId())) {
2930
continue;
3031
}
31-
if (GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetFirst()) ||
32-
GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetLast())) {
32+
if (GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetFirst()) || GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetLast())) {
3333
GetReadMetadata()->SetConflictedWriteId(i.GetInsertWriteId());
3434
}
3535
}
@@ -56,21 +56,21 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
5656
stats->CommittedPortionsBytes = committedPortionsBytes;
5757
stats->InsertedPortionsBytes = insertedPortionsBytes;
5858
stats->CompactedPortionsBytes = compactedPortionsBytes;
59-
6059
}
6160

6261
std::vector<std::shared_ptr<TPartialReadResult>> TPlainReadData::DoExtractReadyResults(const int64_t /*maxRowsInBatch*/) {
6362
auto result = std::move(PartialResults);
6463
PartialResults.clear();
65-
// auto result = TPartialReadResult::SplitResults(std::move(PartialResults), maxRowsInBatch);
64+
// auto result = TPartialReadResult::SplitResults(std::move(PartialResults), maxRowsInBatch);
6665
ui32 count = 0;
67-
for (auto&& r: result) {
66+
for (auto&& r : result) {
6867
count += r->GetRecordsCount();
6968
}
7069
AFL_VERIFY(count == ReadyResultsCount);
7170
ReadyResultsCount = 0;
7271

73-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoExtractReadyResults")("result", result.size())("count", count)("finished", Scanner->IsFinished());
72+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoExtractReadyResults")("result", result.size())("count", count)(
73+
"finished", Scanner->IsFinished());
7474
return result;
7575
}
7676

@@ -79,9 +79,9 @@ TConclusion<bool> TPlainReadData::DoReadNextInterval() {
7979
}
8080

8181
void TPlainReadData::OnIntervalResult(const std::shared_ptr<TPartialReadResult>& result) {
82-
// result->GetResourcesGuardOnly()->Update(result->GetMemorySize());
82+
// result->GetResourcesGuardOnly()->Update(result->GetMemorySize());
8383
ReadyResultsCount += result->GetRecordsCount();
8484
PartialResults.emplace_back(result);
8585
}
8686

87-
}
87+
} // namespace NKikimr::NOlap::NReader::NPlain

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "scanner.h"
33

44
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h>
5+
#include <ydb/core/tx/columnshard/engines/reader/common/result.h>
56

67
#include <ydb/library/actors/core/log.h>
78

@@ -29,7 +30,7 @@ void TScanHead::OnIntervalResult(std::shared_ptr<NGroupedMemoryManager::TAllocat
2930
}
3031
std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>> guards = { std::move(allocationGuard) };
3132
AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, std::make_shared<TPartialReadResult>(guards, std::move(gGuard), *newBatch,
32-
std::make_shared<TPlainScanCursor>(lastPK), callbackIdxSubscriver)).second);
33+
std::make_shared<TPlainScanCursor>(lastPK), Context->GetCommonContext(), callbackIdxSubscriver)).second);
3334
} else {
3435
AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, nullptr).second);
3536
}

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ class TPortionAccessorFetchingSubscriber: public IDataAccessorRequestsSubscriber
214214
private:
215215
TFetchingScriptCursor Step;
216216
std::shared_ptr<IDataSource> Source;
217+
const NColumnShard::TCounterGuard Guard;
217218
virtual void DoOnRequestsFinished(TDataAccessorsResult&& result) override {
218219
AFL_VERIFY(!result.HasErrors());
219220
AFL_VERIFY(result.GetPortions().size() == 1)("count", result.GetPortions().size());
@@ -226,10 +227,10 @@ class TPortionAccessorFetchingSubscriber: public IDataAccessorRequestsSubscriber
226227
public:
227228
TPortionAccessorFetchingSubscriber(const TFetchingScriptCursor& step, const std::shared_ptr<IDataSource>& source)
228229
: Step(step)
229-
, Source(source) {
230+
, Source(source)
231+
, Guard(Source->GetContext()->GetCommonContext()->GetCounters().GetFetcherAcessorsGuard()) {
230232
}
231233
};
232-
233234
} // namespace
234235

235236
bool TPortionDataSource::DoStartFetchingAccessor(const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step) {

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/constructor.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,14 @@ bool TBlobsFetcherTask::DoOnError(const TString& storageId, const TBlobRange& ra
1919
return false;
2020
}
2121

22+
TBlobsFetcherTask::TBlobsFetcherTask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions,
23+
const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step, const std::shared_ptr<TSpecialReadContext>& context,
24+
const TString& taskCustomer, const TString& externalTaskId)
25+
: TBase(readActions, taskCustomer, externalTaskId)
26+
, Source(sourcePtr)
27+
, Step(step)
28+
, Context(context)
29+
, Guard(Context->GetCommonContext()->GetCounters().GetFetchBlobsGuard()) {
30+
}
31+
2232
}

0 commit comments

Comments
 (0)