Skip to content

Commit 08abadd

Browse files
fix accessors fetching queue processing (#12936)
1 parent bd0e2de commit 08abadd

File tree

5 files changed

+46
-13
lines changed

5 files changed

+46
-13
lines changed

ydb/core/protos/config.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1764,6 +1764,7 @@ message TColumnShardConfig {
17641764
optional bool AllowNullableColumnsInPK = 29 [default = false];
17651765
optional uint32 RestoreDataOnWriteTimeoutSeconds = 30;
17661766
optional bool UseSlicesFilter = 31 [default = true];
1767+
optional uint32 LimitForPortionsMetadataAsk = 32 [default = 1000];
17671768
}
17681769

17691770
message TSchemeShardConfig {

ydb/core/tx/columnshard/data_accessor/manager.cpp

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,17 @@
11
#include "manager.h"
22

3+
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
4+
35
namespace NKikimr::NOlap::NDataAccessorControl {
46

57
void TLocalManager::DrainQueue() {
6-
THashMap<ui64, std::vector<TPortionInfo::TConstPtr>> portionsToAsk;
78
std::optional<ui64> lastPathId;
89
IGranuleDataAccessor* lastDataAccessor = nullptr;
910
ui32 countToFlight = 0;
10-
while (PortionsAskInFlight + countToFlight < 1000 && PortionsAsk.size()) {
11+
while (PortionsAskInFlight + countToFlight < NYDBTest::TControllers::GetColumnShardController()->GetLimitForPortionsMetadataAsk() &&
12+
PortionsAsk.size()) {
13+
THashMap<ui64, std::vector<TPortionInfo::TConstPtr>> portionsToAsk;
1114
while (PortionsAskInFlight + countToFlight < 1000 && PortionsAsk.size()) {
12-
if (PortionsAsk.front().GetAbortionFlag() && PortionsAsk.front().GetAbortionFlag()->Val()) {
13-
PortionsAsk.pop_front();
14-
continue;
15-
}
1615
auto p = PortionsAsk.front().ExtractPortion();
1716
PortionsAsk.pop_front();
1817
if (!lastPathId || *lastPathId != p->GetPathId()) {
@@ -24,18 +23,30 @@ void TLocalManager::DrainQueue() {
2423
lastDataAccessor = it->second.get();
2524
}
2625
}
26+
auto it = RequestsByPortion.find(p->GetPortionId());
27+
if (it == RequestsByPortion.end()) {
28+
continue;
29+
}
2730
if (!lastDataAccessor) {
28-
auto it = RequestsByPortion.find(p->GetPortionId());
29-
AFL_VERIFY(it != RequestsByPortion.end());
3031
for (auto&& i : it->second) {
31-
if (!i->IsFetched()) {
32+
if (!i->IsFetched() && !i->IsAborted()) {
3233
i->AddError(p->GetPathId(), "path id absent");
3334
}
3435
}
3536
RequestsByPortion.erase(it);
3637
} else {
37-
portionsToAsk[p->GetPathId()].emplace_back(p);
38-
++countToFlight;
38+
bool toAsk = false;
39+
for (auto&& i : it->second) {
40+
if (!i->IsFetched() && !i->IsAborted()) {
41+
toAsk = true;
42+
}
43+
}
44+
if (!toAsk) {
45+
RequestsByPortion.erase(it);
46+
} else {
47+
portionsToAsk[p->GetPathId()].emplace_back(p);
48+
++countToFlight;
49+
}
3950
}
4051
}
4152
for (auto&& i : portionsToAsk) {
@@ -46,7 +57,7 @@ void TLocalManager::DrainQueue() {
4657
auto it = RequestsByPortion.find(accessor.GetPortionInfo().GetPortionId());
4758
AFL_VERIFY(it != RequestsByPortion.end());
4859
for (auto&& i : it->second) {
49-
if (!i->IsFetched()) {
60+
if (!i->IsFetched() && !i->IsAborted()) {
5061
i->AddAccessor(accessor);
5162
}
5263
}
@@ -110,4 +121,4 @@ void TLocalManager::DoAddPortion(const TPortionDataAccessor& accessor) {
110121
DrainQueue();
111122
}
112123

113-
} // namespace NKikimr::NOlap
124+
} // namespace NKikimr::NOlap::NDataAccessorControl

ydb/core/tx/columnshard/data_accessor/request.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,12 @@ class TDataAccessorsRequest: public NColumnShard::TMonitoringObjectsCounter<TDat
232232
return result;
233233
}
234234

235+
bool IsAborted() const {
236+
AFL_VERIFY(HasSubscriber());
237+
auto flag = Subscriber->GetAbortionFlag();
238+
return flag && flag->Val();
239+
}
240+
235241
const std::shared_ptr<const TAtomicCounter>& GetAbortionFlag() const {
236242
AFL_VERIFY(HasSubscriber());
237243
return Subscriber->GetAbortionFlag();

ydb/core/tx/columnshard/hooks/abstract/abstract.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ class ICSController {
9292
virtual TDuration DoGetUsedSnapshotLivetime(const TDuration defaultValue) const {
9393
return defaultValue;
9494
}
95+
virtual ui64 DoGetLimitForPortionsMetadataAsk(const ui64 defaultValue) const {
96+
return defaultValue;
97+
}
9598
virtual TDuration DoGetOverridenGCPeriod(const TDuration defaultValue) const {
9699
return defaultValue;
97100
}
@@ -189,6 +192,11 @@ class ICSController {
189192
virtual void OnSelectShardingFilter() {
190193
}
191194

195+
ui64 GetLimitForPortionsMetadataAsk() const {
196+
const ui64 defaultValue = GetConfig().GetLimitForPortionsMetadataAsk();
197+
return DoGetLimitForPortionsMetadataAsk(defaultValue);
198+
}
199+
192200
TDuration GetCompactionActualizationLag() const {
193201
const TDuration defaultValue = TDuration::MilliSeconds(GetConfig().GetCompactionActualizationLagMs());
194202
return DoGetCompactionActualizationLag(defaultValue);

ydb/core/tx/columnshard/hooks/testing/controller.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ class TController: public TReadOnlyController {
2424
YDB_ACCESSOR_DEF(std::optional<TDuration>, OverrideTasksActualizationLag);
2525
YDB_ACCESSOR_DEF(std::optional<TDuration>, OverrideMaxReadStaleness);
2626
YDB_ACCESSOR(std::optional<ui64>, OverrideMemoryLimitForPortionReading, 100);
27+
YDB_ACCESSOR(std::optional<ui64>, OverrideLimitForPortionsMetadataAsk, 1);
28+
2729
YDB_ACCESSOR_DEF(std::optional<NKikimrProto::EReplyStatus>, OverrideBlobPutResultOnWriteValue);
2830

2931
EOptimizerCompactionWeightControl CompactionControl = EOptimizerCompactionWeightControl::Force;
@@ -135,6 +137,11 @@ class TController: public TReadOnlyController {
135137
protected:
136138
virtual ::NKikimr::NColumnShard::TBlobPutResult::TPtr OverrideBlobPutResultOnCompaction(const ::NKikimr::NColumnShard::TBlobPutResult::TPtr original, const NOlap::TWriteActionsCollection& actions) const override;
137139

140+
virtual ui64 DoGetLimitForPortionsMetadataAsk(const ui64 defaultValue) const override {
141+
return OverrideLimitForPortionsMetadataAsk.value_or(defaultValue);
142+
}
143+
144+
138145
virtual ui64 DoGetMemoryLimitScanPortion(const ui64 defaultValue) const override {
139146
return OverrideMemoryLimitForPortionReading.value_or(defaultValue);
140147
}

0 commit comments

Comments
 (0)