4040#include < util/generic/hash.h>
4141#include < util/system/compiler.h>
4242
43- #define SINK_LOG_T (s ) \
43+ #define SOURCE_LOG_T (s ) \
4444 LOG_TRACE_S (*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
45- #define SINK_LOG_D (s ) \
45+ #define SOURCE_LOG_D (s ) \
4646 LOG_DEBUG_S (*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
47- #define SINK_LOG_I (s ) \
47+ #define SOURCE_LOG_I (s ) \
4848 LOG_INFO_S (*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
49- #define SINK_LOG_W (s ) \
49+ #define SOURCE_LOG_W (s ) \
5050 LOG_WARN_S (*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
51- #define SINK_LOG_N (s ) \
51+ #define SOURCE_LOG_N (s ) \
5252 LOG_NOTICE_S (*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
53- #define SINK_LOG_E (s ) \
53+ #define SOURCE_LOG_E (s ) \
5454 LOG_ERROR_S (*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
55- #define SINK_LOG_C (s ) \
55+ #define SOURCE_LOG_C (s ) \
5656 LOG_CRIT_S (*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
57- #define SINK_LOG (prio, s ) \
57+ #define SOURCE_LOG (prio, s ) \
5858 LOG_LOG_S (*NActors::TlsActivationContext, prio, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
5959
6060namespace NYql::NDq {
@@ -87,9 +87,6 @@ auto RetryPolicy = NYql::NDq::THttpSenderRetryPolicy::GetExponentialBackoffPolic
8787 return ERetryErrorClass::ShortRetry;
8888 });
8989
90- } // namespace
91-
92-
9390class TDqSolomonReadActor : public NActors ::TActorBootstrapped<TDqSolomonReadActor>, public IDqComputeActorAsyncInput {
9491public:
9592 static constexpr char ActorName[] = " DQ_SOLOMON_READ_ACTOR" ;
@@ -117,21 +114,21 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
117114 , MaxInflightDataRequests(maxInflightDataRequests)
118115 , MetricsQueueActor(metricsQueueActor)
119116 , CredentialsProvider(credentialsProvider)
117+ , SolomonClient(NSo::ISolomonAccessorClient::Make(readParams.Source, credentialsProvider))
120118 {
121119 Y_UNUSED (counters);
122- SINK_LOG_D (" Init" );
120+ SOURCE_LOG_D (" Init" );
123121 IngressStats.Level = statsLevel;
124122
125- SolomonClient = NSo::ISolomonAccessorClient::Make (ReadParams.Source , CredentialsProvider);
126123 UseMetricsQueue = ReadParams.Source .HasSelectors ();
127124
128125 auto stringType = ProgramBuilder.NewDataType (NYql::NUdf::TDataType<char *>::Id);
129126 DictType = ProgramBuilder.NewDictType (stringType, stringType, false );
130127
131- FillSystemColumnPositionindex ();
128+ FillSystemColumnPositionIndex ();
132129 }
133130
134- void FillSystemColumnPositionindex () {
131+ void FillSystemColumnPositionIndex () {
135132 std::vector<TString> names (ReadParams.Source .GetSystemColumns ().begin (), ReadParams.Source .GetSystemColumns ().end ());
136133 names.insert (names.end (), ReadParams.Source .GetLabelNames ().begin (), ReadParams.Source .GetLabelNames ().end ());
137134 std::sort (names.begin (), names.end ());
@@ -172,21 +169,21 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
172169 auto & batch = metricsBatch->Get ()->Record ;
173170 IsMetricsQueueEmpty = batch.GetNoMoreMetrics ();
174171 if (IsMetricsQueueEmpty && !IsConfirmedMetricsQueueFinish) {
175- SINK_LOG_D (" HandleMetricsBatch MetricsQueue empty, sending finish confirmation" );
172+ SOURCE_LOG_D (" HandleMetricsBatch MetricsQueue empty, sending finish confirmation" );
176173 RequestMetrics ();
177174 IsConfirmedMetricsQueueFinish = true ;
178175 }
179176
180177 auto & metrics = batch.GetMetrics ();
181178
182- SINK_LOG_D (" HandleMetricsBatch batch of size " << metrics.size ());
179+ SOURCE_LOG_D (" HandleMetricsBatch batch of size " << metrics.size ());
183180 Metrics.insert (Metrics.end (), metrics.begin (), metrics.end ());
184181 ListedMetrics += metrics.size ();
185182
186183 while (TryRequestData ()) {}
187184
188185 if (LastMetricProcessed ()) {
189- Send (ComputeActorId, new TEvNewAsyncInputDataArrived (InputIndex) );
186+ NotifyComputeActorWithData ( );
190187 }
191188 }
192189
@@ -197,13 +194,13 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
197194
198195 IsMetricsQueueEmpty = true ;
199196 if (!IsConfirmedMetricsQueueFinish) {
200- SINK_LOG_D (" HandleMetricsReadError sending finish confirmation to MetricsQueue" );
197+ SOURCE_LOG_D (" HandleMetricsReadError sending finish confirmation to MetricsQueue" );
201198 RequestMetrics ();
202199 IsConfirmedMetricsQueueFinish = true ;
203200 }
204201
205202 TIssues issues { TIssue (metricsReadError->Get ()->Record .GetIssues ()) };
206- SINK_LOG_W (" Got " << " error response[" << metricsReadError->Cookie << " ] from solomon: " << issues.ToOneLineString ());
203+ SOURCE_LOG_W (" Got " << " error response[" << metricsReadError->Cookie << " ] from solomon: " << issues.ToOneLineString ());
207204 Send (ComputeActorId, new TEvAsyncInputError (InputIndex, issues, NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
208205 return ;
209206 }
@@ -214,35 +211,35 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
214211
215212 if (!batch.Success ) {
216213 TIssues issues { TIssue (batch.ErrorMsg ) };
217- SINK_LOG_W (" Got " << " error response[" << newDataBatch->Cookie << " ] from solomon: " << issues.ToOneLineString ());
214+ SOURCE_LOG_W (" Got " << " error response[" << newDataBatch->Cookie << " ] from solomon: " << issues.ToOneLineString ());
218215 Send (ComputeActorId, new TEvAsyncInputError (InputIndex, issues, NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
219216 return ;
220217 }
221218
222- SINK_LOG_D (" HandleNewDataBatch new data batch" );
219+ SOURCE_LOG_D (" HandleNewDataBatch new data batch" );
223220 MetricsData.insert (MetricsData.end (), batch.Result .begin (), batch.Result .end ());
224221 CompletedMetrics += batch.Result .size ();
225222
226223 NotifyComputeActorWithData ();
227224 }
228225
229226 void Handle (const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr&) {
230- SINK_LOG_D (" Handle MetricsQueue retry" );
227+ SOURCE_LOG_D (" Handle MetricsQueue retry" );
231228 MetricsQueueEvents.Retry ();
232229 }
233230
234231 void Handle (NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
235- SINK_LOG_D (" Handle MetricsQueue disconnected " << ev->Get ()->NodeId );
232+ SOURCE_LOG_D (" Handle MetricsQueue disconnected " << ev->Get ()->NodeId );
236233 MetricsQueueEvents.HandleNodeDisconnected (ev->Get ()->NodeId );
237234 }
238235
239236 void Handle (NActors::TEvInterconnect::TEvNodeConnected::TPtr& ev) {
240- SINK_LOG_D (" Handle MetricsQueue connected " << ev->Get ()->NodeId );
237+ SOURCE_LOG_D (" Handle MetricsQueue connected " << ev->Get ()->NodeId );
241238 MetricsQueueEvents.HandleNodeConnected (ev->Get ()->NodeId );
242239 }
243240
244241 void Handle (NActors::TEvents::TEvUndelivered::TPtr& ev) {
245- SINK_LOG_D (" Handle MetricsQueue undelivered " );
242+ SOURCE_LOG_D (" Handle MetricsQueue undelivered" );
246243 if (MetricsQueueEvents.HandleUndelivered (ev) != NYql::NDq::TRetryEventsQueue::ESessionState::WrongSession) {
247244 TIssues issues{TIssue{TStringBuilder () << " MetricsQueue was lost" }};
248245 Send (ComputeActorId, new TEvAsyncInputError (InputIndex, issues, NYql::NDqProto::StatusIds::UNAVAILABLE));
@@ -252,7 +249,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
252249 i64 GetAsyncInputData (TUnboxedValueBatch& buffer, TMaybe<TInstant>&, bool & finished, i64 freeSpace) final {
253250 Y_UNUSED (freeSpace);
254251 YQL_ENSURE (!buffer.IsWide (), " Wide stream is not supported" );
255- SINK_LOG_D (" GetAsyncInputData sending data" );
252+ SOURCE_LOG_D (" GetAsyncInputData sending data" );
256253
257254 for (const auto & data : MetricsData) {
258255 auto & labels = data.Labels ;
@@ -323,7 +320,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
323320private:
324321 // IActor & IDqComputeActorAsyncInput
325322 void PassAway () override { // Is called from Compute Actor
326- SINK_LOG_D (" PassAway" );
323+ SOURCE_LOG_D (" PassAway" );
327324 TActor<TDqSolomonReadActor>::PassAway ();
328325 }
329326
@@ -418,7 +415,6 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
418415 const ui64 MaxInflightDataRequests;
419416
420417 bool UseMetricsQueue;
421- NSo::ISolomonAccessorClient::TPtr SolomonClient;
422418 TRetryEventsQueue MetricsQueueEvents;
423419 NActors::TActorId MetricsQueueActor;
424420 bool IsWaitingMetricsQueueResponse = false ;
@@ -433,11 +429,15 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
433429
434430 TString SourceId;
435431 std::shared_ptr<NYdb::ICredentialsProvider> CredentialsProvider;
432+ NSo::ISolomonAccessorClient::TPtr SolomonClient;
436433 TType* DictType = nullptr ;
437434 std::vector<size_t > SystemColumnPositionIndex;
438435 THashMap<TString, size_t > Index;
439436};
440437
438+
439+ } // namespace
440+
441441std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSolomonReadActor (
442442 NYql::NSo::NProto::TDqSolomonSource&& source,
443443 ui64 inputIndex,
0 commit comments