Skip to content

Make it possible to return skiff from dq #1765

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 162 additions & 28 deletions ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include "yql_dq_exectransformer.h"

#include <ydb/library/yql/providers/dq/provider/yql_dq_datasource.h>
#include <ydb/library/yql/providers/dq/provider/yql_dq_state.h>

Expand Down Expand Up @@ -400,12 +402,24 @@ struct TDqsFinalPipelineConfigurator : public IPipelineConfigurator {
TDqStatePtr State_;
};

class TSimpleSkiffConverter : public ISkiffConverter {
public:
TString ConvertNodeToSkiff(const TDqStatePtr /*state*/, const IDataProvider::TFillSettings& /*fillSettings*/, const NYT::TNode& /*rowSpec*/, const NYT::TNode& /*item*/) override {
Y_ABORT("not implemented");
}

TYtType ParseYTType(const TExprNode& /*node*/, TExprContext& /*ctx*/, const TMaybe<NYql::TColumnOrder>& /*columns*/) override {
Y_ABORT("not implemented");
}
};

class TDqExecTransformer: public TExecTransformerBase, TCounters
{
public:
TDqExecTransformer(const TDqStatePtr& state)
TDqExecTransformer(const TDqStatePtr& state, const ISkiffConverterPtr& skiffConverter)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second constructor argument looks foreign. But I don't know how to do it better.

: State(state)
, ExecState(MakeIntrusive<TExecState>())
, SkiffConverter(skiffConverter)
{
AddHandler({TStringBuf("Result")}, RequireNone(), Hndl(&TDqExecTransformer::HandleResult));
AddHandler({TStringBuf("Pull")}, RequireNone(), Hndl(&TDqExecTransformer::HandlePull));
Expand Down Expand Up @@ -994,7 +1008,17 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters

FlushStatisticsToState();

return WrapFutureCallback<false>(future, [localRun, startTime, type, fillSettings, level, settings, enableFullResultWrite, columns, graphParams, state = State](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
TString skiffType;
NYT::TNode rowSpec;
if (fillSettings.Format == IDataProvider::EResultFormat::Skiff) {
auto parsedYtType = SkiffConverter->ParseYTType(result.Input().Ref(), ctx, columns);

type = parsedYtType.Type;
rowSpec = parsedYtType.RowSpec;
skiffType = parsedYtType.SkiffType;
}

return WrapFutureCallback<false>(future, [localRun, startTime, type, rowSpec, skiffType, fillSettings, level, settings, enableFullResultWrite, columns, graphParams, state = State, skiffConverter = SkiffConverter](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
YQL_CLOG(DEBUG, ProviderDq) << state->SessionId << " WrapFutureCallback";

auto duration = TInstant::Now() - startTime;
Expand Down Expand Up @@ -1036,6 +1060,20 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
TStringStream out;
NYson::TYsonWriter writer((IOutputStream*)&out);
writer.OnBeginMap();

if (skiffType) {
writer.OnKeyedItem("SkiffType");
writer.OnRaw(skiffType, ::NYson::EYsonType::Node);

writer.OnKeyedItem("Columns");
writer.OnBeginList();
for (auto& column: columns) {
writer.OnListItem();
writer.OnStringScalar(column);
}
writer.OnEndList();
}

if (type) {
writer.OnKeyedItem("Type");
writer.OnRaw(type);
Expand All @@ -1054,21 +1092,34 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
if (truncated && item.IsList()) {
ui64 bytes = 0;
ui64 rows = 0;
writer.OnBeginList();
for (auto& node : item.AsList()) {
raw = NYT::NodeToYsonString(node);
bytes += raw.size();
rows += 1;
writer.OnListItem();
writer.OnRaw(raw);
if (fillSettings.AllResultsBytesLimit && bytes >= *fillSettings.AllResultsBytesLimit) {
switch (fillSettings.Format) {
case IDataProvider::EResultFormat::Yson: {
writer.OnBeginList();
for (auto& node : item.AsList()) {
raw = NYT::NodeToYsonString(node);
bytes += raw.size();
rows += 1;
writer.OnListItem();
writer.OnRaw(raw);
if (fillSettings.AllResultsBytesLimit && bytes >= *fillSettings.AllResultsBytesLimit) {
break;
}
if (fillSettings.RowsLimitPerWrite && rows >= *fillSettings.RowsLimitPerWrite) {
break;
}
}
writer.OnEndList();
break;
}
if (fillSettings.RowsLimitPerWrite && rows >= *fillSettings.RowsLimitPerWrite) {
case IDataProvider::EResultFormat::Skiff: {
writer.OnStringScalar(skiffConverter->ConvertNodeToSkiff(state, fillSettings, rowSpec, item));
break;
}
default: {
YQL_LOG_CTX_THROW yexception() << "Invalid result type: " << fillSettings.Format;
}
}
writer.OnEndList();

if (enableFullResultWrite) {
writer.OnKeyedItem("Ref");
writer.OnBeginList();
Expand All @@ -1081,11 +1132,35 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
writer.OnKeyedItem("Truncated");
writer.OnBooleanScalar(true);
} else if (truncated) {
writer.OnRaw("[]");
switch (fillSettings.Format) {
case IDataProvider::EResultFormat::Yson: {
writer.OnRaw("[]");
break;
}
case IDataProvider::EResultFormat::Skiff: {
writer.OnStringScalar("");
break;
}
default: {
YQL_LOG_CTX_THROW yexception() << "Invalid result type: " << fillSettings.Format;
}
}
writer.OnKeyedItem("Truncated");
writer.OnBooleanScalar(true);
} else {
writer.OnRaw(raw);
switch (fillSettings.Format) {
case IDataProvider::EResultFormat::Yson: {
writer.OnRaw(raw);
break;
}
case IDataProvider::EResultFormat::Skiff: {
writer.OnStringScalar(skiffConverter->ConvertNodeToSkiff(state, fillSettings, rowSpec, item));
break;
}
default: {
YQL_LOG_CTX_THROW yexception() << "Invalid result type: " << fillSettings.Format;
}
}
}

if (rowsCount) {
Expand Down Expand Up @@ -1373,9 +1448,19 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
});
executionPlanner.Destroy();

TString skiffType;
NYT::TNode rowSpec;
if (fillSettings.Format == IDataProvider::EResultFormat::Skiff) {
auto parsedYtType = SkiffConverter->ParseYTType(pull.Input().Ref(), ctx, columns);

type = parsedYtType.Type;
rowSpec = parsedYtType.RowSpec;
skiffType = parsedYtType.SkiffType;
}

int level = 0;
// TODO: remove copy-paste
return WrapFutureCallback<false>(future, [settings, startTime, localRun, type, fillSettings, level, graphParams, columns, enableFullResultWrite, state = State](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
return WrapFutureCallback<false>(future, [settings, startTime, localRun, type, rowSpec, skiffType, fillSettings, level, graphParams, columns, enableFullResultWrite, state = State, skiffConverter = SkiffConverter](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
auto duration = TInstant::Now() - startTime;
YQL_CLOG(INFO, ProviderDq) << "Execution Pull complete, duration: " << duration;
if (state->Metrics) {
Expand Down Expand Up @@ -1406,8 +1491,26 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
input->SetState(TExprNode::EState::ExecutionComplete);

TStringStream out;
NYson::TYsonWriter writer((IOutputStream*)&out, NCommon::GetYsonFormat(fillSettings), ::NYson::EYsonType::Node, false);

IDataProvider::TFillSettings ysonFormatSettings;
ysonFormatSettings.FormatDetails = fillSettings.FormatDetails;
ysonFormatSettings.Format = IDataProvider::EResultFormat::Yson;
NYson::TYsonWriter writer((IOutputStream*)&out, NCommon::GetYsonFormat(ysonFormatSettings), ::NYson::EYsonType::Node, false);
writer.OnBeginMap();

if (skiffType) {
writer.OnKeyedItem("SkiffType");
writer.OnRaw(skiffType, ::NYson::EYsonType::Node);

writer.OnKeyedItem("Columns");
writer.OnBeginList();
for (auto& column: columns) {
writer.OnListItem();
writer.OnStringScalar(column);
}
writer.OnEndList();
}

if (type) {
writer.OnKeyedItem("Type");
writer.OnRaw(type);
Expand Down Expand Up @@ -1447,21 +1550,35 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
// TODO:
ui64 bytes = 0;
ui64 rows = 0;
writer.OnBeginList();
for (auto& node : item.AsList()) {
raw = NYT::NodeToYsonString(node);
bytes += raw.size();
rows += 1;
writer.OnListItem();
writer.OnRaw(raw);
if (fillSettings.AllResultsBytesLimit && bytes >= *fillSettings.AllResultsBytesLimit) {
switch (fillSettings.Format) {
case IDataProvider::EResultFormat::Yson: {
writer.OnBeginList();

for (auto& node : item.AsList()) {
raw = NYT::NodeToYsonString(node);
bytes += raw.size();
rows += 1;
writer.OnListItem();
writer.OnRaw(raw);

if (fillSettings.AllResultsBytesLimit && bytes >= *fillSettings.AllResultsBytesLimit) {
break;
}
if (fillSettings.RowsLimitPerWrite && rows >= *fillSettings.RowsLimitPerWrite) {
break;
}
}
writer.OnEndList();
break;
}
if (fillSettings.RowsLimitPerWrite && rows >= *fillSettings.RowsLimitPerWrite) {
case IDataProvider::EResultFormat::Skiff: {
writer.OnStringScalar(skiffConverter->ConvertNodeToSkiff(state, fillSettings, rowSpec, item));
break;
}
default: {
YQL_LOG_CTX_THROW yexception() << "Invalid result type: " << fillSettings.Format;
}
}
writer.OnEndList();

if (enableFullResultWrite) {
writer.OnKeyedItem("Ref");
Expand All @@ -1476,7 +1593,19 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
writer.OnKeyedItem("Truncated");
writer.OnBooleanScalar(true);
} else {
writer.OnRaw(raw);
switch (fillSettings.Format) {
case IDataProvider::EResultFormat::Yson: {
writer.OnRaw(raw);
break;
}
case IDataProvider::EResultFormat::Skiff: {
writer.OnStringScalar(skiffConverter->ConvertNodeToSkiff(state, fillSettings, rowSpec, item));
break;
}
default: {
YQL_LOG_CTX_THROW yexception() << "Invalid result type: " << fillSettings.Format;
}
}
}

if (rowsCount) {
Expand Down Expand Up @@ -1923,6 +2052,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
private:
TDqStatePtr State;
TExecStatePtr ExecState;
ISkiffConverterPtr SkiffConverter;
mutable THashMap<TString, TFileLinkPtr> FileLinks;
mutable THashMap<TString, TString> ModulesMapping;

Expand All @@ -1932,7 +2062,11 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
}

IGraphTransformer* CreateDqExecTransformer(const TDqStatePtr& state) {
return new TDqExecTransformer(state);
return new TDqExecTransformer(state, MakeIntrusive<TSimpleSkiffConverter>());
}

TExecTransformerFactory CreateDqExecTransformerFactory(const ISkiffConverterPtr& skiffConverter) {
return [skiffConverter] (const TDqStatePtr& state) { return new TDqExecTransformer(state, skiffConverter); };
}

} // namespace NYql
Original file line number Diff line number Diff line change
@@ -1,12 +1,29 @@
#pragma once

#include <ydb/library/yql/core/yql_data_provider.h>
#include <ydb/library/yql/core/yql_type_annotation.h>

#include <util/generic/ptr.h>

namespace NYql {
struct TDqState;
using TDqStatePtr = TIntrusivePtr<TDqState>;

class ISkiffConverter : public TThrRefBase {
public:
struct TYtType {
TString Type;
TString SkiffType;
NYT::TNode RowSpec;
};

virtual TString ConvertNodeToSkiff(const TDqStatePtr state, const IDataProvider::TFillSettings& fillSettings, const NYT::TNode& rowSpec, const NYT::TNode& item) = 0;
virtual TYtType ParseYTType(const TExprNode& node, TExprContext& ctx, const TMaybe<NYql::TColumnOrder>& columns) = 0;
};
using ISkiffConverterPtr = TIntrusivePtr<ISkiffConverter>;

IGraphTransformer* CreateDqExecTransformer(const TDqStatePtr& state);

using TExecTransformerFactory = std::function<IGraphTransformer*(const TDqStatePtr& state)>;
TExecTransformerFactory CreateDqExecTransformerFactory(const ISkiffConverterPtr& skiffConverter);
} // namespace NYql