Skip to content

Commit 56a6e23

Browse files
committed
Add type white-list for supported types on blockreader
1 parent f5945a4 commit 56a6e23

File tree

4 files changed

+128
-0
lines changed

4 files changed

+128
-0
lines changed

ydb/library/yql/providers/yt/common/yql_configuration.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <util/system/types.h>
44
#include <util/datetime/base.h>
55
#include <util/generic/size_literals.h>
6+
#include <util/generic/set.h>
67

78
namespace NYql {
89

@@ -55,6 +56,7 @@ constexpr bool DEFAULT_JOIN_COMMON_USE_MULTI_OUT = false;
5556
constexpr bool DEFAULT_USE_RPC_READER_IN_DQ = false;
5657
constexpr size_t DEFAULT_RPC_READER_INFLIGHT = 1;
5758
constexpr TDuration DEFAULT_RPC_READER_TIMEOUT = TDuration::Seconds(120);
59+
const TSet<TString> DEFAULT_BLOCK_READER_SUPPORTED_TYPES = {"pg","int8", "uint8", "int16", "uint16", "int32", "uint32", "int64", "uint64", "string", "yson", "json", "bool", "double", "tuple"};
5860

5961
constexpr auto DEFAULT_SWITCH_MEMORY_LIMIT = 128_MB;
6062

ydb/library/yql/providers/yt/common/yql_yt_settings.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,7 @@ TYtConfiguration::TYtConfiguration()
460460
REGISTER_SETTING(*this, UseRPCReaderInDQ);
461461
REGISTER_SETTING(*this, DQRPCReaderInflight).Lower(1);
462462
REGISTER_SETTING(*this, DQRPCReaderTimeout);
463+
REGISTER_SETTING(*this, BlockReaderSupportedTypes);
463464
REGISTER_SETTING(*this, MaxCpuUsageToFuseMultiOuts).Lower(1.0);
464465
REGISTER_SETTING(*this, MaxReplicationFactorToFuseMultiOuts).Lower(1.0);
465466
REGISTER_SETTING(*this, ApplyStoredConstraints)

ydb/library/yql/providers/yt/common/yql_yt_settings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ struct TYtSettings {
196196
NCommon::TConfSetting<bool, true> UseRPCReaderInDQ;
197197
NCommon::TConfSetting<size_t, true> DQRPCReaderInflight;
198198
NCommon::TConfSetting<TDuration, true> DQRPCReaderTimeout;
199+
NCommon::TConfSetting<TSet<TString>, true> BlockReaderSupportedTypes;
199200

200201
// Optimizers
201202
NCommon::TConfSetting<bool, true> _EnableDq;

ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,124 @@ static const THashSet<TStringBuf> POOL_TREES_WHITELIST = {"physical", "cloud",
3939

4040
using namespace NNodes;
4141

42+
namespace {
43+
void AddInfo(TExprContext& ctx, const TString& msg) {
44+
TIssue info("Can't use block reader: " + msg);
45+
info.Severity = TSeverityIds::S_INFO;
46+
ctx.IssueManager.RaiseIssue(info);
47+
}
48+
49+
bool CheckSupportedTypes(const TSet<TString>& list, const TStructExprType* types, TExprContext& ctx) {
50+
TSet<ETypeAnnotationKind> supported;
51+
TSet<NUdf::EDataSlot> dataTypesSupported;
52+
for (const auto &e: list) {
53+
if (e == "pg") {
54+
supported.insert(ETypeAnnotationKind::Pg);
55+
} else if (e == "int8") {
56+
supported.emplace(ETypeAnnotationKind::Data);
57+
dataTypesSupported.emplace(NUdf::EDataSlot::Int8);
58+
} else if (e == "uint8") {
59+
supported.emplace(ETypeAnnotationKind::Data);
60+
dataTypesSupported.emplace(NUdf::EDataSlot::Uint8);
61+
} else if (e == "int16") {
62+
supported.emplace(ETypeAnnotationKind::Data);
63+
dataTypesSupported.emplace(NUdf::EDataSlot::Int16);
64+
} else if (e == "uint16") {
65+
supported.emplace(ETypeAnnotationKind::Data);
66+
dataTypesSupported.emplace(NUdf::EDataSlot::Uint16);
67+
} else if (e == "int32") {
68+
supported.emplace(ETypeAnnotationKind::Data);
69+
dataTypesSupported.emplace(NUdf::EDataSlot::Int32);
70+
} else if (e == "uint32") {
71+
supported.emplace(ETypeAnnotationKind::Data);
72+
dataTypesSupported.emplace(NUdf::EDataSlot::Uint32);
73+
} else if (e == "int64") {
74+
supported.emplace(ETypeAnnotationKind::Data);
75+
dataTypesSupported.emplace(NUdf::EDataSlot::Int64);
76+
} else if (e == "uint64") {
77+
supported.emplace(ETypeAnnotationKind::Data);
78+
dataTypesSupported.emplace(NUdf::EDataSlot::Uint64);
79+
} else if (e == "double") {
80+
supported.emplace(ETypeAnnotationKind::Data);
81+
dataTypesSupported.emplace(NUdf::EDataSlot::Double);
82+
} else if (e == "bool") {
83+
supported.emplace(ETypeAnnotationKind::Data);
84+
dataTypesSupported.emplace(NUdf::EDataSlot::Bool);
85+
} else if (e == "string") {
86+
supported.emplace(ETypeAnnotationKind::Data);
87+
dataTypesSupported.emplace(NUdf::EDataSlot::String);
88+
} else if (e == "yson") {
89+
supported.emplace(ETypeAnnotationKind::Data);
90+
dataTypesSupported.emplace(NUdf::EDataSlot::Yson);
91+
} else if (e == "json") {
92+
supported.emplace(ETypeAnnotationKind::Data);
93+
dataTypesSupported.emplace(NUdf::EDataSlot::Json);
94+
} else if (e == "tuple") {
95+
supported.emplace(ETypeAnnotationKind::Tuple);
96+
} else {
97+
// Unknown type
98+
AddInfo(ctx, TStringBuilder() << "unknown type: " << e);
99+
return false;
100+
}
101+
}
102+
auto checkType = [&] (const TTypeAnnotationNode* type) {
103+
if (type->GetKind() == ETypeAnnotationKind::Data) {
104+
if (!supported.contains(ETypeAnnotationKind::Data)) {
105+
AddInfo(ctx, TStringBuilder() << "unsupported data types");
106+
return false;
107+
}
108+
if (!dataTypesSupported.contains(type->Cast<TDataExprType>()->GetSlot())) {
109+
AddInfo(ctx, TStringBuilder() << "unsupported data type: " << type->Cast<TDataExprType>()->GetSlot());
110+
return false;
111+
}
112+
} else if (type->GetKind() == ETypeAnnotationKind::Pg) {
113+
if (!supported.contains(ETypeAnnotationKind::Pg)) {
114+
AddInfo(ctx, TStringBuilder() << "unsupported pg");
115+
return false;
116+
}
117+
} else {
118+
AddInfo(ctx, TStringBuilder() << "unsupported annotation kind: " << type->GetKind());
119+
return false;
120+
}
121+
return true;
122+
};
123+
124+
for (auto sub: types->GetItems()) {
125+
auto subT = sub->GetItemType();
126+
while (subT->GetKind() == ETypeAnnotationKind::Optional) {
127+
subT = subT->Cast<TOptionalExprType>()->GetItemType();
128+
}
129+
if (subT->GetKind() == ETypeAnnotationKind::Tuple) {
130+
if (!supported.contains(ETypeAnnotationKind::Tuple)) {
131+
AddInfo(ctx, TStringBuilder() << "unsupported tuples");
132+
return false;
133+
}
134+
TVector<const TTypeAnnotationNode*> stack;
135+
stack.push_back(subT);
136+
while (!stack.empty()) {
137+
auto el = stack.back();
138+
stack.pop_back();
139+
if (el->GetKind() == ETypeAnnotationKind::Tuple) {
140+
for (auto e: el->Cast<TTupleExprType>()->GetItems()) {
141+
while (e->GetKind() == ETypeAnnotationKind::Optional) {
142+
e = e->Cast<TOptionalExprType>()->GetItemType();
143+
}
144+
stack.push_back(e);
145+
}
146+
continue;
147+
}
148+
if (!checkType(el)) {
149+
return false;
150+
}
151+
}
152+
} else if (!checkType(subT)) {
153+
return false;
154+
}
155+
}
156+
return true;
157+
}
158+
};
159+
42160
class TYtDqIntegration: public TDqIntegrationBase {
43161
public:
44162
TYtDqIntegration(TYtState* state)
@@ -375,10 +493,16 @@ class TYtDqIntegration: public TDqIntegrationBase {
375493
if (!State_->Configuration->UseRPCReaderInDQ.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_USE_RPC_READER_IN_DQ)) {
376494
return false;
377495
}
496+
497+
auto supportedTypes = State_->Configuration->BlockReaderSupportedTypes.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_BLOCK_READER_SUPPORTED_TYPES);
378498

379499
const auto structType = GetSeqItemType(maybeRead.Raw()->GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back())->Cast<TStructExprType>();
500+
if (!CheckSupportedTypes(supportedTypes, structType, ctx)) {
501+
return false;
502+
}
380503
TVector<const TTypeAnnotationNode*> subTypeAnn(Reserve(structType->GetItems().size()));
381504
for (const auto& type: structType->GetItems()) {
505+
//if (type->GetKind()
382506
subTypeAnn.emplace_back(type->GetItemType());
383507
}
384508

0 commit comments

Comments
 (0)