Skip to content

Add type white-list for supported types on blockreader (#4268) #4836

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions ydb/library/yql/providers/yt/common/yql_configuration.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
#pragma once

#include <ydb/library/yql/public/udf/udf_data_type.h>

#include <util/system/types.h>
#include <util/datetime/base.h>
#include <util/generic/size_literals.h>
#include <util/generic/set.h>

namespace NYql {

Expand Down Expand Up @@ -55,6 +58,17 @@ constexpr bool DEFAULT_JOIN_COMMON_USE_MULTI_OUT = false;
constexpr bool DEFAULT_USE_RPC_READER_IN_DQ = false;
constexpr size_t DEFAULT_RPC_READER_INFLIGHT = 1;
constexpr TDuration DEFAULT_RPC_READER_TIMEOUT = TDuration::Seconds(120);
const TSet<TString> DEFAULT_BLOCK_READER_SUPPORTED_TYPES = {"pg", "tuple"};
const TSet<NUdf::EDataSlot> DEFAULT_BLOCK_READER_SUPPORTED_DATA_TYPES =
{
NUdf::EDataSlot::Int8, NUdf::EDataSlot::Uint8,
NUdf::EDataSlot::Int16, NUdf::EDataSlot::Uint16,
NUdf::EDataSlot::Int32, NUdf::EDataSlot::Uint32,
NUdf::EDataSlot::Int64, NUdf::EDataSlot::Uint64,
NUdf::EDataSlot::Bool, NUdf::EDataSlot::Double,
NUdf::EDataSlot::String, NUdf::EDataSlot::Json,
NUdf::EDataSlot::Yson, NUdf::EDataSlot::Utf8
};

constexpr auto DEFAULT_SWITCH_MEMORY_LIMIT = 128_MB;

Expand Down
12 changes: 12 additions & 0 deletions ydb/library/yql/providers/yt/common/yql_yt_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <ydb/library/yql/providers/common/codec/yql_codec_type_flags.h>
#include <ydb/library/yql/utils/log/log.h>
#include <ydb/library/yql/public/udf/udf_data_type.h>

#include <library/cpp/yson/node/node_io.h>

Expand Down Expand Up @@ -460,6 +461,17 @@ TYtConfiguration::TYtConfiguration()
REGISTER_SETTING(*this, UseRPCReaderInDQ);
REGISTER_SETTING(*this, DQRPCReaderInflight).Lower(1);
REGISTER_SETTING(*this, DQRPCReaderTimeout);
REGISTER_SETTING(*this, BlockReaderSupportedTypes);
REGISTER_SETTING(*this, BlockReaderSupportedDataTypes)
.Parser([](const TString& v) {
TSet<TString> vec;
StringSplitter(v).SplitBySet(",").AddTo(&vec);
TSet<NUdf::EDataSlot> res;
for (auto& s: vec) {
res.emplace(NUdf::GetDataSlot(s));
}
return res;
});
REGISTER_SETTING(*this, MaxCpuUsageToFuseMultiOuts).Lower(1.0);
REGISTER_SETTING(*this, MaxReplicationFactorToFuseMultiOuts).Lower(1.0);
REGISTER_SETTING(*this, ApplyStoredConstraints)
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/yql/providers/yt/common/yql_yt_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ struct TYtSettings {
NCommon::TConfSetting<bool, true> UseRPCReaderInDQ;
NCommon::TConfSetting<size_t, true> DQRPCReaderInflight;
NCommon::TConfSetting<TDuration, true> DQRPCReaderTimeout;
NCommon::TConfSetting<TSet<TString>, true> BlockReaderSupportedTypes;
NCommon::TConfSetting<TSet<NUdf::EDataSlot>, true> BlockReaderSupportedDataTypes;

// Optimizers
NCommon::TConfSetting<bool, true> _EnableDq;
Expand Down
113 changes: 112 additions & 1 deletion ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,108 @@ static const THashSet<TStringBuf> POOL_TREES_WHITELIST = {"physical", "cloud",

using namespace NNodes;

namespace {
void BlockReaderAddInfo(TExprContext& ctx, const TPosition& pos, const TString& msg) {
ctx.IssueManager.RaiseIssue(YqlIssue(pos, EYqlIssueCode::TIssuesIds_EIssueCode_INFO, "Can't use block reader: " + msg));
}

bool CheckBlockReaderSupportedTypes(const TSet<TString>& list, const TSet<NUdf::EDataSlot>& dataTypesSupported, const TStructExprType* types, TExprContext& ctx, const TPosition& pos) {
TSet<ETypeAnnotationKind> supported;
for (const auto &e: list) {
if (e == "pg") {
supported.insert(ETypeAnnotationKind::Pg);
} else if (e == "tuple") {
supported.emplace(ETypeAnnotationKind::Tuple);
} else if (e == "struct") {
supported.emplace(ETypeAnnotationKind::Struct);
} else if (e == "dict") {
supported.emplace(ETypeAnnotationKind::Dict);
} else if (e == "list") {
supported.emplace(ETypeAnnotationKind::List);
} else if (e == "variant") {
supported.emplace(ETypeAnnotationKind::Variant);
} else {
// Unknown type
BlockReaderAddInfo(ctx, pos, TStringBuilder() << "unknown type: " << e);
return false;
}
}
if (dataTypesSupported.size()) {
supported.emplace(ETypeAnnotationKind::Data);
}
auto checkType = [&] (const TTypeAnnotationNode* type) {
if (type->GetKind() == ETypeAnnotationKind::Data) {
if (!supported.contains(ETypeAnnotationKind::Data)) {
BlockReaderAddInfo(ctx, pos, TStringBuilder() << "unsupported data types");
return false;
}
if (!dataTypesSupported.contains(type->Cast<TDataExprType>()->GetSlot())) {
BlockReaderAddInfo(ctx, pos, TStringBuilder() << "unsupported data type: " << type->Cast<TDataExprType>()->GetSlot());
return false;
}
} else if (type->GetKind() == ETypeAnnotationKind::Pg) {
if (!supported.contains(ETypeAnnotationKind::Pg)) {
BlockReaderAddInfo(ctx, pos, TStringBuilder() << "unsupported pg");
return false;
}
auto name = type->Cast<TPgExprType>()->GetName();
if (name == "float4" && !dataTypesSupported.contains(NUdf::EDataSlot::Float)) {
BlockReaderAddInfo(ctx, pos, TStringBuilder() << "PgFloat4 unsupported yet since float is no supported");
return false;
}
} else {
BlockReaderAddInfo(ctx, pos, TStringBuilder() << "unsupported annotation kind: " << type->GetKind());
return false;
}
return true;
};

TVector<const TTypeAnnotationNode*> stack;

for (auto sub: types->GetItems()) {
auto subT = sub->GetItemType();
stack.push_back(subT);
}
while (!stack.empty()) {
auto el = stack.back();
stack.pop_back();
if (el->GetKind() == ETypeAnnotationKind::Optional) {
stack.push_back(el->Cast<TOptionalExprType>()->GetItemType());
continue;
}
if (!supported.contains(el->GetKind())) {
BlockReaderAddInfo(ctx, pos, TStringBuilder() << "unsupported " << el->GetKind());
return false;
}
if (el->GetKind() == ETypeAnnotationKind::Tuple) {
for (auto e: el->Cast<TTupleExprType>()->GetItems()) {
stack.push_back(e);
}
continue;
} else if (el->GetKind() == ETypeAnnotationKind::Struct) {
for (auto e: el->Cast<TStructExprType>()->GetItems()) {
stack.push_back(e->GetItemType());
}
continue;
} else if (el->GetKind() == ETypeAnnotationKind::List) {
stack.push_back(el->Cast<TListExprType>()->GetItemType());
continue;
} else if (el->GetKind() == ETypeAnnotationKind::Dict) {
stack.push_back(el->Cast<TDictExprType>()->GetKeyType());
stack.push_back(el->Cast<TDictExprType>()->GetPayloadType());
continue;
} else if (el->GetKind() == ETypeAnnotationKind::Variant) {
stack.push_back(el->Cast<TVariantExprType>()->GetUnderlyingType());
continue;
}
if (!checkType(el)) {
return false;
}
}
return true;
}
};

class TYtDqIntegration: public TDqIntegrationBase {
public:
TYtDqIntegration(TYtState* state)
Expand Down Expand Up @@ -375,25 +477,34 @@ class TYtDqIntegration: public TDqIntegrationBase {
if (!State_->Configuration->UseRPCReaderInDQ.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_USE_RPC_READER_IN_DQ)) {
return false;
}


auto supportedTypes = State_->Configuration->BlockReaderSupportedTypes.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_BLOCK_READER_SUPPORTED_TYPES);
auto supportedDataTypes = State_->Configuration->BlockReaderSupportedDataTypes.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_BLOCK_READER_SUPPORTED_DATA_TYPES);
const auto structType = GetSeqItemType(maybeRead.Raw()->GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back())->Cast<TStructExprType>();
if (!CheckBlockReaderSupportedTypes(supportedTypes, supportedDataTypes, structType, ctx, ctx.GetPosition(node.Pos()))) {
return false;
}

TVector<const TTypeAnnotationNode*> subTypeAnn(Reserve(structType->GetItems().size()));
for (const auto& type: structType->GetItems()) {
subTypeAnn.emplace_back(type->GetItemType());
}

if (!State_->Types->ArrowResolver) {
BlockReaderAddInfo(ctx, ctx.GetPosition(node.Pos()), "no arrow resolver provided");
return false;
}

if (State_->Types->ArrowResolver->AreTypesSupported(ctx.GetPosition(node.Pos()), subTypeAnn, ctx) != IArrowResolver::EStatus::OK) {
BlockReaderAddInfo(ctx, ctx.GetPosition(node.Pos()), "arrow resolver don't support these types");
return false;
}

const TYtSectionList& sectionList = wrap.Input().Cast<TYtReadTable>().Input();
for (size_t i = 0; i < sectionList.Size(); ++i) {
auto section = sectionList.Item(i);
if (!NYql::GetSettingAsColumnList(section.Settings().Ref(), EYtSettingType::SysColumns).empty()) {
BlockReaderAddInfo(ctx, ctx.GetPosition(node.Pos()), "system column");
return false;
}
}
Expand Down
Loading