Skip to content

Commit b0b1457

Browse files
fix states usage for accessors request filling (#12553)
1 parent e7dae09 commit b0b1457

File tree

22 files changed

+109
-46
lines changed

22 files changed

+109
-46
lines changed

ydb/core/tx/columnshard/columnshard__statistics.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ class TColumnPortionsAccumulator {
179179
return;
180180
}
181181
Result->AddWaitingTask();
182-
std::shared_ptr<NOlap::TDataAccessorsRequest> request = std::make_shared<NOlap::TDataAccessorsRequest>();
182+
std::shared_ptr<NOlap::TDataAccessorsRequest> request = std::make_shared<NOlap::TDataAccessorsRequest>("STATISTICS_FLUSH");
183183
for (auto&& i : Portions) {
184184
request->AddPortion(i);
185185
}
@@ -227,7 +227,7 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev,
227227
columnTagsRequested = std::set<ui32>(allColumnIds.begin(), allColumnIds.end());
228228
}
229229

230-
NOlap::TDataAccessorsRequest request;
230+
NOlap::TDataAccessorsRequest request("STATISTICS");
231231
std::shared_ptr<TResultAccumulator> resultAccumulator =
232232
std::make_shared<TResultAccumulator>(columnTagsRequested, ev->Sender, ev->Cookie, std::move(response));
233233
auto versionedIndex = std::make_shared<NOlap::TVersionedIndex>(index.GetVersionedIndex());

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1405,12 +1405,15 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
14051405
std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback> FetchCallback;
14061406
THashMap<ui64, std::vector<NOlap::TPortionInfo::TConstPtr>> PortionsByPath;
14071407
std::vector<TPortionConstructorV2> FetchedAccessors;
1408+
const TString Consumer;
14081409

14091410
public:
14101411
TTxAskPortionChunks(TColumnShard* self, const std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback>& fetchCallback,
1411-
THashMap<ui64, NOlap::TPortionInfo::TConstPtr>&& portions)
1412+
THashMap<ui64, NOlap::TPortionInfo::TConstPtr>&& portions, const TString& consumer)
14121413
: TBase(self)
1413-
, FetchCallback(fetchCallback) {
1414+
, FetchCallback(fetchCallback)
1415+
, Consumer(consumer)
1416+
{
14141417
for (auto&& i : portions) {
14151418
PortionsByPath[i.second->GetPathId()].emplace_back(i.second);
14161419
}
@@ -1421,8 +1424,9 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
14211424

14221425
TBlobGroupSelector selector(Self->Info());
14231426
bool reask = false;
1427+
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("consumer", Consumer)("event", "TTxAskPortionChunks::Execute");
14241428
for (auto&& i : PortionsByPath) {
1425-
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("size", i.second.size())("path_id", i.first);
1429+
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("size", i.second.size())("path_id", i.first);
14261430
for (auto&& p : i.second) {
14271431
if (!p->GetSchema(Self->GetIndexAs<NOlap::TColumnEngineForLogs>().GetVersionedIndex())->GetIndexesCount()) {
14281432
continue;
@@ -1440,7 +1444,8 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
14401444
}
14411445

14421446
for (auto&& i : PortionsByPath) {
1443-
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "processing")("size", i.second.size())("path_id", i.first);
1447+
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("stage", "processing")("size", i.second.size())(
1448+
"path_id", i.first);
14441449
while (i.second.size()) {
14451450
auto p = i.second.back();
14461451
TPortionConstructorV2 constructor(p);
@@ -1470,11 +1475,11 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
14701475
FetchedAccessors.emplace_back(std::move(constructor));
14711476
i.second.pop_back();
14721477
}
1473-
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "finished")("size", i.second.size())(
1478+
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("stage", "finished")("size", i.second.size())(
14741479
"path_id", i.first);
14751480
}
14761481

1477-
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "finished");
1482+
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("stage", "finished");
14781483
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(std::make_shared<TAccessorsParsingTask>(FetchCallback, std::move(FetchedAccessors)));
14791484
return true;
14801485
}
@@ -1486,7 +1491,7 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
14861491
};
14871492

14881493
void TColumnShard::Handle(NOlap::NDataAccessorControl::TEvAskTabletDataAccessors::TPtr& ev, const TActorContext& /*ctx*/) {
1489-
Execute(new TTxAskPortionChunks(this, ev->Get()->GetCallback(), std::move(ev->Get()->MutablePortions())));
1494+
Execute(new TTxAskPortionChunks(this, ev->Get()->GetCallback(), std::move(ev->Get()->MutablePortions()), ev->Get()->GetConsumer()));
14901495
}
14911496

14921497
void TColumnShard::Handle(NOlap::NDataSharing::NEvents::TEvAckFinishFromInitiator::TPtr& ev, const TActorContext& ctx) {

ydb/core/tx/columnshard/data_accessor/abstract/collector.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
namespace NKikimr::NOlap::NDataAccessorControl {
77

88
THashMap<ui64, TPortionDataAccessor> IGranuleDataAccessor::AskData(
9-
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback) {
9+
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer) {
1010
AFL_VERIFY(portions.size());
11-
return DoAskData(portions, callback);
11+
return DoAskData(portions, callback, consumer);
1212
}
1313

1414
void TActorAccessorsCallback::OnAccessorsFetched(std::vector<TPortionDataAccessor>&& accessors) {

ydb/core/tx/columnshard/data_accessor/abstract/collector.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class IGranuleDataAccessor {
2525
const ui64 PathId;
2626

2727
virtual THashMap<ui64, TPortionDataAccessor> DoAskData(
28-
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback) = 0;
28+
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer) = 0;
2929
virtual void DoModifyPortions(const std::vector<TPortionDataAccessor>& add, const std::vector<ui64>& remove) = 0;
3030

3131
public:
@@ -39,7 +39,8 @@ class IGranuleDataAccessor {
3939
: PathId(pathId) {
4040
}
4141

42-
THashMap<ui64, TPortionDataAccessor> AskData(const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback);
42+
THashMap<ui64, TPortionDataAccessor> AskData(
43+
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer);
4344
void ModifyPortions(const std::vector<TPortionDataAccessor>& add, const std::vector<ui64>& remove) {
4445
return DoModifyPortions(add, remove);
4546
}

ydb/core/tx/columnshard/data_accessor/events.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,14 @@ class TEvAskTabletDataAccessors: public NActors::TEventLocal<TEvAskTabletDataAcc
8080
using TPortions = THashMap<ui64, TPortionInfo::TConstPtr>;
8181
YDB_ACCESSOR_DEF(TPortions, Portions);
8282
YDB_READONLY_DEF(std::shared_ptr<NDataAccessorControl::IAccessorCallback>, Callback);
83+
YDB_READONLY_DEF(TString, Consumer);
8384

8485
public:
85-
explicit TEvAskTabletDataAccessors(
86-
const THashMap<ui64, TPortionInfo::TConstPtr>& portions, const std::shared_ptr<NDataAccessorControl::IAccessorCallback>& callback)
86+
explicit TEvAskTabletDataAccessors(const THashMap<ui64, TPortionInfo::TConstPtr>& portions,
87+
const std::shared_ptr<NDataAccessorControl::IAccessorCallback>& callback, const TString& consumer)
8788
: Portions(portions)
88-
, Callback(callback) {
89+
, Callback(callback)
90+
, Consumer(consumer) {
8991
}
9092
};
9193

ydb/core/tx/columnshard/data_accessor/in_mem/collector.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
namespace NKikimr::NOlap::NDataAccessorControl::NInMem {
44

55
THashMap<ui64, TPortionDataAccessor> TCollector::DoAskData(
6-
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& /*callback*/) {
6+
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& /*callback*/, const TString& /*consumer*/) {
77
THashMap<ui64, TPortionDataAccessor> accessors;
88
for (auto&& i : portions) {
99
auto it = Accessors.find(i->GetPortionId());

ydb/core/tx/columnshard/data_accessor/in_mem/collector.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ class TCollector: public IGranuleDataAccessor {
77
using TBase = IGranuleDataAccessor;
88
THashMap<ui64, TPortionDataAccessor> Accessors;
99
virtual THashMap<ui64, TPortionDataAccessor> DoAskData(
10-
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback) override;
10+
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer) override;
1111
virtual void DoModifyPortions(const std::vector<TPortionDataAccessor>& add,
1212
const std::vector<ui64>& remove) override;
1313

ydb/core/tx/columnshard/data_accessor/local_db/collector.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
namespace NKikimr::NOlap::NDataAccessorControl::NLocalDB {
55

66
THashMap<ui64, TPortionDataAccessor> TCollector::DoAskData(
7-
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback) {
7+
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer) {
88
THashMap<ui64, TPortionDataAccessor> accessors;
99
THashMap<ui64, TPortionInfo::TConstPtr> portionsToDirectAsk;
1010
for (auto&& p : portions) {
@@ -17,7 +17,7 @@ THashMap<ui64, TPortionDataAccessor> TCollector::DoAskData(
1717
}
1818
if (portionsToDirectAsk.size()) {
1919
NActors::TActivationContext::Send(
20-
TabletActorId, std::make_unique<NDataAccessorControl::TEvAskTabletDataAccessors>(portionsToDirectAsk, callback));
20+
TabletActorId, std::make_unique<NDataAccessorControl::TEvAskTabletDataAccessors>(portionsToDirectAsk, callback, consumer));
2121
}
2222
return accessors;
2323
}

ydb/core/tx/columnshard/data_accessor/local_db/collector.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ class TCollector: public IGranuleDataAccessor {
1717

1818
TLRUCache<ui64, TPortionDataAccessor, TNoopDelete, TMetadataSizeProvider> AccessorsCache;
1919
using TBase = IGranuleDataAccessor;
20-
virtual THashMap<ui64, TPortionDataAccessor> DoAskData(
21-
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback) override;
20+
virtual THashMap<ui64, TPortionDataAccessor> DoAskData(const std::vector<TPortionInfo::TConstPtr>& portions,
21+
const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer) override;
2222
virtual void DoModifyPortions(const std::vector<TPortionDataAccessor>& add, const std::vector<ui64>& remove) override;
2323

2424
public:

ydb/core/tx/columnshard/data_accessor/manager.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ class TLocalManager: public IDataAccessorsManager {
115115
if (portionsAsk.empty()) {
116116
continue;
117117
}
118-
auto accessors = it->second->AskData(portionsAsk, AccessorCallback);
118+
auto accessors = it->second->AskData(portionsAsk, AccessorCallback, request->GetConsumer());
119119
for (auto&& p : portionsAsk) {
120120
auto itAccessor = accessors.find(p->GetPortionId());
121121
if (itAccessor == accessors.end()) {

0 commit comments

Comments
 (0)