Skip to content

Commit f756c0f

Browse files
EgorkaZCyberROFLkruallshmel1kgridnevvvit
authored
Fq gather stable 24-1 (#3517)
Co-authored-by: Ilnaz Nizametdinov <ilnaz@ydb.tech> Co-authored-by: kruall <kruall@ydb.tech> Co-authored-by: Alexander Petrukhin <shmel1k@ydb.tech> Co-authored-by: Vitalii Gridnev <gridnevvvit@gmail.com> Co-authored-by: vporyadke <zalyalov@ydb.tech> Co-authored-by: Nikolay Shestakov <tesseract@ydb.tech> Co-authored-by: niksaveliev <nik@saveliev.me> Co-authored-by: Vitaly Isaev <vitalyisaev@ydb.tech> Co-authored-by: uzhastik <uzhas@ydb.tech> Co-authored-by: Pisarenko Grigoriy <79596613+GrigoriyPA@users.noreply.github.com> Co-authored-by: Yaroslav Plishan <80714170+MetaGigachad@users.noreply.github.com> Co-authored-by: Hor911 <hor911@ydb.tech> Co-authored-by: Dmitry Kardymon <kardymon-d@ydb.tech> Co-authored-by: Oleg Doronin <dorooleg@yandex.ru> Co-authored-by: Ivan Blinkov <ivan@blinkov.ru> Co-authored-by: uzhastik <uzhastik@gmail.com> Co-authored-by: Daniil Cherednik <dcherednik@ydb.tech> Co-authored-by: Andrey Kulaga <aakulaga@ydb.tech>
1 parent 20a513d commit f756c0f

File tree

329 files changed

+7224
-3920
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

329 files changed

+7224
-3920
lines changed

.github/config/muted_ya.txt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@ ydb/core/quoter/ut QuoterWithKesusTest.PrefetchCoefficient
1818
ydb/core/kafka_proxy/ut KafkaProtocol.CreatePartitionsScenario
1919
ydb/core/kafka_proxy/ut KafkaProtocol.ProduceScenario
2020
ydb/core/kqp/provider/ut KikimrIcGateway.TestLoadBasicSecretValueFromExternalDataSourceMetadata
21-
ydb/core/kqp/ut/federated_query/generic *
22-
ydb/core/kqp/ut/olap *
21+
ydb/core/kqp/ut/olap KqpOlap.IndexesActualization
22+
ydb/core/kqp/ut/olap KqpOlap.BlobsSharing*
23+
ydb/core/kqp/ut/olap KqpOlap.ScanQueryOltpAndOlap
24+
ydb/core/kqp/ut/olap KqpOlap.StatsUsageWithTTL
25+
ydb/core/kqp/ut/olap KqpOlap.YqlScriptOltpAndOlap
2326
ydb/core/kqp/ut/pg KqpPg.CreateIndex
2427
ydb/core/kqp/ut/query KqpLimits.QueryReplySize
2528
ydb/core/kqp/ut/query KqpQuery.QueryTimeout
@@ -29,7 +32,6 @@ ydb/core/kqp/ut/scheme KqpScheme.QueryWithAlter
2932
ydb/core/kqp/ut/scheme [44/50]*
3033
ydb/core/kqp/ut/service KqpQueryService.ExecuteQueryPgTableSelect
3134
ydb/core/kqp/ut/service KqpQueryService.QueryOnClosedSession
32-
ydb/core/kqp/ut/service KqpQueryServiceScripts.ForgetScriptExecutionRace
3335
ydb/core/kqp/ut/service KqpService.CloseSessionsWithLoad
3436
ydb/core/kqp/ut/service [38/50]*
3537
ydb/core/tx/columnshard/ut_schema TColumnShardTestSchema.ForgetAfterFail

ydb/core/base/events.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ struct TKikimrEvents : TEvents {
173173
ES_GRAPH,
174174
ES_REPLICATION_SERVICE,
175175
ES_CHANGE_EXCHANGE,
176+
ES_S3_FILE_QUEUE,
176177
};
177178
};
178179

ydb/core/driver_lib/run/config.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ union TBasicKikimrServicesMask {
125125

126126
TBasicKikimrServicesMask() {
127127
EnableAll();
128+
EnableDatabaseMetadataCache = false;
128129
}
129130
};
130131

ydb/core/external_sources/object_storage.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,20 @@ struct TObjectStorageExternalSource : public IExternalSource {
2828
const NKikimrExternalSources::TGeneral& general) const override {
2929
NKikimrExternalSources::TObjectStorage objectStorage;
3030
for (const auto& [key, value]: general.attributes()) {
31-
if (key == "format") {
31+
auto lowerKey = to_lower(key);
32+
if (lowerKey == "format") {
3233
objectStorage.set_format(value);
33-
} else if (key == "compression") {
34+
} else if (lowerKey == "compression") {
3435
objectStorage.set_compression(value);
3536
} else if (key.StartsWith("projection.") || key == "storage.location.template") {
3637
objectStorage.mutable_projection()->insert({key, value});
37-
} else if (key == "partitioned_by") {
38+
} else if (lowerKey == "partitioned_by") {
3839
auto json = NSc::TValue::FromJsonThrow(value);
3940
for (const auto& column: json.GetArray()) {
4041
*objectStorage.add_partitioned_by() = column;
4142
}
42-
} else if (IsIn({"file_pattern"sv, "data.interval.unit"sv, "data.datetime.format_name"sv, "data.datetime.format"sv, "data.timestamp.format_name"sv, "data.timestamp.format"sv, "csv_delimiter"sv}, key)) {
43-
objectStorage.mutable_format_setting()->insert({key, value});
43+
} else if (IsIn({"file_pattern"sv, "data.interval.unit"sv, "data.datetime.format_name"sv, "data.datetime.format"sv, "data.timestamp.format_name"sv, "data.timestamp.format"sv, "csv_delimiter"sv}, lowerKey)) {
44+
objectStorage.mutable_format_setting()->insert({lowerKey, value});
4445
} else {
4546
ythrow TExternalSourceException() << "Unknown attribute " << key;
4647
}

ydb/core/fq/libs/actors/clusters_from_connections.cpp

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -216,17 +216,14 @@ void AddClustersFromConnections(
216216
switch (conn.content().setting().connection_case()) {
217217
case FederatedQuery::ConnectionSetting::kYdbDatabase: {
218218
const auto& db = conn.content().setting().ydb_database();
219-
auto* clusterCfg = gatewaysConfig.MutableYdb()->AddClusterMapping();
219+
auto* clusterCfg = gatewaysConfig.MutableGeneric()->AddClusterMapping();
220+
clusterCfg->SetKind(NYql::NConnector::NApi::EDataSourceKind::YDB);
221+
clusterCfg->SetProtocol(NYql::NConnector::NApi::EProtocol::NATIVE);
220222
clusterCfg->SetName(connectionName);
221-
clusterCfg->SetId(db.database_id());
222-
if (db.database())
223-
clusterCfg->SetDatabase(db.database());
224-
if (db.endpoint())
225-
clusterCfg->SetEndpoint(db.endpoint());
226-
clusterCfg->SetSecure(db.secure());
227-
clusterCfg->SetAddBearerToToken(common.GetUseBearerForYdb());
223+
clusterCfg->SetDatabaseId(db.database_id());
224+
clusterCfg->SetUseSsl(!common.GetDisableSslForGenericDataSources());
228225
FillClusterAuth(*clusterCfg, db.auth(), authToken, accountIdSignatures);
229-
clusters.emplace(connectionName, YdbProviderName);
226+
clusters.emplace(connectionName, GenericProviderName);
230227
break;
231228
}
232229
case FederatedQuery::ConnectionSetting::kClickhouseCluster: {

ydb/core/fq/libs/actors/database_resolver.cpp

Lines changed: 55 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "database_resolver.h"
22

3+
#include <util/string/split.h>
34
#include <ydb/core/fq/libs/common/cache.h>
45
#include <ydb/core/fq/libs/config/protos/issue_id.pb.h>
56
#include <ydb/core/fq/libs/events/events.h>
@@ -98,8 +99,6 @@ class TResponseProcessor : public TActorBootstrapped<TResponseProcessor>
9899
}
99100

100101
void DieOnTtl() {
101-
Success = false;
102-
103102
auto errorMsg = TStringBuilder() << "Could not resolve database ids: ";
104103
bool firstUnresolvedDbId = true;
105104
for (const auto& [_, params]: Requests) {
@@ -112,46 +111,41 @@ class TResponseProcessor : public TActorBootstrapped<TResponseProcessor>
112111
}
113112
errorMsg << " in " << ResolvingTtl << " seconds.";
114113
LOG_E("ResponseProcessor::DieOnTtl: errorMsg=" << errorMsg);
115-
116-
SendResolvedEndpointsAndDie(errorMsg);
114+
Issues.AddIssue(errorMsg);
115+
SendResolvedEndpointsAndDie();
117116
}
118117

119-
void SendResolvedEndpointsAndDie(const TString& errorMsg) {
120-
NYql::TIssues issues;
121-
if (errorMsg) {
122-
issues.AddIssue(errorMsg);
123-
}
124-
118+
void SendResolvedEndpointsAndDie() {
125119
Send(Sender,
126120
new TEvents::TEvEndpointResponse(
127-
NYql::TDatabaseResolverResponse(std::move(DatabaseId2Description), Success, issues)));
121+
NYql::TDatabaseResolverResponse(std::move(DatabaseId2Description), Issues.Empty(), Issues)));
128122
PassAway();
129123
LOG_D("ResponseProcessor::SendResolvedEndpointsAndDie: passed away");
130124
}
131125

132126
void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr& ev)
133127
{
134-
TString errorMessage;
135128
TMaybe<TDatabaseDescription> result;
136129
const auto requestIter = Requests.find(ev->Get()->Request);
137130
HandledIds++;
138131

139-
LOG_T("ResponseProcessor::Handle(HttpIncomingResponse): got MDB API response: code=" << ev->Get()->Response->Status);
132+
LOG_T("ResponseProcessor::Handle(HttpIncomingResponse): got API response: code=" << ev->Get()->Response->Status);
140133

141134
try {
142-
HandleResponse(ev, requestIter, errorMessage, result);
135+
HandleResponse(ev, requestIter, result);
143136
} catch (...) {
144137
const TString msg = TStringBuilder() << "error while response processing, params "
145138
<< ((requestIter != Requests.end()) ? requestIter->second.ToDebugString() : TString{"unknown"})
146139
<< ", details: " << CurrentExceptionMessage();
147140
LOG_E("ResponseProccessor::Handle(TEvHttpIncomingResponse): " << msg);
141+
Issues.AddIssue(msg);
148142
}
149143

150144
LOG_T("ResponseProcessor::Handle(HttpIncomingResponse): progress: "
151145
<< DatabaseId2Description.size() << " of " << Requests.size() << " requests are done");
152146

153147
if (HandledIds == Requests.size()) {
154-
SendResolvedEndpointsAndDie(errorMessage);
148+
SendResolvedEndpointsAndDie();
155149
}
156150
}
157151

@@ -160,18 +154,25 @@ class TResponseProcessor : public TActorBootstrapped<TResponseProcessor>
160154
void HandleResponse(
161155
NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr& ev,
162156
const TRequestMap::const_iterator& requestIter,
163-
TString& errorMessage,
164157
TMaybe<TDatabaseDescription>& result)
165-
{
166-
if (ev->Get()->Error.empty() && (ev->Get()->Response && ev->Get()->Response->Status == "200")) {
167-
errorMessage = HandleSuccessfulResponse(ev, requestIter, result);
158+
{
159+
TString errorMessage;
160+
161+
if (requestIter == Requests.end()) {
162+
// Requests are guaranteed to be kept in within TResponseProcessor until the response arrives.
163+
// If there is no appropriate request, it's a fatal error.
164+
errorMessage = "Invariant violation: unknown request";
168165
} else {
169-
errorMessage = HandleFailedResponse(ev, requestIter);
166+
if (ev->Get()->Error.empty() && (ev->Get()->Response && ev->Get()->Response->Status == "200")) {
167+
errorMessage = HandleSuccessfulResponse(ev, *requestIter, result);
168+
} else {
169+
errorMessage = HandleFailedResponse(ev, *requestIter);
170+
}
170171
}
171172

172173
if (errorMessage) {
174+
Issues.AddIssue(errorMessage);
173175
LOG_E("ResponseProcessor::Handle(HttpIncomingResponse): error=" << errorMessage);
174-
Success = false;
175176
} else {
176177
const auto& params = requestIter->second;
177178
auto key = std::make_tuple(params.Id, params.DatabaseType, params.DatabaseAuth);
@@ -191,17 +192,13 @@ class TResponseProcessor : public TActorBootstrapped<TResponseProcessor>
191192

192193
TString HandleSuccessfulResponse(
193194
NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr& ev,
194-
const TRequestMap::const_iterator& requestIter,
195+
const TRequestMap::value_type& requestWithParams,
195196
TMaybe<TDatabaseDescription>& result
196197
) {
197-
if (requestIter == Requests.end()) {
198-
return "unknown request";
199-
}
200-
201198
NJson::TJsonReaderConfig jsonConfig;
202199
NJson::TJsonValue databaseInfo;
203200

204-
const auto& params = requestIter->second;
201+
const auto& params = requestWithParams.second;
205202
const bool parseJsonOk = NJson::ReadJsonTree(ev->Get()->Response->Body, &jsonConfig, &databaseInfo);
206203
TParsers::const_iterator parserIt;
207204
if (parseJsonOk && (parserIt = Parsers.find(params.DatabaseType)) != Parsers.end()) {
@@ -232,37 +229,37 @@ class TResponseProcessor : public TActorBootstrapped<TResponseProcessor>
232229

233230
TString HandleFailedResponse(
234231
NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr& ev,
235-
const TRequestMap::const_iterator& requestIter
232+
const TRequestMap::value_type& requestWithParams
236233
) const {
237-
if (requestIter == Requests.end()) {
238-
return "unknown request";
239-
}
234+
auto sb = TStringBuilder()
235+
<< "Error while trying to resolve managed " << ToString(requestWithParams.second.DatabaseType)
236+
<< " database with id " << requestWithParams.second.Id << " via HTTP request to"
237+
<< ": endpoint '" << requestWithParams.first->Host << "'"
238+
<< ", url '" << requestWithParams.first->URL << "'"
239+
<< ": ";
240+
241+
// Handle network error (when the response is empty)
242+
if (!ev->Get()->Response) {
243+
return sb << ev->Get()->Error;
244+
}
240245

246+
// Handle unauthenticated error
241247
const auto& status = ev->Get()->Response->Status;
242-
243248
if (status == "403") {
244-
return TStringBuilder() << "You have no permission to resolve database id into database endpoint. " + DetailedPermissionsError(requestIter->second);
249+
return sb << "you have no permission to resolve database id into database endpoint." + DetailedPermissionsError(requestWithParams.second);
245250
}
246251

247-
auto errorMessage = ev->Get()->Error;
248-
249-
const TString error = TStringBuilder()
250-
<< "Cannot resolve database id (status = " << status << "). "
251-
<< "Response body from " << ev->Get()->Request->URL << ": " << (ev->Get()->Response ? ev->Get()->Response->Body : "empty");
252-
if (!errorMessage.empty()) {
253-
errorMessage += '\n';
254-
}
255-
errorMessage += error;
256-
257-
return errorMessage;
252+
// Unexpected error. Add response body for debug
253+
return sb << Endl
254+
<< "Status: " << status << Endl
255+
<< "Response body: " << ev->Get()->Response->Body;
258256
}
259257

260258

261259
TString DetailedPermissionsError(const TResolveParams& params) const {
262-
263260
if (params.DatabaseType == EDatabaseType::ClickHouse || params.DatabaseType == EDatabaseType::PostgreSQL) {
264261
auto mdbTypeStr = NYql::DatabaseTypeLowercase(params.DatabaseType);
265-
return TStringBuilder() << "Please check that your service account has role " <<
262+
return TStringBuilder() << " Please check that your service account has role " <<
266263
"`managed-" << mdbTypeStr << ".viewer`.";
267264
}
268265
return {};
@@ -275,7 +272,7 @@ class TResponseProcessor : public TActorBootstrapped<TResponseProcessor>
275272
const NYql::IMdbEndpointGenerator::TPtr MdbEndpointGenerator;
276273
TDatabaseResolverResponse::TDatabaseDescriptionMap DatabaseId2Description;
277274
size_t HandledIds = 0;
278-
bool Success = true;
275+
NYql::TIssues Issues;
279276
const TParsers& Parsers;
280277
TDuration ResolvingTtl = TDuration::Seconds(30); //TODO: Use cfg
281278
};
@@ -312,7 +309,12 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
312309
}
313310

314311
Y_ENSURE(endpoint);
315-
return TDatabaseDescription{endpoint, "", 0, database, secure};
312+
313+
TVector<TString> split = StringSplitter(endpoint).Split(':');
314+
315+
Y_ENSURE(split.size() == 2);
316+
317+
return TDatabaseDescription{endpoint, split[0], FromString(split[1]), database, secure};
316318
};
317319
Parsers[NYql::EDatabaseType::Ydb] = ydbParser;
318320
Parsers[NYql::EDatabaseType::DataStreams] = [ydbParser](
@@ -327,9 +329,11 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
327329
if (!isDedicatedDb && ret.Endpoint.StartsWith("ydb.")) {
328330
// Replace "ydb." -> "yds."
329331
ret.Endpoint[2] = 's';
332+
ret.Host[2] = 's';
330333
}
331334
if (isDedicatedDb) {
332335
ret.Endpoint = "u-" + ret.Endpoint;
336+
ret.Host = "u-" + ret.Host;
333337
}
334338
return ret;
335339
};
@@ -486,6 +490,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
486490
try {
487491
TString url;
488492
if (IsIn({NYql::EDatabaseType::Ydb, NYql::EDatabaseType::DataStreams }, databaseType)) {
493+
YQL_ENSURE(ev->Get()->YdbMvpEndpoint.Size() > 0, "empty YDB MVP Endpoint");
489494
url = TUrlBuilder(ev->Get()->YdbMvpEndpoint + "/database")
490495
.AddUrlParam("databaseId", databaseId)
491496
.Build();
@@ -497,7 +502,6 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
497502
.AddPathComponent("hosts")
498503
.Build();
499504
}
500-
LOG_D("ResponseProccessor::Handle(EndpointRequest): start GET request: " << url);
501505

502506
NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet(url);
503507

@@ -507,6 +511,8 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
507511
httpRequest->Set("Authorization", token);
508512
}
509513

514+
LOG_D("ResponseProccessor::Handle(EndpointRequest): start GET request: " << "url: " << httpRequest->URL);
515+
510516
requests[httpRequest] = TResolveParams{databaseId, databaseType, databaseAuth};
511517
} catch (const std::exception& e) {
512518
const TString msg = TStringBuilder() << "error while preparing to resolve database id: " << databaseId

ydb/core/fq/libs/actors/proxy_private.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ namespace NKikimr {
2222

2323
namespace NFq {
2424

25-
NActors::TActorId MakeYqPrivateProxyId();
26-
2725
NActors::IActor* CreateYqlAnalyticsPrivateProxy(
2826
const NConfig::TPrivateProxyConfig& privateProxyConfig,
2927
TIntrusivePtr<ITimeProvider> timeProvider,

ydb/core/fq/libs/actors/run_actor.cpp

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
3131
#include <ydb/library/yql/providers/pq/task_meta/task_meta.h>
3232
#include <ydb/library/yql/providers/s3/provider/yql_s3_provider.h>
33-
#include <ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h>
3433
#include <ydb/library/yql/providers/solomon/gateway/yql_solomon_gateway.h>
3534
#include <ydb/library/yql/providers/solomon/provider/yql_solomon_provider.h>
3635
#include <ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.h>
@@ -1940,11 +1939,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
19401939
}
19411940

19421941
{
1943-
dataProvidersInit.push_back(GetYdbDataProviderInitializer(Params.YqSharedResources->UserSpaceYdbDriver, Params.CredentialsFactory, dbResolver));
1944-
}
1945-
1946-
{
1947-
dataProvidersInit.push_back(GetGenericDataProviderInitializer(Params.ConnectorClient, dbResolver));
1942+
dataProvidersInit.push_back(GetGenericDataProviderInitializer(Params.ConnectorClient, dbResolver, Params.CredentialsFactory));
19481943
}
19491944

19501945
{

0 commit comments

Comments
 (0)