@@ -34,7 +34,7 @@ namespace NYql {
34
34
std::optional<NConnector::NApi::TDescribeTableResponse> Response;
35
35
};
36
36
37
- using TMapType =
37
+ using TDescribeTableMap =
38
38
std::unordered_map<TGenericState::TTableAddress, TTableDescription::TPtr, THash<TGenericState::TTableAddress>>;
39
39
40
40
public:
@@ -50,7 +50,7 @@ namespace NYql {
50
50
return TStatus::Ok;
51
51
}
52
52
53
- std::unordered_set<TMapType ::key_type, TMapType ::hasher> pendingTables;
53
+ std::unordered_set<TDescribeTableMap ::key_type, TDescribeTableMap ::hasher> pendingTables;
54
54
const auto & reads = FindNodes (input, [&](const TExprNode::TPtr& node) {
55
55
if (const auto maybeRead = TMaybeNode<TGenRead>(node)) {
56
56
return maybeRead.Cast ().DataSource ().Category ().Value () == GenericProviderName;
@@ -146,75 +146,16 @@ namespace NYql {
146
146
});
147
147
148
148
TNodeOnNodeOwnedMap replaces (reads.size ());
149
- bool hasErrors = false ;
150
149
151
150
for (const auto & r : reads) {
152
- const TGenRead read (r);
153
- const auto clusterName = read.DataSource ().Cluster ().StringValue ();
154
- const auto & keyArg = TExprBase (read.FreeArgs ().Get (2 ).Ref ().HeadPtr ()).Cast <TCoKey>().Ref ().Head ();
155
- const auto tableName = TString (keyArg.Tail ().Head ().Content ());
156
-
157
- const auto it = Results_.find (TGenericState::TTableAddress (clusterName, tableName));
158
- if (Results_.cend () != it) {
159
- const auto & response = it->second ->Response ;
160
-
161
- if (NConnector::IsSuccess (*response)) {
162
- TGenericState::TTableMeta tableMeta;
163
- tableMeta.Schema = response->schema ();
164
- tableMeta.DataSourceInstance = it->second ->DataSourceInstance ;
165
-
166
- const auto & parse = ParseTableMeta (tableMeta.Schema , clusterName, tableName, ctx, tableMeta.ColumnOrder );
167
-
168
- if (parse) {
169
- tableMeta.ItemType = parse;
170
- if (const auto ins = replaces.emplace (read.Raw (), TExprNode::TPtr ()); ins.second ) {
171
- // clang-format off
172
- auto row = Build<TCoArgument>(ctx, read.Pos ())
173
- .Name (" row" )
174
- .Done ();
175
-
176
- auto emptyPredicate = Build<TCoLambda>(ctx, read.Pos ())
177
- .Args ({row})
178
- .Body <TCoBool>()
179
- .Literal ().Build (" true" )
180
- .Build ()
181
- .Done ().Ptr ();
182
-
183
- auto table = Build<TGenTable>(ctx, read.Pos ())
184
- .Name ().Value (tableName).Build ()
185
- .Splits <TCoVoid>().Build ().Done ();
186
-
187
- ins.first ->second = Build<TGenReadTable>(ctx, read.Pos ())
188
- .World (read.World ())
189
- .DataSource (read.DataSource ())
190
- .Table (table)
191
- .Columns <TCoVoid>().Build ()
192
- .FilterPredicate (emptyPredicate)
193
- .Done ().Ptr ();
194
- // clang-format on
195
- }
196
- State_->AddTable (clusterName, tableName, std::move (tableMeta));
197
- } else {
198
- hasErrors = true ;
199
- break ;
200
- }
201
- } else {
202
- const auto & error = response->error ();
203
- NConnector::ErrorToExprCtx (error, ctx, ctx.GetPosition (read.Pos ()),
204
- TStringBuilder () << " Loading metadata for table: " << clusterName << ' .' << tableName);
205
- hasErrors = true ;
206
- break ;
151
+ auto issues = HandleDescribeTableResponse (r, ctx, replaces);
152
+ if (issues) {
153
+ for (const auto & issue : *issues) {
154
+ ctx.AddError (issue);
207
155
}
208
- } else {
209
- ctx.AddError (TIssue (ctx.GetPosition (read.Pos ()), TStringBuilder ()
210
- << " Not found result for " << clusterName << ' .' << tableName));
211
- hasErrors = true ;
212
- break ;
213
- }
214
- }
215
156
216
- if (hasErrors) {
217
- return TStatus::Error;
157
+ return TStatus::Error;
158
+ }
218
159
}
219
160
220
161
return RemapExpr (input, output, replaces, ctx, TOptimizeExprSettings (nullptr ));
@@ -226,14 +167,64 @@ namespace NYql {
226
167
}
227
168
228
169
private:
229
- const TStructExprType* ParseTableMeta (const NConnector::NApi::TSchema& schema, const std::string_view& cluster,
230
- const std::string_view& table, TExprContext& ctx, TVector<TString>& columnOrder) try {
170
+ std::optional<TIssues> HandleDescribeTableResponse (
171
+ const TIntrusivePtr<TExprNode>& read,
172
+ TExprContext& ctx,
173
+ TNodeOnNodeOwnedMap& replaces
174
+ ) {
175
+ const TGenRead genRead (read);
176
+ const auto clusterName = genRead.DataSource ().Cluster ().StringValue ();
177
+ const auto & keyArg = TExprBase (genRead.FreeArgs ().Get (2 ).Ref ().HeadPtr ()).Cast <TCoKey>().Ref ().Head ();
178
+ const auto tableName = TString (keyArg.Tail ().Head ().Content ());
179
+
180
+ const auto it = Results_.find (TGenericState::TTableAddress (clusterName, tableName));
181
+
182
+ if (it == Results_.cend ()) {
183
+ TIssues issues;
184
+ issues.AddIssue (TIssue (ctx.GetPosition (genRead.Pos ()), TStringBuilder ()
185
+ << " Not found result for " << clusterName << ' .' << tableName));
186
+ return issues;
187
+ }
188
+
189
+ const auto & response = it->second ->Response ;
190
+
191
+ if (!NConnector::IsSuccess (*response)) {
192
+ return NConnector::ErrorToIssues (
193
+ response->error (),
194
+ TStringBuilder () << " Loading metadata for table: " << clusterName << ' .' << tableName
195
+ );
196
+ }
197
+
198
+ TGenericState::TTableMeta tableMeta;
199
+ tableMeta.Schema = response->schema ();
200
+ tableMeta.DataSourceInstance = it->second ->DataSourceInstance ;
201
+
202
+ auto issue = ParseTableMeta (ctx, clusterName, tableName, tableMeta);
203
+ if (issue) {
204
+ TIssues issues;
205
+ issues.AddIssue (std::move (*issue));
206
+ return issues;
207
+ }
208
+
209
+ if (const auto ins = replaces.emplace (genRead.Raw (), TExprNode::TPtr ()); ins.second ) {
210
+ ins.first ->second = MakeTableMetaNode (ctx, genRead, tableName);
211
+ }
212
+
213
+ State_->AddTable (clusterName, tableName, std::move (tableMeta));
214
+ return std::nullopt;
215
+ }
216
+
217
+ std::optional<TIssue> ParseTableMeta (
218
+ TExprContext& ctx,
219
+ const std::string_view& cluster,
220
+ const std::string_view& table,
221
+ TGenericState::TTableMeta& tableMeta
222
+ ) try {
231
223
TVector<const TItemExprType*> items;
232
224
233
- auto columns = schema .columns ();
225
+ auto columns = tableMeta. Schema .columns ();
234
226
if (columns.empty ()) {
235
- ctx.AddError (TIssue ({}, TStringBuilder () << " Table " << cluster << ' .' << table << " doesn't exist." ));
236
- return nullptr ;
227
+ return TIssue ({}, TStringBuilder () << " Table " << cluster << ' .' << table << " doesn't exist." );
237
228
}
238
229
239
230
for (auto i = 0 ; i < columns.size (); i++) {
@@ -243,13 +234,44 @@ namespace NYql {
243
234
244
235
// Create items from graph
245
236
items.emplace_back (ctx.MakeType <TItemExprType>(columns.Get (i).name (), typeAnnotation));
246
- columnOrder .emplace_back (columns.Get (i).name ());
237
+ tableMeta. ColumnOrder .emplace_back (columns.Get (i).name ());
247
238
}
248
- // FIXME: handle on Connector's side?
249
- return ctx.MakeType <TStructExprType>(items);
239
+
240
+ tableMeta.ItemType = ctx.MakeType <TStructExprType>(items);
241
+ return std::nullopt;
250
242
} catch (std::exception&) {
251
- ctx.AddError (TIssue ({}, TStringBuilder () << " Failed to parse table metadata: " << CurrentExceptionMessage ()));
252
- return nullptr ;
243
+ return TIssue ({}, TStringBuilder () << " Failed to parse table metadata: " << CurrentExceptionMessage ());
244
+ }
245
+
246
+ TExprNode::TPtr MakeTableMetaNode (
247
+ TExprContext& ctx,
248
+ const TGenRead& read,
249
+ const TString& tableName
250
+ ) {
251
+ // clang-format off
252
+ auto row = Build<TCoArgument>(ctx, read.Pos ())
253
+ .Name (" row" )
254
+ .Done ();
255
+
256
+ auto emptyPredicate = Build<TCoLambda>(ctx, read.Pos ())
257
+ .Args ({row})
258
+ .Body <TCoBool>()
259
+ .Literal ().Build (" true" )
260
+ .Build ()
261
+ .Done ().Ptr ();
262
+
263
+ auto table = Build<TGenTable>(ctx, read.Pos ())
264
+ .Name ().Value (tableName).Build ()
265
+ .Splits <TCoVoid>().Build ().Done ();
266
+
267
+ return Build<TGenReadTable>(ctx, read.Pos ())
268
+ .World (read.World ())
269
+ .DataSource (read.DataSource ())
270
+ .Table (table)
271
+ .Columns <TCoVoid>().Build ()
272
+ .FilterPredicate (emptyPredicate)
273
+ .Done ().Ptr ();
274
+ // clang-format on
253
275
}
254
276
255
277
void FillDescribeTableRequest (NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig,
@@ -402,7 +424,7 @@ namespace NYql {
402
424
private:
403
425
const TGenericState::TPtr State_;
404
426
405
- TMapType Results_;
427
+ TDescribeTableMap Results_;
406
428
NThreading::TFuture<void > AsyncFuture_;
407
429
};
408
430
0 commit comments