Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/library/yql/core/issue/protos/issue_id.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ message TIssuesIds {
CORE_LEGACY_RANK_FOR_NULLABLE_KEYS = 1109;
CORE_LEGACY_REGEX_ENGINE = 1110;
CORE_ALIAS_SHADOWS_COLUMN = 1111;
CORE_LINEAGE_INTERNAL_ERROR = 1112;

// core errors
CORE_GC_NODES_LIMIT_EXCEEDED = 1500;
Expand Down
4 changes: 4 additions & 0 deletions ydb/library/yql/core/issue/yql_issue.txt
Original file line number Diff line number Diff line change
Expand Up @@ -651,3 +651,7 @@ ids {
code: YQL_UNTYPED_STRING_LITERALS
severity: S_WARNING
}
ids {
code: CORE_LINEAGE_INTERNAL_ERROR
severity: S_WARNING
}
26 changes: 22 additions & 4 deletions ydb/library/yql/core/services/yql_lineage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ namespace {

class TLineageScanner {
public:
TLineageScanner(const TExprNode& root, const TTypeAnnotationContext& ctx)
TLineageScanner(const TExprNode& root, const TTypeAnnotationContext& ctx, TExprContext& exprCtx)
: Root_(root)
, Ctx_(ctx)
, ExprCtx_(exprCtx)
{}

TString Process() {
Expand Down Expand Up @@ -209,7 +210,15 @@ class TLineageScanner {
return &lineage;
}

if (node.IsCallable({"Unordered", "UnorderedSubquery", "Right!", "Skip", "Take", "Sort", "AssumeSorted"})) {
if (node.IsCallable({
"Unordered",
"UnorderedSubquery",
"Right!",
"Skip",
"Take",
"Sort",
"AssumeSorted",
"SkipNullMembers"})) {
lineage = *CollectLineage(node.Head());
return &lineage;
} else if (node.IsCallable("ExtractMembers")) {
Expand All @@ -224,11 +233,19 @@ class TLineageScanner {
HandleWindow(lineage, node);
} else if (node.IsCallable("EquiJoin")) {
HandleEquiJoin(lineage, node);
} else {
Warning(node, TStringBuilder() << node.Content() << " is not supported");
}

return &lineage;
}

void Warning(const TExprNode& node, const TString& message) {
auto issue = TIssue(ExprCtx_.GetPosition(node.Pos()), message);
SetIssueCode(EYqlIssueCode::TIssuesIds_EIssueCode_CORE_LINEAGE_INTERNAL_ERROR, issue);
ExprCtx_.AddWarning(issue);
}

void HandleExtractMembers(TLineage& lineage, const TExprNode& node) {
auto innerLineage = *CollectLineage(node.Head());
if (innerLineage.Fields.Defined()) {
Expand Down Expand Up @@ -763,6 +780,7 @@ class TLineageScanner {
private:
const TExprNode& Root_;
const TTypeAnnotationContext& Ctx_;
TExprContext& ExprCtx_;
TNodeMap<IDataProvider*> Reads_, Writes_;
ui32 NextReadId_ = 0;
ui32 NextWriteId_ = 0;
Expand All @@ -773,8 +791,8 @@ class TLineageScanner {

}

TString CalculateLineage(const TExprNode& root, const TTypeAnnotationContext& ctx) {
TLineageScanner scanner(root, ctx);
TString CalculateLineage(const TExprNode& root, const TTypeAnnotationContext& ctx, TExprContext& exprCtx) {
TLineageScanner scanner(root, ctx, exprCtx);
return scanner.Process();
}

Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/core/services/yql_lineage.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
namespace NYql {

struct TTypeAnnotationContext;
struct TExprContext;

TString CalculateLineage(const TExprNode& root, const TTypeAnnotationContext& ctx);
TString CalculateLineage(const TExprNode& root, const TTypeAnnotationContext& ctx, TExprContext& exprCtx);

}
3 changes: 1 addition & 2 deletions ydb/library/yql/core/services/yql_transform_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,8 @@ TTransformationPipeline& TTransformationPipeline::AddLineageOptimization(TMaybe<
Transformers_.push_back(TTransformStage(
CreateFunctorTransformer(
[typeCtx = TypeAnnotationContext_, &lineageOut](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
Y_UNUSED(ctx);
output = input;
lineageOut = CalculateLineage(*input, *typeCtx);
lineageOut = CalculateLineage(*input, *typeCtx, ctx);
return IGraphTransformer::TStatus::Ok;
}
),
Expand Down