@@ -1256,12 +1256,72 @@ void TColumnShard::Handle(NOlap::NDataSharing::NEvents::TEvFinishedFromSource::T
12561256 }
12571257};
12581258
1259+ class TPortionConstructorV2 {
1260+ private:
1261+ NOlap::TPortionInfo::TConstPtr PortionInfo;
1262+ std::optional<NOlap::TColumnChunkLoadContextV2> Records;
1263+ std::optional<std::vector<NOlap::TIndexChunkLoadContext>> Indexes;
1264+
1265+ public:
1266+ TPortionConstructorV2 (const NOlap::TPortionInfo::TConstPtr& portionInfo)
1267+ : PortionInfo(portionInfo) {
1268+ }
1269+
1270+ void SetRecords (NOlap::TColumnChunkLoadContextV2&& records) {
1271+ AFL_VERIFY (!Records);
1272+ Records = std::move (records);
1273+ }
1274+
1275+ void SetIndexes (std::vector<NOlap::TIndexChunkLoadContext>&& indexes) {
1276+ AFL_VERIFY (!Indexes);
1277+ Indexes = std::move (indexes);
1278+ }
1279+
1280+ NOlap::TPortionDataAccessor BuildAccessor () {
1281+ AFL_VERIFY (PortionInfo && Records && Indexes);
1282+ std::vector<NOlap::TColumnChunkLoadContextV1> records = Records->BuildRecordsV1 ();
1283+ return NOlap::TPortionAccessorConstructor::BuildForLoading (std::move (PortionInfo), std::move (records), std::move (*Indexes));
1284+ }
1285+ };
1286+
1287+ class TAccessorsParsingTask : public NConveyor ::ITask {
1288+ private:
1289+ std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback> FetchCallback;
1290+ std::vector<TPortionConstructorV2> Portions;
1291+
1292+ virtual TConclusionStatus DoExecute (const std::shared_ptr<ITask>& /* taskPtr*/ ) override {
1293+ std::vector<NOlap::TPortionDataAccessor> accessors;
1294+ accessors.reserve (Portions.size ());
1295+ for (auto && i : Portions) {
1296+ accessors.emplace_back (i.BuildAccessor ());
1297+ }
1298+ FetchCallback->OnAccessorsFetched (std::move (accessors));
1299+ return TConclusionStatus::Success ();
1300+ }
1301+ virtual void DoOnCannotExecute (const TString& reason) override {
1302+ AFL_VERIFY (false )(" cannot parse metadata" , reason);
1303+ }
1304+
1305+ public:
1306+ virtual TString GetTaskClassIdentifier () const override {
1307+ return " ASKED_METADATA_PARSER" ;
1308+ }
1309+
1310+ TAccessorsParsingTask (
1311+ const std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback>& callback, std::vector<TPortionConstructorV2>&& portions)
1312+ : FetchCallback(callback)
1313+ , Portions(std::move(portions))
1314+ {
1315+
1316+ }
1317+ };
1318+
12591319class TTxAskPortionChunks : public TTransactionBase <TColumnShard> {
12601320private:
12611321 using TBase = TTransactionBase<TColumnShard>;
12621322 std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback> FetchCallback;
12631323 THashMap<ui64, std::vector<NOlap::TPortionInfo::TConstPtr>> PortionsByPath;
1264- std::vector<NOlap::TPortionDataAccessor > FetchedAccessors;
1324+ std::vector<TPortionConstructorV2 > FetchedAccessors;
12651325
12661326public:
12671327 TTxAskPortionChunks (TColumnShard* self, const std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback>& fetchCallback,
@@ -1275,6 +1335,7 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
12751335
12761336 bool Execute (TTransactionContext& txc, const TActorContext& /* ctx*/ ) override {
12771337 NIceDb::TNiceDb db (txc.DB );
1338+
12781339 TBlobGroupSelector selector (Self->Info ());
12791340 bool reask = false ;
12801341 for (auto && i : PortionsByPath) {
@@ -1302,21 +1363,22 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
13021363 AFL_INFO (NKikimrServices::TX_COLUMNSHARD)(" event" , " TTxAskPortionChunks::Execute" )(" stage" , " processing" )(" size" , i.second .size ())(" path_id" , i.first );
13031364 while (i.second .size ()) {
13041365 auto p = i.second .back ();
1305- std::vector<NOlap::TColumnChunkLoadContextV1> records;
1306- std::vector<NOlap::TIndexChunkLoadContext> indexes;
1366+ TPortionConstructorV2 constructor (p);
13071367 {
13081368 auto rowset = db.Table <NColumnShard::Schema::IndexColumnsV2>().Prefix (p->GetPathId (), p->GetPortionId ()).Select ();
13091369 if (!rowset.IsReady ()) {
13101370 return false ;
13111371 }
13121372 while (!rowset.EndOfSet ()) {
1313- NOlap::TColumnChunkLoadContextV1::BuildFromDBV2 (rowset, records);
1373+ NOlap::TColumnChunkLoadContextV2 info (rowset);
1374+ constructor.SetRecords (std::move (info));
13141375 if (!rowset.Next ()) {
13151376 return false ;
13161377 }
13171378 }
13181379 }
13191380 {
1381+ std::vector<NOlap::TIndexChunkLoadContext> indexes;
13201382 auto rowset = db.Table <NColumnShard::Schema::IndexIndexes>().Prefix (p->GetPathId (), p->GetPortionId ()).Select ();
13211383 if (!rowset.IsReady ()) {
13221384 return false ;
@@ -1327,16 +1389,17 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
13271389 return false ;
13281390 }
13291391 }
1392+ constructor.SetIndexes (std::move (indexes));
13301393 }
1331- FetchedAccessors.emplace_back (NOlap::TPortionAccessorConstructor::BuildForLoading (p, std::move (records), std::move (indexes) ));
1394+ FetchedAccessors.emplace_back (std::move (constructor ));
13321395 i.second .pop_back ();
13331396 }
13341397 AFL_INFO (NKikimrServices::TX_COLUMNSHARD)(" event" , " TTxAskPortionChunks::Execute" )(" stage" , " finished" )(" size" , i.second .size ())(
13351398 " path_id" , i.first );
13361399 }
13371400
13381401 AFL_INFO (NKikimrServices::TX_COLUMNSHARD)(" event" , " TTxAskPortionChunks::Execute" )(" stage" , " finished" );
1339- FetchCallback-> OnAccessorsFetched ( std::move (FetchedAccessors));
1402+ NConveyor::TInsertServiceOperator::AsyncTaskToExecute (std::make_shared<TAccessorsParsingTask>(FetchCallback, std::move (FetchedAccessors) ));
13401403 return true ;
13411404 }
13421405 void Complete (const TActorContext& /* ctx*/ ) override {
0 commit comments