Skip to content

YDB FQ: simplify yql_generic_load_meta.cpp code #14307

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 2 additions & 16 deletions ydb/library/yql/providers/generic/connector/libcpp/error.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ namespace NYql::NConnector {
return error;
}

TIssues ErrorToIssues(const NApi::TError& error) {
TIssues ErrorToIssues(const NApi::TError& error, TString prefix) {
TIssues issues;
issues.Reserve(error.get_arr_issues().size() + 1);

// add high-level error
issues.AddIssue(TIssue(error.message()));
issues.AddIssue(TIssue(TStringBuilder() << prefix << error.message()));

// convert detailed errors
for (auto& subIssue : error.get_arr_issues()) {
Expand All @@ -44,20 +44,6 @@ namespace NYql::NConnector {
}
}

// Reports a connector error into the expression context.
//
// error    - connector error to report.
// ctx      - expression context that receives the issues.
// position - position attached to the high-level issue.
// summary  - text placed in front of the status name and the error message.
//
// A single positioned headline issue is added first; after it every detailed
// issue converted from the error's issue list is added as-is.
void ErrorToExprCtx(const NApi::TError& error, TExprContext& ctx, const TPosition& position, const TString& summary) {
    // Headline: "<summary>: status=<status name>, message=<message>".
    TStringBuilder headline;
    headline << summary
             << ": status=" << Ydb::StatusIds_StatusCode_Name(error.status())
             << ", message=" << error.message();
    ctx.AddError(TIssue(position, headline));

    // Detailed issues carried inside the error, forwarded in iteration order.
    TIssues details;
    IssuesFromMessage(error.get_arr_issues(), details);
    for (const auto& detail : details) {
        ctx.AddError(detail);
    }
}

NApi::TError ErrorFromGRPCStatus(const NYdbGrpc::TGrpcStatus& status) {
NApi::TError result;

Expand Down
7 changes: 3 additions & 4 deletions ydb/library/yql/providers/generic/connector/libcpp/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

#include <grpcpp/support/status.h>

#include <yql/essentials/ast/yql_expr.h>
#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h>
#include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h>
#include <ydb/public/sdk/cpp/src/library/grpc/client/grpc_client_low.h>
#include <yql/essentials/public/issue/yql_issue.h>
#include <yql/essentials/utils/yql_panic.h>

namespace NYql::NConnector {
NApi::TError NewSuccess();
Expand All @@ -29,12 +30,10 @@ namespace NYql::NConnector {
return ok;
}

TIssues ErrorToIssues(const NApi::TError& error);
TIssues ErrorToIssues(const NApi::TError& error, TString prefix = "");

NDqProto::StatusIds::StatusCode ErrorToDqStatus(const NApi::TError& error);

void ErrorToExprCtx(const NApi::TError& error, TExprContext& ctx, const TPosition& position, const TString& summary);

NApi::TError ErrorFromGRPCStatus(const NYdbGrpc::TGrpcStatus& status);

inline bool GrpcStatusEndOfStream(const NYdbGrpc::TGrpcStatus& status) noexcept {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ PEERDIR(
contrib/libs/grpc
ydb/core/formats/arrow/serializer
ydb/public/sdk/cpp/src/library/grpc/client
yql/essentials/ast
ydb/library/yql/dq/actors/protos
yql/essentials/providers/common/proto
yql/essentials/providers/common/proto
Expand Down
187 changes: 106 additions & 81 deletions ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace NYql {
std::optional<NConnector::NApi::TDescribeTableResponse> Response;
};

using TMapType =
using TDescribeTableMap =
std::unordered_map<TGenericState::TTableAddress, TTableDescription::TPtr, THash<TGenericState::TTableAddress>>;

public:
Expand All @@ -50,7 +50,7 @@ namespace NYql {
return TStatus::Ok;
}

std::unordered_set<TMapType::key_type, TMapType::hasher> pendingTables;
std::unordered_set<TDescribeTableMap::key_type, TDescribeTableMap::hasher> pendingTables;
const auto& reads = FindNodes(input, [&](const TExprNode::TPtr& node) {
if (const auto maybeRead = TMaybeNode<TGenRead>(node)) {
return maybeRead.Cast().DataSource().Category().Value() == GenericProviderName;
Expand Down Expand Up @@ -146,75 +146,16 @@ namespace NYql {
});

TNodeOnNodeOwnedMap replaces(reads.size());
bool hasErrors = false;

for (const auto& r : reads) {
const TGenRead read(r);
const auto clusterName = read.DataSource().Cluster().StringValue();
const auto& keyArg = TExprBase(read.FreeArgs().Get(2).Ref().HeadPtr()).Cast<TCoKey>().Ref().Head();
const auto tableName = TString(keyArg.Tail().Head().Content());

const auto it = Results_.find(TGenericState::TTableAddress(clusterName, tableName));
if (Results_.cend() != it) {
const auto& response = it->second->Response;

if (NConnector::IsSuccess(*response)) {
TGenericState::TTableMeta tableMeta;
tableMeta.Schema = response->schema();
tableMeta.DataSourceInstance = it->second->DataSourceInstance;

const auto& parse = ParseTableMeta(tableMeta.Schema, clusterName, tableName, ctx, tableMeta.ColumnOrder);

if (parse) {
tableMeta.ItemType = parse;
if (const auto ins = replaces.emplace(read.Raw(), TExprNode::TPtr()); ins.second) {
// clang-format off
auto row = Build<TCoArgument>(ctx, read.Pos())
.Name("row")
.Done();

auto emptyPredicate = Build<TCoLambda>(ctx, read.Pos())
.Args({row})
.Body<TCoBool>()
.Literal().Build("true")
.Build()
.Done().Ptr();

auto table = Build<TGenTable>(ctx, read.Pos())
.Name().Value(tableName).Build()
.Splits<TCoVoid>().Build().Done();

ins.first->second = Build<TGenReadTable>(ctx, read.Pos())
.World(read.World())
.DataSource(read.DataSource())
.Table(table)
.Columns<TCoVoid>().Build()
.FilterPredicate(emptyPredicate)
.Done().Ptr();
// clang-format on
}
State_->AddTable(clusterName, tableName, std::move(tableMeta));
} else {
hasErrors = true;
break;
}
} else {
const auto& error = response->error();
NConnector::ErrorToExprCtx(error, ctx, ctx.GetPosition(read.Pos()),
TStringBuilder() << "Loading metadata for table: " << clusterName << '.' << tableName);
hasErrors = true;
break;
TIssues issues = HandleDescribeTableResponse(r, ctx, replaces);
if (issues) {
for (const auto& issue : issues) {
ctx.AddError(issue);
}
} else {
ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), TStringBuilder()
<< "Not found result for " << clusterName << '.' << tableName));
hasErrors = true;
break;
}
}

if (hasErrors) {
return TStatus::Error;
return TStatus::Error;
}
}

return RemapExpr(input, output, replaces, ctx, TOptimizeExprSettings(nullptr));
Expand All @@ -226,30 +167,114 @@ namespace NYql {
}

private:
const TStructExprType* ParseTableMeta(const NConnector::NApi::TSchema& schema, const std::string_view& cluster,
const std::string_view& table, TExprContext& ctx, TVector<TString>& columnOrder) try {
// Turns the stored DescribeTable result for one generic read node into either
// a replacement node plus registered table metadata, or a list of issues.
//
// read     - expression node, wrapped as TGenRead to extract the cluster and
//            table names.
// ctx      - expression context used to resolve positions and build nodes.
// replaces - receives the mapping from the read node to its replacement.
//
// Returns an empty TIssues on success. Otherwise returns the issues explaining
// the failure: no stored result for the table, an unsuccessful connector
// response, or a schema that could not be parsed.
TIssues HandleDescribeTableResponse(
    const TIntrusivePtr<TExprNode>& read,
    TExprContext& ctx,
    TNodeOnNodeOwnedMap& replaces
) {
    const TGenRead genRead(read);
    const auto clusterName = genRead.DataSource().Cluster().StringValue();
    const auto& keyArg = TExprBase(genRead.FreeArgs().Get(2).Ref().HeadPtr()).Cast<TCoKey>().Ref().Head();
    const auto tableName = TString(keyArg.Tail().Head().Content());

    const auto it = Results_.find(TGenericState::TTableAddress(clusterName, tableName));

    if (it == Results_.cend()) {
        TIssues issues;
        issues.AddIssue(TIssue(ctx.GetPosition(genRead.Pos()), TStringBuilder()
            << "Not found result for " << clusterName << '.' << tableName));
        return issues;
    }

    // NOTE(review): Response is a std::optional and is dereferenced unchecked
    // below; presumably it is always populated before this method runs - confirm.
    const auto& response = it->second->Response;

    if (!NConnector::IsSuccess(*response)) {
        // ErrorToIssues concatenates the prefix and the connector message with
        // nothing in between, so the separator has to be part of the prefix.
        return NConnector::ErrorToIssues(
            response->error(),
            TStringBuilder() << "Loading metadata for table: " << clusterName << '.' << tableName << ": "
        );
    }

    TGenericState::TTableMeta tableMeta;
    tableMeta.Schema = response->schema();
    tableMeta.DataSourceInstance = it->second->DataSourceInstance;

    // Fills the remaining fields of tableMeta from the schema, or reports why it cannot.
    auto issues = ParseTableMeta(ctx, ctx.GetPosition(read->Pos()), clusterName, tableName, tableMeta);
    if (issues) {
        return issues;
    }

    // Build the replacement only if this read node has no entry in the map yet.
    if (const auto ins = replaces.emplace(genRead.Raw(), TExprNode::TPtr()); ins.second) {
        ins.first->second = MakeTableMetaNode(ctx, genRead, tableName);
    }

    State_->AddTable(clusterName, tableName, std::move(tableMeta));
    return TIssues{};
}

TIssues ParseTableMeta(
TExprContext& ctx,
const TPosition& pos,
const std::string_view& cluster,
const std::string_view& table,
TGenericState::TTableMeta& tableMeta
) try {
TVector<const TItemExprType*> items;

auto columns = schema.columns();
const auto& columns = tableMeta.Schema.columns();
if (columns.empty()) {
ctx.AddError(TIssue({}, TStringBuilder() << "Table " << cluster << '.' << table << " doesn't exist."));
return nullptr;
TIssues issues;
issues.AddIssue(TIssue(pos, TStringBuilder() << "Table " << cluster << '.' << table << " doesn't exist."));
return issues;
}

for (auto i = 0; i < columns.size(); i++) {
for (const auto& column: columns) {
// Make type annotation
NYdb::TTypeParser parser(columns.Get(i).type());
NYdb::TTypeParser parser(column.type());
auto typeAnnotation = NFq::MakeType(parser, ctx);

// Create items from graph
items.emplace_back(ctx.MakeType<TItemExprType>(columns.Get(i).name(), typeAnnotation));
columnOrder.emplace_back(columns.Get(i).name());
items.emplace_back(ctx.MakeType<TItemExprType>(column.name(), typeAnnotation));
tableMeta.ColumnOrder.emplace_back(column.name());
}
// FIXME: handle on Connector's side?
return ctx.MakeType<TStructExprType>(items);

tableMeta.ItemType = ctx.MakeType<TStructExprType>(items);
return TIssues{};
} catch (std::exception&) {
ctx.AddError(TIssue({}, TStringBuilder() << "Failed to parse table metadata: " << CurrentExceptionMessage()));
return nullptr;
TIssues issues;
issues.AddIssue(TIssue(pos, TStringBuilder() << "Failed to parse table metadata: " << CurrentExceptionMessage()));
return issues;
}

// Builds the TGenReadTable node that stands in for a generic read.
//
// ctx       - expression context used to construct the nodes.
// read      - source read node; supplies the position, world and data source.
// tableName - value stored in the Name of the generated TGenTable.
//
// The resulting node has Void columns, a table with Void splits, and a filter
// predicate lambda of one argument ("row") whose body is the literal "true".
TExprNode::TPtr MakeTableMetaNode(
    TExprContext& ctx,
    const TGenRead& read,
    const TString& tableName
) {
    // Every generated node is placed at the position of the source read.
    const auto pos = read.Pos();

    // clang-format off
    auto rowArg = Build<TCoArgument>(ctx, pos)
        .Name("row")
        .Done();

    auto alwaysTrue = Build<TCoLambda>(ctx, pos)
        .Args({rowArg})
        .Body<TCoBool>()
            .Literal().Build("true")
            .Build()
        .Done().Ptr();

    auto tableNode = Build<TGenTable>(ctx, pos)
        .Name().Value(tableName).Build()
        .Splits<TCoVoid>().Build()
        .Done();

    return Build<TGenReadTable>(ctx, pos)
        .World(read.World())
        .DataSource(read.DataSource())
        .Table(tableNode)
        .Columns<TCoVoid>().Build()
        .FilterPredicate(alwaysTrue)
        .Done().Ptr();
    // clang-format on
}

void FillDescribeTableRequest(NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig,
Expand Down Expand Up @@ -402,7 +427,7 @@ namespace NYql {
private:
const TGenericState::TPtr State_;

TMapType Results_;
TDescribeTableMap Results_;
NThreading::TFuture<void> AsyncFuture_;
};

Expand Down
Loading