Skip to content

Commit 79ae444

Browse files
ivanmorozov333iddqdex
authored andcommitted
fix tx ask hard processing (ydb-platform#12648)
1 parent 8566ddd commit 79ae444

File tree

1 file changed

+48
-37
lines changed

1 file changed

+48
-37
lines changed

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 48 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1348,6 +1348,18 @@ class TPortionConstructorV2 {
13481348
: PortionInfo(portionInfo) {
13491349
}
13501350

1351+
bool IsReady() const {
1352+
return HasRecords() && HasIndexes();
1353+
}
1354+
1355+
bool HasRecords() const {
1356+
return !!Records;
1357+
}
1358+
1359+
bool HasIndexes() const {
1360+
return !!Indexes;
1361+
}
1362+
13511363
void SetRecords(NOlap::TColumnChunkLoadContextV2&& records) {
13521364
AFL_VERIFY(!Records);
13531365
Records = std::move(records);
@@ -1406,6 +1418,7 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
14061418
THashMap<ui64, std::vector<NOlap::TPortionInfo::TConstPtr>> PortionsByPath;
14071419
std::vector<TPortionConstructorV2> FetchedAccessors;
14081420
const TString Consumer;
1421+
THashMap<NOlap::TPortionAddress, TPortionConstructorV2> Constructors;
14091422

14101423
public:
14111424
TTxAskPortionChunks(TColumnShard* self, const std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback>& fetchCallback,
@@ -1428,13 +1441,43 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
14281441
for (auto&& i : PortionsByPath) {
14291442
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("size", i.second.size())("path_id", i.first);
14301443
for (auto&& p : i.second) {
1431-
if (!p->GetSchema(Self->GetIndexAs<NOlap::TColumnEngineForLogs>().GetVersionedIndex())->GetIndexesCount()) {
1444+
auto itPortionConstructor = Constructors.find(p->GetAddress());
1445+
if (itPortionConstructor == Constructors.end()) {
1446+
TPortionConstructorV2 constructor(p);
1447+
itPortionConstructor = Constructors.emplace(p->GetAddress(), std::move(constructor)).first;
1448+
} else if (itPortionConstructor->second.IsReady()) {
14321449
continue;
14331450
}
1434-
{
1435-
auto rowset = db.Table<NColumnShard::Schema::IndexIndexes>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
1451+
if (!itPortionConstructor->second.HasRecords()) {
1452+
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV2>().Key(p->GetPathId(), p->GetPortionId()).Select();
14361453
if (!rowset.IsReady()) {
14371454
reask = true;
1455+
} else {
1456+
AFL_VERIFY(!rowset.EndOfSet())("path_id", p->GetPathId())("portion_id", p->GetPortionId())(
1457+
"debug", p->DebugString(true));
1458+
NOlap::TColumnChunkLoadContextV2 info(rowset);
1459+
itPortionConstructor->second.SetRecords(std::move(info));
1460+
}
1461+
}
1462+
if (!itPortionConstructor->second.HasIndexes()) {
1463+
if (!p->GetSchema(Self->GetIndexAs<NOlap::TColumnEngineForLogs>().GetVersionedIndex())->GetIndexesCount()) {
1464+
itPortionConstructor->second.SetIndexes({});
1465+
} else {
1466+
auto rowset = db.Table<NColumnShard::Schema::IndexIndexes>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
1467+
if (!rowset.IsReady()) {
1468+
reask = true;
1469+
} else {
1470+
std::vector<NOlap::TIndexChunkLoadContext> indexes;
1471+
bool localReask = false;
1472+
while (!localReask && !rowset.EndOfSet()) {
1473+
indexes.emplace_back(NOlap::TIndexChunkLoadContext(rowset, &selector));
1474+
if (!rowset.Next()) {
1475+
reask = true;
1476+
localReask = true;
1477+
}
1478+
}
1479+
itPortionConstructor->second.SetIndexes(std::move(indexes));
1480+
}
14381481
}
14391482
}
14401483
}
@@ -1443,40 +1486,8 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
14431486
return false;
14441487
}
14451488

1446-
for (auto&& i : PortionsByPath) {
1447-
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("stage", "processing")("size", i.second.size())(
1448-
"path_id", i.first);
1449-
while (i.second.size()) {
1450-
auto p = i.second.back();
1451-
TPortionConstructorV2 constructor(p);
1452-
{
1453-
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV2>().Key(p->GetPathId(), p->GetPortionId()).Select();
1454-
if (!rowset.IsReady()) {
1455-
return false;
1456-
}
1457-
AFL_VERIFY(!rowset.EndOfSet())("path_id", p->GetPathId())("portion_id", p->GetPortionId())("debug", p->DebugString(true));
1458-
NOlap::TColumnChunkLoadContextV2 info(rowset);
1459-
constructor.SetRecords(std::move(info));
1460-
}
1461-
std::vector<NOlap::TIndexChunkLoadContext> indexes;
1462-
if (p->GetSchema(Self->GetIndexAs<NOlap::TColumnEngineForLogs>().GetVersionedIndex())->GetIndexesCount()) {
1463-
auto rowset = db.Table<NColumnShard::Schema::IndexIndexes>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
1464-
if (!rowset.IsReady()) {
1465-
return false;
1466-
}
1467-
while (!rowset.EndOfSet()) {
1468-
indexes.emplace_back(NOlap::TIndexChunkLoadContext(rowset, &selector));
1469-
if (!rowset.Next()) {
1470-
return false;
1471-
}
1472-
}
1473-
}
1474-
constructor.SetIndexes(std::move(indexes));
1475-
FetchedAccessors.emplace_back(std::move(constructor));
1476-
i.second.pop_back();
1477-
}
1478-
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("stage", "finished")("size", i.second.size())(
1479-
"path_id", i.first);
1489+
for (auto&& i : Constructors) {
1490+
FetchedAccessors.emplace_back(std::move(i.second));
14801491
}
14811492

14821493
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("stage", "finished");

0 commit comments

Comments
 (0)