Skip to content

Commit bddf31a

Browse files
authored
Merge 3f0ff79 into 18686b5
2 parents 18686b5 + 3f0ff79 commit bddf31a

File tree

4 files changed

+107
-99
lines changed

4 files changed

+107
-99
lines changed

ydb/library/yql/providers/generic/connector/libcpp/error.cpp

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@ namespace NYql::NConnector {
1212
return error;
1313
}
1414

15-
TIssues ErrorToIssues(const NApi::TError& error) {
15+
TIssues ErrorToIssues(const NApi::TError& error, TString prefix) {
1616
TIssues issues;
1717
issues.Reserve(error.get_arr_issues().size() + 1);
1818

1919
// add high-level error
20-
issues.AddIssue(TIssue(error.message()));
20+
issues.AddIssue(TIssue(TStringBuilder() << prefix << error.message()));
2121

2222
// convert detailed errors
2323
for (auto& subIssue : error.get_arr_issues()) {
@@ -44,20 +44,6 @@ namespace NYql::NConnector {
4444
}
4545
}
4646

47-
void ErrorToExprCtx(const NApi::TError& error, TExprContext& ctx, const TPosition& position, const TString& summary) {
48-
// add high-level error
49-
TStringBuilder ss;
50-
ss << summary << ": status=" << Ydb::StatusIds_StatusCode_Name(error.status()) << ", message=" << error.message();
51-
ctx.AddError(TIssue(position, ss));
52-
53-
// convert detailed errors
54-
TIssues issues;
55-
IssuesFromMessage(error.get_arr_issues(), issues);
56-
for (const auto& issue : issues) {
57-
ctx.AddError(issue);
58-
}
59-
}
60-
6147
NApi::TError ErrorFromGRPCStatus(const NYdbGrpc::TGrpcStatus& status) {
6248
NApi::TError result;
6349

ydb/library/yql/providers/generic/connector/libcpp/error.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22

33
#include <grpcpp/support/status.h>
44

5-
#include <yql/essentials/ast/yql_expr.h>
65
#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h>
76
#include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h>
87
#include <ydb/public/sdk/cpp/src/library/grpc/client/grpc_client_low.h>
8+
#include <yql/essentials/public/issue/yql_issue.h>
9+
#include <yql/essentials/utils/yql_panic.h>
910

1011
namespace NYql::NConnector {
1112
NApi::TError NewSuccess();
@@ -29,12 +30,10 @@ namespace NYql::NConnector {
2930
return ok;
3031
}
3132

32-
TIssues ErrorToIssues(const NApi::TError& error);
33+
TIssues ErrorToIssues(const NApi::TError& error, TString prefix = "");
3334

3435
NDqProto::StatusIds::StatusCode ErrorToDqStatus(const NApi::TError& error);
3536

36-
void ErrorToExprCtx(const NApi::TError& error, TExprContext& ctx, const TPosition& position, const TString& summary);
37-
3837
NApi::TError ErrorFromGRPCStatus(const NYdbGrpc::TGrpcStatus& status);
3938

4039
inline bool GrpcStatusEndOfStream(const NYdbGrpc::TGrpcStatus& status) noexcept {

ydb/library/yql/providers/generic/connector/libcpp/ya.make

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ PEERDIR(
1111
contrib/libs/grpc
1212
ydb/core/formats/arrow/serializer
1313
ydb/public/sdk/cpp/src/library/grpc/client
14-
yql/essentials/ast
1514
ydb/library/yql/dq/actors/protos
1615
yql/essentials/providers/common/proto
1716
yql/essentials/providers/common/proto

ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp

Lines changed: 102 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ namespace NYql {
3434
std::optional<NConnector::NApi::TDescribeTableResponse> Response;
3535
};
3636

37-
using TMapType =
37+
using TDescribeTableMap =
3838
std::unordered_map<TGenericState::TTableAddress, TTableDescription::TPtr, THash<TGenericState::TTableAddress>>;
3939

4040
public:
@@ -50,7 +50,7 @@ namespace NYql {
5050
return TStatus::Ok;
5151
}
5252

53-
std::unordered_set<TMapType::key_type, TMapType::hasher> pendingTables;
53+
std::unordered_set<TDescribeTableMap::key_type, TDescribeTableMap::hasher> pendingTables;
5454
const auto& reads = FindNodes(input, [&](const TExprNode::TPtr& node) {
5555
if (const auto maybeRead = TMaybeNode<TGenRead>(node)) {
5656
return maybeRead.Cast().DataSource().Category().Value() == GenericProviderName;
@@ -146,75 +146,16 @@ namespace NYql {
146146
});
147147

148148
TNodeOnNodeOwnedMap replaces(reads.size());
149-
bool hasErrors = false;
150149

151150
for (const auto& r : reads) {
152-
const TGenRead read(r);
153-
const auto clusterName = read.DataSource().Cluster().StringValue();
154-
const auto& keyArg = TExprBase(read.FreeArgs().Get(2).Ref().HeadPtr()).Cast<TCoKey>().Ref().Head();
155-
const auto tableName = TString(keyArg.Tail().Head().Content());
156-
157-
const auto it = Results_.find(TGenericState::TTableAddress(clusterName, tableName));
158-
if (Results_.cend() != it) {
159-
const auto& response = it->second->Response;
160-
161-
if (NConnector::IsSuccess(*response)) {
162-
TGenericState::TTableMeta tableMeta;
163-
tableMeta.Schema = response->schema();
164-
tableMeta.DataSourceInstance = it->second->DataSourceInstance;
165-
166-
const auto& parse = ParseTableMeta(tableMeta.Schema, clusterName, tableName, ctx, tableMeta.ColumnOrder);
167-
168-
if (parse) {
169-
tableMeta.ItemType = parse;
170-
if (const auto ins = replaces.emplace(read.Raw(), TExprNode::TPtr()); ins.second) {
171-
// clang-format off
172-
auto row = Build<TCoArgument>(ctx, read.Pos())
173-
.Name("row")
174-
.Done();
175-
176-
auto emptyPredicate = Build<TCoLambda>(ctx, read.Pos())
177-
.Args({row})
178-
.Body<TCoBool>()
179-
.Literal().Build("true")
180-
.Build()
181-
.Done().Ptr();
182-
183-
auto table = Build<TGenTable>(ctx, read.Pos())
184-
.Name().Value(tableName).Build()
185-
.Splits<TCoVoid>().Build().Done();
186-
187-
ins.first->second = Build<TGenReadTable>(ctx, read.Pos())
188-
.World(read.World())
189-
.DataSource(read.DataSource())
190-
.Table(table)
191-
.Columns<TCoVoid>().Build()
192-
.FilterPredicate(emptyPredicate)
193-
.Done().Ptr();
194-
// clang-format on
195-
}
196-
State_->AddTable(clusterName, tableName, std::move(tableMeta));
197-
} else {
198-
hasErrors = true;
199-
break;
200-
}
201-
} else {
202-
const auto& error = response->error();
203-
NConnector::ErrorToExprCtx(error, ctx, ctx.GetPosition(read.Pos()),
204-
TStringBuilder() << "Loading metadata for table: " << clusterName << '.' << tableName);
205-
hasErrors = true;
206-
break;
151+
TIssues issues = HandleDescribeTableResponse(r, ctx, replaces);
152+
if (issues) {
153+
for (const auto& issue : issues) {
154+
ctx.AddError(issue);
207155
}
208-
} else {
209-
ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), TStringBuilder()
210-
<< "Not found result for " << clusterName << '.' << tableName));
211-
hasErrors = true;
212-
break;
213-
}
214-
}
215156

216-
if (hasErrors) {
217-
return TStatus::Error;
157+
return TStatus::Error;
158+
}
218159
}
219160

220161
return RemapExpr(input, output, replaces, ctx, TOptimizeExprSettings(nullptr));
@@ -226,14 +167,64 @@ namespace NYql {
226167
}
227168

228169
private:
229-
const TStructExprType* ParseTableMeta(const NConnector::NApi::TSchema& schema, const std::string_view& cluster,
230-
const std::string_view& table, TExprContext& ctx, TVector<TString>& columnOrder) try {
170+
TIssues HandleDescribeTableResponse(
171+
const TIntrusivePtr<TExprNode>& read,
172+
TExprContext& ctx,
173+
TNodeOnNodeOwnedMap& replaces
174+
) {
175+
const TGenRead genRead(read);
176+
const auto clusterName = genRead.DataSource().Cluster().StringValue();
177+
const auto& keyArg = TExprBase(genRead.FreeArgs().Get(2).Ref().HeadPtr()).Cast<TCoKey>().Ref().Head();
178+
const auto tableName = TString(keyArg.Tail().Head().Content());
179+
180+
const auto it = Results_.find(TGenericState::TTableAddress(clusterName, tableName));
181+
182+
if (it == Results_.cend()) {
183+
TIssues issues;
184+
issues.AddIssue(TIssue(ctx.GetPosition(genRead.Pos()), TStringBuilder()
185+
<< "Not found result for " << clusterName << '.' << tableName));
186+
return issues;
187+
}
188+
189+
const auto& response = it->second->Response;
190+
191+
if (!NConnector::IsSuccess(*response)) {
192+
return NConnector::ErrorToIssues(
193+
response->error(),
194+
TStringBuilder() << "Loading metadata for table: " << clusterName << '.' << tableName
195+
);
196+
}
197+
198+
TGenericState::TTableMeta tableMeta;
199+
tableMeta.Schema = response->schema();
200+
tableMeta.DataSourceInstance = it->second->DataSourceInstance;
201+
202+
auto issues = ParseTableMeta(ctx, ctx.GetPosition(read->Pos()), clusterName, tableName, tableMeta);
203+
if (issues) {
204+
return issues;
205+
}
206+
207+
if (const auto ins = replaces.emplace(genRead.Raw(), TExprNode::TPtr()); ins.second) {
208+
ins.first->second = MakeTableMetaNode(ctx, genRead, tableName);
209+
}
210+
211+
State_->AddTable(clusterName, tableName, std::move(tableMeta));
212+
return TIssues{};
213+
}
214+
215+
TIssues ParseTableMeta(
216+
TExprContext& ctx,
217+
const TPosition& pos,
218+
const std::string_view& cluster,
219+
const std::string_view& table,
220+
TGenericState::TTableMeta& tableMeta
221+
) try {
231222
TVector<const TItemExprType*> items;
232223

233-
auto columns = schema.columns();
224+
auto columns = tableMeta.Schema.columns();
234225
if (columns.empty()) {
235-
ctx.AddError(TIssue({}, TStringBuilder() << "Table " << cluster << '.' << table << " doesn't exist."));
236-
return nullptr;
226+
TIssues issues;
227+
issues.AddIssue(TIssue(pos, TStringBuilder() << "Table " << cluster << '.' << table << " doesn't exist."));
237228
}
238229

239230
for (auto i = 0; i < columns.size(); i++) {
@@ -243,13 +234,46 @@ namespace NYql {
243234

244235
// Create items from graph
245236
items.emplace_back(ctx.MakeType<TItemExprType>(columns.Get(i).name(), typeAnnotation));
246-
columnOrder.emplace_back(columns.Get(i).name());
237+
tableMeta.ColumnOrder.emplace_back(columns.Get(i).name());
247238
}
248-
// FIXME: handle on Connector's side?
249-
return ctx.MakeType<TStructExprType>(items);
239+
240+
tableMeta.ItemType = ctx.MakeType<TStructExprType>(items);
241+
return TIssues{};
250242
} catch (std::exception&) {
251-
ctx.AddError(TIssue({}, TStringBuilder() << "Failed to parse table metadata: " << CurrentExceptionMessage()));
252-
return nullptr;
243+
TIssues issues;
244+
issues.AddIssue(TIssue(pos, TStringBuilder() << "Failed to parse table metadata: " << CurrentExceptionMessage()));
245+
return issues;
246+
}
247+
248+
TExprNode::TPtr MakeTableMetaNode(
249+
TExprContext& ctx,
250+
const TGenRead& read,
251+
const TString& tableName
252+
) {
253+
// clang-format off
254+
auto row = Build<TCoArgument>(ctx, read.Pos())
255+
.Name("row")
256+
.Done();
257+
258+
auto emptyPredicate = Build<TCoLambda>(ctx, read.Pos())
259+
.Args({row})
260+
.Body<TCoBool>()
261+
.Literal().Build("true")
262+
.Build()
263+
.Done().Ptr();
264+
265+
auto table = Build<TGenTable>(ctx, read.Pos())
266+
.Name().Value(tableName).Build()
267+
.Splits<TCoVoid>().Build().Done();
268+
269+
return Build<TGenReadTable>(ctx, read.Pos())
270+
.World(read.World())
271+
.DataSource(read.DataSource())
272+
.Table(table)
273+
.Columns<TCoVoid>().Build()
274+
.FilterPredicate(emptyPredicate)
275+
.Done().Ptr();
276+
// clang-format on
253277
}
254278

255279
void FillDescribeTableRequest(NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig,
@@ -402,7 +426,7 @@ namespace NYql {
402426
private:
403427
const TGenericState::TPtr State_;
404428

405-
TMapType Results_;
429+
TDescribeTableMap Results_;
406430
NThreading::TFuture<void> AsyncFuture_;
407431
};
408432

0 commit comments

Comments
 (0)