@@ -90,8 +90,9 @@ class TInputTransformStreamLookupBase
9090 .MaxKeysInRequest = 1000 // TODO configure me
9191 };
9292 auto guard = Guard (*Alloc);
93- LookupSource = Factory->CreateDqLookupSource (Settings.GetRightSource ().GetProviderName (), std::move (lookupSourceArgs));
94- RegisterWithSameMailbox (LookupSource.second );
93+ auto LookupSource = Factory->CreateDqLookupSource (Settings.GetRightSource ().GetProviderName (), std::move (lookupSourceArgs));
94+ MaxKeysInRequest = LookupSource.first ->GetMaxSupportedKeysInRequest ();
95+ LookupSourceId = RegisterWithSameMailbox (LookupSource.second );
9596 }
9697protected:
9798 virtual NUdf::EFetchStatus FetchWideInputValue (NUdf::TUnboxedValue* inputRowItems) = 0;
@@ -165,7 +166,7 @@ class TInputTransformStreamLookupBase
165166 }
166167
167168 void PassAway () final {
168- Send (LookupSource. second -> SelfId () , new NActors::TEvents::TEvPoison{});
169+ Send (LookupSourceId , new NActors::TEvents::TEvPoison{});
169170 auto guard = BindAllocator ();
170171 // All resources, held by this class, that have been created with mkql allocator, must be deallocated here
171172 KeysForLookup.reset ();
@@ -193,11 +194,10 @@ class TInputTransformStreamLookupBase
193194 NUdf::TUnboxedValue* inputRowItems;
194195 NUdf::TUnboxedValue inputRow = HolderFactory.CreateDirectArrayHolder (InputRowType->GetElementsCount (), inputRowItems);
195196 const auto now = std::chrono::steady_clock::now ();
196- const auto maxKeysInRequest = LookupSource.first ->GetMaxSupportedKeysInRequest ();
197- KeysForLookup = std::make_shared<IDqAsyncLookupSource::TUnboxedValueMap>(maxKeysInRequest, KeyTypeHelper->GetValueHash (), KeyTypeHelper->GetValueEqual ());
197+ KeysForLookup = std::make_shared<IDqAsyncLookupSource::TUnboxedValueMap>(MaxKeysInRequest, KeyTypeHelper->GetValueHash (), KeyTypeHelper->GetValueEqual ());
198198 LruCache->Prune (now);
199199 while (
200- (KeysForLookup->size () < maxKeysInRequest ) &&
200+ (KeysForLookup->size () < MaxKeysInRequest ) &&
201201 ((InputFlowFetchStatus = FetchWideInputValue (inputRowItems)) == NUdf::EFetchStatus::Ok)) {
202202 NUdf::TUnboxedValue* keyItems;
203203 NUdf::TUnboxedValue key = HolderFactory.CreateDirectArrayHolder (LookupInputIndexes.size (), keyItems);
@@ -217,7 +217,7 @@ class TInputTransformStreamLookupBase
217217 }
218218 }
219219 if (!KeysForLookup->empty ()) {
220- LookupSource. first -> AsyncLookup ( KeysForLookup);
220+ Send (LookupSourceId, new IDqAsyncLookupSource::TEvLookupRequest ( KeysForLookup) );
221221 } else {
222222 KeysForLookup.reset ();
223223 }
@@ -261,7 +261,8 @@ class TInputTransformStreamLookupBase
261261 IDqAsyncIoFactory::TPtr Factory;
262262 NDqProto::TDqInputTransformLookupSettings Settings;
263263protected:
264- std::pair<IDqAsyncLookupSource*, NActors::IActor*> LookupSource;
264+ NActors::TActorId LookupSourceId;
265+ size_t MaxKeysInRequest;
265266 const TVector<size_t > LookupInputIndexes;
266267 const TVector<size_t > OtherInputIndexes;
267268 const NMiniKQL::TMultiType* const InputRowType;
0 commit comments