@@ -16,16 +16,16 @@ class TMinMaxSnapshotChangesTask: public NConveyor::ITask {
1616private:
1717 THashMap<NKikimr::NOlap::TBlobRange, TString> Blobs;
1818 TDataContainer Portions;
19- THashMap<ui64, ISnapshotSchema::TPtr> Schemas;
19+ std::shared_ptr< THashMap<ui64, ISnapshotSchema::TPtr> > Schemas;
2020 TNormalizationContext NormContext;
2121protected:
2222 virtual bool DoExecute () override {
23- Y_ABORT_UNLESS (!Schemas. empty ());
24- auto pkColumnIds = Schemas. begin ()->second ->GetPkColumnsIds ();
23+ Y_ABORT_UNLESS (!Schemas-> empty ());
24+ auto pkColumnIds = Schemas-> begin ()->second ->GetPkColumnsIds ();
2525 pkColumnIds.insert (TIndexInfo::GetSpecialColumnIds ().begin (), TIndexInfo::GetSpecialColumnIds ().end ());
2626
2727 for (auto && portionInfo : Portions) {
28- auto blobSchema = Schemas. FindPtr (portionInfo->GetPortionId ());
28+ auto blobSchema = Schemas-> FindPtr (portionInfo->GetPortionId ());
2929 THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> blobsDataAssemble;
3030 for (auto && i : portionInfo->Records ) {
3131 auto blobIt = Blobs.find (i.BlobRange );
@@ -47,10 +47,10 @@ class TMinMaxSnapshotChangesTask: public NConveyor::ITask {
4747 }
4848
4949public:
50- TMinMaxSnapshotChangesTask (THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, TDataContainer&& portions, THashMap<ui64, ISnapshotSchema::TPtr>&& schemas)
50+ TMinMaxSnapshotChangesTask (THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, TDataContainer&& portions, std::shared_ptr< THashMap<ui64, ISnapshotSchema::TPtr>> schemas)
5151 : Blobs(std::move(blobs))
5252 , Portions(std::move(portions))
53- , Schemas(std::move( schemas) )
53+ , Schemas(schemas)
5454 , NormContext(nCtx)
5555 {}
5656
@@ -135,7 +135,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const
135135 }
136136
137137 THashMap<ui64, std::shared_ptr<TPortionInfo>> portions;
138- THashMap<ui64, ISnapshotSchema::TPtr> schemas ;
138+ auto schemas = std::make_shared< THashMap<ui64, ISnapshotSchema::TPtr>>() ;
139139 auto pkColumnIds = TMinMaxSnapshotChangesTask::GetColumnsFilter (tablesManager.GetPrimaryIndexSafe ().GetVersionedIndex ().GetLastSchema ());
140140
141141 {
@@ -161,7 +161,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const
161161 auto portionMeta = loadContext.GetPortionMeta ();
162162 if (it == portions.end ()) {
163163 Y_ABORT_UNLESS (portion.Records .empty ());
164- schemas[portion.GetPortionId ()] = currentSchema;
164+ (* schemas) [portion.GetPortionId ()] = currentSchema;
165165 auto portionNew = std::make_shared<TPortionInfo>(portion);
166166 portionNew->AddRecord (currentSchema->GetIndexInfo (), rec, portionMeta);
167167 it = portions.emplace (portion.GetPortion (), portionNew).first ;
@@ -202,7 +202,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const
202202 }
203203 ++brokenPortioncCount;
204204 package.emplace_back (portion.second );
205- if (package.size () == 100 ) {
205+ if (package.size () == 1000 ) {
206206 std::vector<std::shared_ptr<TPortionInfo>> local;
207207 local.swap (package);
208208 tasks.emplace_back (std::make_shared<TPortionsNormalizerTask<TMinMaxSnapshotChangesTask>>(std::move (local), schemas));
0 commit comments