Skip to content

Commit da3bc00

Browse files
fixed bugs and issues + new grpc solomon emulator for tests
1 parent 66ee811 commit da3bc00

File tree

46 files changed

+346
-260
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+346
-260
lines changed

ydb/core/external_sources/external_source_factory.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -82,7 +82,7 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
8282
},
8383
{
8484
ToString(NYql::EDatabaseType::Solomon),
85-
CreateExternalDataSource(TString{NYql::SolomonProviderName}, {"NONE", "TOKEN"}, {}, hostnamePatternsRegEx)
85+
CreateExternalDataSource(TString{NYql::SolomonProviderName}, {"NONE", "TOKEN"}, {"use_ssl", "grpc_port"}, hostnamePatternsRegEx)
8686
}
8787
});
8888
}

ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -85,7 +85,9 @@ TString GetOrEmpty(const NYql::TCreateObjectSettings& container, const TString&
8585
"use_tls",
8686
"schema", // managed PG, GP
8787
"service_name", // oracle
88-
"folder_id" // logging
88+
"folder_id", // logging
89+
"use_ssl", // solomon
90+
"grpc_port" // solomon
8991
};
9092

9193
auto& featuresExtractor = settings.GetFeaturesExtractor();

ydb/library/yql/providers/solomon/actors/dq_solomon_metrics_queue.cpp

Lines changed: 9 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -77,7 +77,7 @@ class TDqSolomonMetricsQueueActor : public NActors::TActorBootstrapped<TDqSolomo
7777
, BatchCountLimit(batchCountLimit)
7878
, ReadParams(std::move(readParams))
7979
, CredentialsProvider(credentialsProvider)
80-
, SolomonClient(NSo::ISolomonAccessorClient::Make(readParams.Source, credentialsProvider))
80+
, SolomonClient(NSo::ISolomonAccessorClient::Make(ReadParams.Source, CredentialsProvider))
8181
{}
8282

8383
void Bootstrap() {
@@ -201,7 +201,7 @@ class TDqSolomonMetricsQueueActor : public NActors::TActorBootstrapped<TDqSolomo
201201
}
202202

203203
void PassAway() override {
204-
LOG_D("TDqSolomonMetricsQueueActor", "PassAway");
204+
LOG_I("TDqSolomonMetricsQueueActor", "PassAway, processed " << ProcessedMetrics << " metrics");
205205
TBase::PassAway();
206206
}
207207

@@ -288,6 +288,7 @@ class TDqSolomonMetricsQueueActor : public NActors::TActorBootstrapped<TDqSolomo
288288
bool handledRequest = true;
289289
while (HasPendingRequests && handledRequest) {
290290
handledRequest = false;
291+
HasPendingRequests = false;
291292

292293
for (auto& [consumer, requests] : PendingRequests) {
293294
if (!CanSendToConsumer(consumer) || (earlyStop && !HasEnoughToSend())) {
@@ -350,11 +351,16 @@ class TDqSolomonMetricsQueueActor : public NActors::TActorBootstrapped<TDqSolomo
350351
while (!Metrics.empty() && result.size() < BatchCountLimit) {
351352
result.push_back(Metrics.back());
352353
Metrics.pop_back();
354+
ProcessedMetrics++;
353355
}
354356

355357
LOG_D("TDqSolomonMetricsQueueActor", "SendMetrics Sending " << result.size() << " metrics to consumer with id " << consumer);
356358
Send(consumer, new TEvSolomonProvider::TEvMetricsBatch(std::move(result), HasNoMoreItems(), transportMeta));
357359

360+
if (HasNoMoreItems()) {
361+
TryFinish(consumer, transportMeta.GetSeqNo());
362+
}
363+
358364
if (!RoundRobinStageFinished) {
359365
if (StartedConsumers.empty()) {
360366
Schedule(RoundRobinStageTimeout, new TEvPrivatePrivate::TEvRoundRobinStageTimeout());
@@ -384,6 +390,7 @@ class TDqSolomonMetricsQueueActor : public NActors::TActorBootstrapped<TDqSolomo
384390

385391
private:
386392
ui64 CurrentPage;
393+
ui64 ProcessedMetrics = 0;
387394
ui64 ConsumersCount;
388395
bool HasMoreMetrics = true;
389396
bool IsRoundRobinFinishScheduled = false;

ydb/library/yql/providers/solomon/actors/dq_solomon_read_actor.cpp

Lines changed: 36 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -100,6 +100,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
100100
NKikimr::NMiniKQL::TProgramBuilder& programBuilder,
101101
TDqSolomonReadParams&& readParams,
102102
ui64 maxInflightDataRequests,
103+
ui64 computeActorBatchSize,
103104
NActors::TActorId metricsQueueActor,
104105
const ::NMonitoring::TDynamicCounterPtr& counters,
105106
std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider
@@ -112,9 +113,10 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
112113
, LogPrefix(TStringBuilder() << "TxId: " << TxId << ", TDqSolomonReadActor: ")
113114
, ReadParams(std::move(readParams))
114115
, MaxInflightDataRequests(maxInflightDataRequests)
116+
, ComputeActorBatchSize(computeActorBatchSize)
115117
, MetricsQueueActor(metricsQueueActor)
116118
, CredentialsProvider(credentialsProvider)
117-
, SolomonClient(NSo::ISolomonAccessorClient::Make(readParams.Source, credentialsProvider))
119+
, SolomonClient(NSo::ISolomonAccessorClient::Make(ReadParams.Source, CredentialsProvider))
118120
{
119121
Y_UNUSED(counters);
120122
SOURCE_LOG_D("Init");
@@ -206,21 +208,24 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
206208
}
207209

208210
void HandleNewDataBatch(TEvSolomonProvider::TEvNewDataBatch::TPtr& newDataBatch) {
209-
auto& batch = newDataBatch->Get()->Result;
211+
auto& batch = *newDataBatch->Get();
210212
InflightDataRequests--;
211213

212-
if (!batch.Success) {
213-
TIssues issues { TIssue(batch.ErrorMsg) };
214+
if (!batch.Result.Success) {
215+
TIssues issues { TIssue(batch.Result.ErrorMsg) };
214216
SOURCE_LOG_W("Got " << "error response[" << newDataBatch->Cookie << "] from solomon: " << issues.ToOneLineString());
215217
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
216218
return;
217219
}
218220

219-
SOURCE_LOG_D("HandleNewDataBatch new data batch");
220-
MetricsData.insert(MetricsData.end(), batch.Result.begin(), batch.Result.end());
221-
CompletedMetrics += batch.Result.size();
221+
MetricsData.insert(MetricsData.end(), batch.Result.Result.begin(), batch.Result.Result.end());
222+
CompletedMetrics += batch.SelectorsCount;
222223

223-
NotifyComputeActorWithData();
224+
if (!Metrics.empty()) {
225+
while (TryRequestData()) {}
226+
} else if (MetricsData.size() >= ComputeActorBatchSize || LastMetricReceived()) {
227+
NotifyComputeActorWithData();
228+
}
224229
}
225230

226231
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr&) {
@@ -249,7 +254,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
249254
i64 GetAsyncInputData(TUnboxedValueBatch& buffer, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final {
250255
Y_UNUSED(freeSpace);
251256
YQL_ENSURE(!buffer.IsWide(), "Wide stream is not supported");
252-
SOURCE_LOG_D("GetAsyncInputData sending data");
257+
SOURCE_LOG_D("GetAsyncInputData sending " << MetricsData.size() << " metrics");
253258

254259
for (const auto& data : MetricsData) {
255260
auto& labels = data.Labels;
@@ -320,27 +325,36 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
320325
private:
321326
// IActor & IDqComputeActorAsyncInput
322327
void PassAway() override { // Is called from Compute Actor
323-
SOURCE_LOG_D("PassAway");
328+
SOURCE_LOG_I("PassAway, processed " << CompletedMetrics << " metrics.");
324329
TActor<TDqSolomonReadActor>::PassAway();
325330
}
326331

327332
private:
328333
TSourceState BuildState() { return {}; }
329334

330335
void NotifyComputeActorWithData() const {
336+
SOURCE_LOG_D("NotifyComputeActorWithData");
331337
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
332338
}
333339

340+
bool LastMetricReceived() const {
341+
if (UseMetricsQueue) {
342+
return IsConfirmedMetricsQueueFinish && CompletedMetrics == ListedMetrics;
343+
} else {
344+
return CompletedMetrics == 1;
345+
}
346+
}
347+
334348
bool LastMetricProcessed() const {
335349
if (UseMetricsQueue) {
336-
return IsConfirmedMetricsQueueFinish && MetricsData.empty() && CompletedMetrics == ListedMetrics;
350+
return IsConfirmedMetricsQueueFinish && CompletedMetrics == ListedMetrics;
337351
} else {
338352
return MetricsData.empty() && CompletedMetrics == 1;
339353
}
340354
}
341355

342356
void TryRequestMetrics() {
343-
if (Metrics.size() < MetricsPerDataQuery * 5 && !IsMetricsQueueEmpty && !IsWaitingMetricsQueueResponse) {
357+
if (Metrics.size() < MetricsPerDataQuery * MaxInflightDataRequests && !IsMetricsQueueEmpty && !IsWaitingMetricsQueueResponse) {
344358
RequestMetrics();
345359
}
346360
}
@@ -379,10 +393,10 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
379393
InflightDataRequests++;
380394

381395
NActors::TActorSystem* actorSystem = NActors::TActivationContext::ActorSystem();
382-
getDataFuture.Subscribe([actorSystem, selfId = SelfId()](
396+
getDataFuture.Subscribe([actorSystem, selectorsCount = dataSelectors.size(), selfId = SelfId()](
383397
const NThreading::TFuture<NSo::ISolomonAccessorClient::TGetDataResult>& result) -> void
384398
{
385-
actorSystem->Send(selfId, new TEvSolomonProvider::TEvNewDataBatch(result.GetValue()));
399+
actorSystem->Send(selfId, new TEvSolomonProvider::TEvNewDataBatch(selectorsCount, result.GetValue()));
386400
});
387401
}
388402

@@ -413,6 +427,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
413427
const TString LogPrefix;
414428
const TDqSolomonReadParams ReadParams;
415429
const ui64 MaxInflightDataRequests;
430+
const ui64 ComputeActorBatchSize;
416431

417432
bool UseMetricsQueue;
418433
TRetryEventsQueue MetricsQueueEvents;
@@ -425,7 +440,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
425440
size_t InflightDataRequests = 0;
426441
size_t ListedMetrics = 0;
427442
size_t CompletedMetrics = 0;
428-
const ui64 MetricsPerDataQuery = 25;
443+
const ui64 MetricsPerDataQuery = 15;
429444

430445
TString SourceId;
431446
std::shared_ptr<NYdb::ICredentialsProvider> CredentialsProvider;
@@ -472,6 +487,11 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSolom
472487
maxInflightDataRequests = FromString<ui64>(it->second);
473488
}
474489

490+
ui64 computeActorBatchSize = 1;
491+
if (auto it = settings.find("computeActorBatchSize"); it != settings.end()) {
492+
computeActorBatchSize = FromString<ui64>(it->second);
493+
}
494+
475495
auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token);
476496
auto credentialsProvider = credentialsProviderFactory->CreateProvider();
477497

@@ -484,6 +504,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSolom
484504
programBuilder,
485505
std::move(params),
486506
maxInflightDataRequests,
507+
computeActorBatchSize,
487508
metricsQueueActor,
488509
counters,
489510
credentialsProvider);

ydb/library/yql/providers/solomon/events/events.h

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -51,9 +51,12 @@ struct TEvSolomonProvider {
5151
};
5252

5353
struct TEvNewDataBatch: public NActors::TEventLocal<TEvNewDataBatch, EvNewDataBatch> {
54+
ui64 SelectorsCount;
5455
NSo::ISolomonAccessorClient::TGetDataResult Result;
55-
TEvNewDataBatch(NSo::ISolomonAccessorClient::TGetDataResult result)
56-
: Result(std::move(result)) {}
56+
TEvNewDataBatch(ui64 selectorsCount, NSo::ISolomonAccessorClient::TGetDataResult result)
57+
: SelectorsCount(selectorsCount)
58+
, Result(std::move(result))
59+
{}
5760
};
5861
};
5962

ydb/library/yql/providers/solomon/provider/yql_solomon_config.cpp

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,7 @@ TSolomonConfiguration::TSolomonConfiguration()
1212
REGISTER_SETTING(*this, MetricsQueueBatchCountLimit);
1313
REGISTER_SETTING(*this, SolomonClientDefaultReplica);
1414
REGISTER_SETTING(*this, MaxInflightDataRequests);
15+
REGISTER_SETTING(*this, ComputeActorBatchSize);
1516
}
1617

1718
TSolomonSettings::TConstPtr TSolomonConfiguration::Snapshot() const {

ydb/library/yql/providers/solomon/provider/yql_solomon_config.h

Lines changed: 1 addition & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@ struct TSolomonSettings {
1616
NCommon::TConfSetting<ui64, false> MetricsQueueBatchCountLimit;
1717
NCommon::TConfSetting<TString, false> SolomonClientDefaultReplica;
1818
NCommon::TConfSetting<ui64, false> MaxInflightDataRequests;
19+
NCommon::TConfSetting<ui64, false> ComputeActorBatchSize;
1920
};
2021

2122
struct TSolomonConfiguration
@@ -39,12 +40,6 @@ struct TSolomonConfiguration
3940
Tokens[cluster.GetName()] = ComposeStructuredTokenJsonForServiceAccount(cluster.GetServiceAccountId(), cluster.GetServiceAccountIdSignature(), authToken);
4041
}
4142

42-
MetricsQueuePageSize = 2000;
43-
MetricsQueuePrefetchSize = 2000;
44-
MetricsQueueBatchCountLimit = 1000;
45-
SolomonClientDefaultReplica = "sas";
46-
MaxInflightDataRequests = 100;
47-
4843
this->SetValidClusters(clusters);
4944

5045
this->Dispatch(config.GetDefaultSettings());

ydb/library/yql/providers/solomon/provider/yql_solomon_datasource.cpp

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -40,7 +40,14 @@ class TSolomonDataSource : public TDataProviderBase {
4040
cluster.SetName(name);
4141
cluster.SetCluster(properties.Value("location", ""));
4242
cluster.SetToken(token);
43-
cluster.SetUseSsl(true);
43+
cluster.SetUseSsl(properties.Value("use_ssl", "true") == "true"sv);
44+
45+
if (auto value = properties.Value("grpc_port", ""); !value.empty()) {
46+
auto grpcPort = cluster.MutableSettings()->Add();
47+
*grpcPort->MutableName() = "grpcPort";
48+
*grpcPort->MutableValue() = value;
49+
}
50+
4451
State_->Gateway->AddCluster(cluster);
4552

4653
State_->Configuration->AddValidCluster(name);

ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp

Lines changed: 23 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -82,11 +82,9 @@ class TSolomonDqIntegration: public TDqIntegrationBase {
8282
{
8383
}
8484

85-
ui64 Partition(const TExprNode& node, TVector<TString>& partitions, TString*, TExprContext&, const TPartitionSettings& settings) override {
85+
ui64 Partition(const TExprNode& node, TVector<TString>& partitions, TString*, TExprContext&, const TPartitionSettings&) override {
8686
Y_UNUSED(node);
87-
for (size_t i = 0; i < settings.MaxPartitions; ++i) {
88-
partitions.push_back(TStringBuilder() << "partition" << i);
89-
}
87+
partitions.push_back("zz_partition");
9088
return 0;
9189
}
9290

@@ -245,7 +243,7 @@ class TSolomonDqIntegration: public TDqIntegrationBase {
245243
return TSoWrite::Match(&write);
246244
}
247245

248-
void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sourceType, size_t maxPartitions, TExprContext&) override {
246+
void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sourceType, size_t, TExprContext&) override {
249247
const TDqSource dqSource(&node);
250248
const auto maybeSettings = dqSource.Settings().Maybe<TSoSourceSettings>();
251249
if (!maybeSettings) {
@@ -264,6 +262,12 @@ class TSolomonDqIntegration: public TDqIntegrationBase {
264262
source.SetUseSsl(clusterDesc->GetUseSsl());
265263
source.SetFrom(TInstant::ParseIso8601(settings.From().StringValue()).Seconds());
266264
source.SetTo(TInstant::ParseIso8601(settings.To().StringValue()).Seconds());
265+
266+
auto& sourceSettings = *source.MutableSettings();
267+
268+
for (const auto& attr : clusterDesc->settings()) {
269+
sourceSettings.insert({ attr.name(), attr.value() });
270+
}
267271

268272
auto selectors = settings.Selectors().StringValue();
269273
if (!selectors.empty()) {
@@ -301,22 +305,24 @@ class TSolomonDqIntegration: public TDqIntegrationBase {
301305
}
302306

303307
auto& solomonSettings = State_->Configuration;
304-
auto& s = *source.MutableSettings();
305308

306-
auto metricsQueuePageSize = solomonSettings->MetricsQueuePageSize.Get();
307-
s.insert({"metricsQueuePageSize", ToString(metricsQueuePageSize)});
309+
auto metricsQueuePageSize = solomonSettings->MetricsQueuePageSize.Get().OrElse(2000);
310+
sourceSettings.insert({"metricsQueuePageSize", ToString(metricsQueuePageSize)});
311+
312+
auto metricsQueuePrefetchSize = solomonSettings->MetricsQueuePrefetchSize.Get().OrElse(2000);
313+
sourceSettings.insert({"metricsQueuePrefetchSize", ToString(metricsQueuePrefetchSize)});
308314

309-
auto metricsQueuePrefetchSize = solomonSettings->MetricsQueuePrefetchSize.Get();
310-
s.insert({"metricsQueuePrefetchSize", ToString(metricsQueuePrefetchSize)});
315+
auto metricsQueueBatchCountLimit = solomonSettings->MetricsQueueBatchCountLimit.Get().OrElse(1000);
316+
sourceSettings.insert({"metricsQueueBatchCountLimit", ToString(metricsQueueBatchCountLimit)});
311317

312-
auto metricsQueueBatchCountLimit = solomonSettings->MetricsQueueBatchCountLimit.Get();
313-
s.insert({"metricsQueueBatchCountLimit", ToString(metricsQueueBatchCountLimit)});
318+
auto solomonClientDefaultReplica = solomonSettings->SolomonClientDefaultReplica.Get().OrElse("sas");
319+
sourceSettings.insert({"solomonClientDefaultReplica", ToString(solomonClientDefaultReplica)});
314320

315-
auto solomonClientDefaultReplica = solomonSettings->SolomonClientDefaultReplica.Get();
316-
s.insert({"solomonClientDefaultReplica", ToString(solomonClientDefaultReplica)});
321+
auto maxInflightDataRequests = solomonSettings->MaxInflightDataRequests.Get().OrElse(100);
322+
sourceSettings.insert({"maxInflightDataRequests", ToString(maxInflightDataRequests)});
317323

318-
auto maxInflightDataRequests = solomonSettings->MaxInflightDataRequests.Get();
319-
s.insert({"maxInflightDataRequests", ToString(maxInflightDataRequests)});
324+
auto computeActorBatchSize = solomonSettings->ComputeActorBatchSize.Get().OrElse(10000);
325+
sourceSettings.insert({"computeActorBatchSize", ToString(computeActorBatchSize)});
320326

321327
if (!selectors.empty()) {
322328
NDq::TDqSolomonReadParams readParams{ .Source = source };
@@ -326,7 +332,7 @@ class TSolomonDqIntegration: public TDqIntegrationBase {
326332

327333
auto metricsQueueActor = NActors::TActivationContext::ActorSystem()->Register(
328334
NDq::CreateSolomonMetricsQueueActor(
329-
maxPartitions,
335+
1,
330336
readParams,
331337
credentialsProvider
332338
),

0 commit comments

Comments
 (0)