Skip to content

Commit 3227fd0

Browse files
Merge f670aec into 0a53ed1
2 parents 0a53ed1 + f670aec commit 3227fd0

File tree

1 file changed

+30
-36
lines changed

1 file changed

+30
-36
lines changed

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 30 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1406,6 +1406,7 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
14061406
THashMap<ui64, std::vector<NOlap::TPortionInfo::TConstPtr>> PortionsByPath;
14071407
std::vector<TPortionConstructorV2> FetchedAccessors;
14081408
const TString Consumer;
1409+
THashMap<NOlap::TPortionAddress, TPortionConstructorV2> Constructors;
14091410

14101411
public:
14111412
TTxAskPortionChunks(TColumnShard* self, const std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback>& fetchCallback,
@@ -1428,13 +1429,42 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
14281429
for (auto&& i : PortionsByPath) {
14291430
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("size", i.second.size())("path_id", i.first);
14301431
for (auto&& p : i.second) {
1432+
auto itPortionConstructor = Constructors.find(p->GetAddress());
1433+
if (itPortionConstructor == Constructors.end()) {
1434+
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV2>().Key(p->GetPathId(), p->GetPortionId()).Select();
1435+
if (!rowset.IsReady()) {
1436+
reask = true;
1437+
} else {
1438+
TPortionConstructorV2 constructor(p);
1439+
AFL_VERIFY(!rowset.EndOfSet())("path_id", p->GetPathId())("portion_id", p->GetPortionId())(
1440+
"debug", p->DebugString(true));
1441+
NOlap::TColumnChunkLoadContextV2 info(rowset);
1442+
constructor.SetRecords(std::move(info));
1443+
itPortionConstructor = Constructors.emplace(p->GetAddress(), std::move(constructor)).first;
1444+
}
1445+
}
14311446
if (!p->GetSchema(Self->GetIndexAs<NOlap::TColumnEngineForLogs>().GetVersionedIndex())->GetIndexesCount()) {
14321447
continue;
14331448
}
14341449
{
14351450
auto rowset = db.Table<NColumnShard::Schema::IndexIndexes>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
14361451
if (!rowset.IsReady()) {
14371452
reask = true;
1453+
} else {
1454+
std::vector<NOlap::TIndexChunkLoadContext> indexes;
1455+
bool localReask = false;
1456+
while (!localReask && !rowset.EndOfSet()) {
1457+
indexes.emplace_back(NOlap::TIndexChunkLoadContext(rowset, &selector));
1458+
if (!rowset.Next()) {
1459+
reask = true;
1460+
localReask = true;
1461+
}
1462+
}
1463+
if (!localReask) {
1464+
itPortionConstructor->second.SetIndexes(std::move(indexes));
1465+
FetchedAccessors.emplace_back(std::move(itPortionConstructor->second));
1466+
Constructors.erase(itPortionConstructor);
1467+
}
14381468
}
14391469
}
14401470
}
@@ -1443,42 +1473,6 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
14431473
return false;
14441474
}
14451475

1446-
for (auto&& i : PortionsByPath) {
1447-
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("stage", "processing")("size", i.second.size())(
1448-
"path_id", i.first);
1449-
while (i.second.size()) {
1450-
auto p = i.second.back();
1451-
TPortionConstructorV2 constructor(p);
1452-
{
1453-
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV2>().Key(p->GetPathId(), p->GetPortionId()).Select();
1454-
if (!rowset.IsReady()) {
1455-
return false;
1456-
}
1457-
AFL_VERIFY(!rowset.EndOfSet())("path_id", p->GetPathId())("portion_id", p->GetPortionId())("debug", p->DebugString(true));
1458-
NOlap::TColumnChunkLoadContextV2 info(rowset);
1459-
constructor.SetRecords(std::move(info));
1460-
}
1461-
std::vector<NOlap::TIndexChunkLoadContext> indexes;
1462-
if (p->GetSchema(Self->GetIndexAs<NOlap::TColumnEngineForLogs>().GetVersionedIndex())->GetIndexesCount()) {
1463-
auto rowset = db.Table<NColumnShard::Schema::IndexIndexes>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
1464-
if (!rowset.IsReady()) {
1465-
return false;
1466-
}
1467-
while (!rowset.EndOfSet()) {
1468-
indexes.emplace_back(NOlap::TIndexChunkLoadContext(rowset, &selector));
1469-
if (!rowset.Next()) {
1470-
return false;
1471-
}
1472-
}
1473-
}
1474-
constructor.SetIndexes(std::move(indexes));
1475-
FetchedAccessors.emplace_back(std::move(constructor));
1476-
i.second.pop_back();
1477-
}
1478-
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("stage", "finished")("size", i.second.size())(
1479-
"path_id", i.first);
1480-
}
1481-
14821476
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("stage", "finished");
14831477
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(std::make_shared<TAccessorsParsingTask>(FetchCallback, std::move(FetchedAccessors)));
14841478
return true;

0 commit comments

Comments
 (0)