Skip to content

Commit 9bea29c

Browse files
authored
Merge 1ff9882 into 971d7f8
2 parents 971d7f8 + 1ff9882 commit 9bea29c

File tree

4 files changed

+148
-1
lines changed

4 files changed

+148
-1
lines changed

ydb/library/yql/providers/yt/common/yql_configuration.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <util/system/types.h>
44
#include <util/datetime/base.h>
55
#include <util/generic/size_literals.h>
6+
#include <util/generic/set.h>
67

78
namespace NYql {
89

@@ -55,6 +56,8 @@ constexpr bool DEFAULT_JOIN_COMMON_USE_MULTI_OUT = false;
5556
constexpr bool DEFAULT_USE_RPC_READER_IN_DQ = false;
5657
constexpr size_t DEFAULT_RPC_READER_INFLIGHT = 1;
5758
constexpr TDuration DEFAULT_RPC_READER_TIMEOUT = TDuration::Seconds(120);
59+
const TSet<TString> DEFAULT_BLOCK_READER_SUPPORTED_TYPES = {"pg", "tuple"};
60+
const TSet<TString> DEFAULT_BLOCK_READER_SUPPORTED_DATA_TYPES = {"Int8", "Uint8", "Int16", "Uint16", "Int32", "Uint32", "Int64", "Uint64", "String", "Yson", "Json", "Bool", "Double"};
5861

5962
constexpr auto DEFAULT_SWITCH_MEMORY_LIMIT = 128_MB;
6063

ydb/library/yql/providers/yt/common/yql_yt_settings.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,8 @@ TYtConfiguration::TYtConfiguration()
460460
REGISTER_SETTING(*this, UseRPCReaderInDQ);
461461
REGISTER_SETTING(*this, DQRPCReaderInflight).Lower(1);
462462
REGISTER_SETTING(*this, DQRPCReaderTimeout);
463+
REGISTER_SETTING(*this, BlockReaderSupportedTypes);
464+
REGISTER_SETTING(*this, BlockReaderSupportedDataTypes);
463465
REGISTER_SETTING(*this, MaxCpuUsageToFuseMultiOuts).Lower(1.0);
464466
REGISTER_SETTING(*this, MaxReplicationFactorToFuseMultiOuts).Lower(1.0);
465467
REGISTER_SETTING(*this, ApplyStoredConstraints)

ydb/library/yql/providers/yt/common/yql_yt_settings.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,8 @@ struct TYtSettings {
196196
NCommon::TConfSetting<bool, true> UseRPCReaderInDQ;
197197
NCommon::TConfSetting<size_t, true> DQRPCReaderInflight;
198198
NCommon::TConfSetting<TDuration, true> DQRPCReaderTimeout;
199+
NCommon::TConfSetting<TSet<TString>, true> BlockReaderSupportedTypes;
200+
NCommon::TConfSetting<TSet<TString>, true> BlockReaderSupportedDataTypes;
199201

200202
// Optimizers
201203
NCommon::TConfSetting<bool, true> _EnableDq;

ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp

Lines changed: 141 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,141 @@ static const THashSet<TStringBuf> POOL_TREES_WHITELIST = {"physical", "cloud",
3939

4040
using namespace NNodes;
4141

42+
namespace {
43+
void AddInfo(TExprContext& ctx, const TString& msg) {
44+
TIssue info("Can't use block reader: " + msg);
45+
info.Severity = TSeverityIds::S_INFO;
46+
ctx.IssueManager.RaiseIssue(info);
47+
}
48+
49+
bool CheckSupportedTypes(const TSet<TString>& list, const TSet<TString>& dataTList, const TStructExprType* types, TExprContext& ctx) {
50+
TSet<ETypeAnnotationKind> supported;
51+
TSet<NUdf::EDataSlot> dataTypesSupported;
52+
for (const auto &e: list) {
53+
if (e == "pg") {
54+
supported.insert(ETypeAnnotationKind::Pg);
55+
} else if (e == "tuple") {
56+
supported.emplace(ETypeAnnotationKind::Tuple);
57+
} else if (e == "struct") {
58+
supported.emplace(ETypeAnnotationKind::Struct);
59+
} else if (e == "dict") {
60+
supported.emplace(ETypeAnnotationKind::Dict);
61+
} else if (e == "list") {
62+
supported.emplace(ETypeAnnotationKind::List);
63+
} else if (e == "variant") {
64+
supported.emplace(ETypeAnnotationKind::Variant);
65+
} else {
66+
// Unknown type
67+
AddInfo(ctx, TStringBuilder() << "unknown type: " << e);
68+
return false;
69+
}
70+
}
71+
if (dataTList.size()) {
72+
supported.emplace(ETypeAnnotationKind::Data);
73+
}
74+
for (const auto &e: dataTList) {
75+
dataTypesSupported.emplace(NUdf::GetDataSlot(e));
76+
}
77+
auto checkType = [&] (const TTypeAnnotationNode* type) {
78+
if (type->GetKind() == ETypeAnnotationKind::Data) {
79+
if (!supported.contains(ETypeAnnotationKind::Data)) {
80+
AddInfo(ctx, TStringBuilder() << "unsupported data types");
81+
return false;
82+
}
83+
if (!dataTypesSupported.contains(type->Cast<TDataExprType>()->GetSlot())) {
84+
AddInfo(ctx, TStringBuilder() << "unsupported data type: " << type->Cast<TDataExprType>()->GetSlot());
85+
return false;
86+
}
87+
} else if (type->GetKind() == ETypeAnnotationKind::Pg) {
88+
if (!supported.contains(ETypeAnnotationKind::Pg)) {
89+
AddInfo(ctx, TStringBuilder() << "unsupported pg");
90+
return false;
91+
}
92+
} else {
93+
AddInfo(ctx, TStringBuilder() << "unsupported annotation kind: " << type->GetKind());
94+
return false;
95+
}
96+
return true;
97+
};
98+
99+
for (auto sub: types->GetItems()) {
100+
auto subT = sub->GetItemType();
101+
while (subT->GetKind() == ETypeAnnotationKind::Optional) {
102+
subT = subT->Cast<TOptionalExprType>()->GetItemType();
103+
}
104+
if (subT->GetKind() == ETypeAnnotationKind::Tuple || subT->GetKind() == ETypeAnnotationKind::Struct
105+
|| subT->GetKind() == ETypeAnnotationKind::Dict || subT->GetKind() == ETypeAnnotationKind::Variant
106+
|| subT->GetKind() == ETypeAnnotationKind::List) {
107+
if (!supported.contains(subT->GetKind())) {
108+
AddInfo(ctx, TStringBuilder() << "unsupported " << subT->GetKind());
109+
return false;
110+
}
111+
TVector<const TTypeAnnotationNode*> stack;
112+
stack.push_back(subT);
113+
while (!stack.empty()) {
114+
auto el = stack.back();
115+
stack.pop_back();
116+
if (!supported.contains(el->GetKind())) {
117+
AddInfo(ctx, TStringBuilder() << "unsupported " << el->GetKind());
118+
return false;
119+
}
120+
if (el->GetKind() == ETypeAnnotationKind::Tuple) {
121+
for (auto e: el->Cast<TTupleExprType>()->GetItems()) {
122+
while (e->GetKind() == ETypeAnnotationKind::Optional) {
123+
e = e->Cast<TOptionalExprType>()->GetItemType();
124+
}
125+
stack.push_back(e);
126+
}
127+
continue;
128+
} else if (el->GetKind() == ETypeAnnotationKind::Struct) {
129+
for (auto e: el->Cast<TStructExprType>()->GetItems()) {
130+
const TTypeAnnotationNode* c = e->GetItemType();
131+
while (c->GetKind() == ETypeAnnotationKind::Optional) {
132+
c = c->Cast<TOptionalExprType>()->GetItemType();
133+
}
134+
stack.push_back(c);
135+
}
136+
continue;
137+
138+
} else if (el->GetKind() == ETypeAnnotationKind::List) {
139+
const TTypeAnnotationNode* c = el->Cast<TListExprType>()->GetItemType();
140+
while (c->GetKind() == ETypeAnnotationKind::Optional) {
141+
c = c->Cast<TOptionalExprType>()->GetItemType();
142+
}
143+
stack.push_back(c);
144+
continue;
145+
} else if (el->GetKind() == ETypeAnnotationKind::Dict) {
146+
const TTypeAnnotationNode* c = el->Cast<TDictExprType>()->GetKeyType();
147+
while (c->GetKind() == ETypeAnnotationKind::Optional) {
148+
c = c->Cast<TOptionalExprType>()->GetItemType();
149+
}
150+
stack.push_back(c);
151+
c = el->Cast<TDictExprType>()->GetPayloadType();
152+
while (c->GetKind() == ETypeAnnotationKind::Optional) {
153+
c = c->Cast<TOptionalExprType>()->GetItemType();
154+
}
155+
stack.push_back(c);
156+
continue;
157+
} else if (el->GetKind() == ETypeAnnotationKind::Variant) {
158+
const TTypeAnnotationNode* c = el->Cast<TVariantExprType>()->GetUnderlyingType();
159+
while (c->GetKind() == ETypeAnnotationKind::Optional) {
160+
c = c->Cast<TOptionalExprType>()->GetItemType();
161+
}
162+
stack.push_back(c);
163+
continue;
164+
}
165+
if (!checkType(el)) {
166+
return false;
167+
}
168+
}
169+
} else if (!checkType(subT)) {
170+
return false;
171+
}
172+
}
173+
return true;
174+
}
175+
};
176+
42177
class TYtDqIntegration: public TDqIntegrationBase {
43178
public:
44179
TYtDqIntegration(TYtState* state)
@@ -375,8 +510,13 @@ class TYtDqIntegration: public TDqIntegrationBase {
375510
if (!State_->Configuration->UseRPCReaderInDQ.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_USE_RPC_READER_IN_DQ)) {
376511
return false;
377512
}
378-
513+
514+
auto supportedTypes = State_->Configuration->BlockReaderSupportedTypes.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_BLOCK_READER_SUPPORTED_TYPES);
515+
auto supportedDataTypes = State_->Configuration->BlockReaderSupportedDataTypes.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_BLOCK_READER_SUPPORTED_DATA_TYPES);
379516
const auto structType = GetSeqItemType(maybeRead.Raw()->GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back())->Cast<TStructExprType>();
517+
if (!CheckSupportedTypes(supportedTypes, supportedDataTypes, structType, ctx)) {
518+
return false;
519+
}
380520
TVector<const TTypeAnnotationNode*> subTypeAnn(Reserve(structType->GetItems().size()));
381521
for (const auto& type: structType->GetItems()) {
382522
subTypeAnn.emplace_back(type->GetItemType());

0 commit comments

Comments
 (0)