Skip to content

Commit d353cb0

Browse files
authored
YQ-3841 RD add column types validation (#11487)
1 parent f0860b5 commit d353cb0

File tree

2 files changed

+51
-9
lines changed

2 files changed

+51
-9
lines changed

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,11 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
144144
TParserInputType InputType;
145145
};
146146

147+
struct TFieldDescription {
148+
ui64 IndexInParserSchema = 0;
149+
TString Type;
150+
};
151+
147152
bool InflightReconnect = false;
148153
TDuration ReconnectPeriod;
149154
const TString TopicPath;
@@ -170,7 +175,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
170175
const ::NMonitoring::TDynamicCounterPtr Counters;
171176
TTopicSessionMetrics Metrics;
172177
TParserSchema ParserSchema;
173-
THashMap<TString, ui64> FieldsIndexes;
178+
THashMap<TString, TFieldDescription> FieldsIndexes;
174179
NYql::IPqGateway::TPtr PqGateway;
175180
TMaybe<TString> ConsumerName;
176181
ui64 RestartSessionByOffsets = 0;
@@ -686,14 +691,16 @@ void TTopicSession::SendData(TClientsInfo& info) {
686691
}
687692

688693
void TTopicSession::UpdateFieldsIds(TClientsInfo& info) {
689-
for (auto name : info.Settings.GetSource().GetColumns()) {
694+
const auto& source = info.Settings.GetSource();
695+
for (size_t i = 0; i < source.ColumnsSize(); ++i) {
696+
const auto& name = source.GetColumns().Get(i);
690697
auto it = FieldsIndexes.find(name);
691698
if (it == FieldsIndexes.end()) {
692699
auto nextIndex = FieldsIndexes.size();
693700
info.FieldsIds.push_back(nextIndex);
694-
FieldsIndexes[name] = nextIndex;
701+
FieldsIndexes[name] = {nextIndex, source.GetColumnTypes().Get(i)};
695702
} else {
696-
info.FieldsIds.push_back(it->second);
703+
info.FieldsIds.push_back(it->second.IndexInParserSchema);
697704
}
698705
}
699706
}
@@ -821,7 +828,7 @@ void TTopicSession::UpdateParserSchema(const TParserInputType& inputType) {
821828
ui64 offset = 0;
822829
for (const auto& [name, type]: inputType) {
823830
Y_ENSURE(FieldsIndexes.contains(name));
824-
ui64 index = FieldsIndexes[name];
831+
ui64 index = FieldsIndexes[name].IndexInParserSchema;
825832
ParserSchema.FieldsMap[index] = offset++;
826833
}
827834
ParserSchema.InputType = inputType;
@@ -950,13 +957,26 @@ bool TTopicSession::CheckNewClient(NFq::TEvRowDispatcher::TEvStartSession::TPtr&
950957
SendSessionError(ev->Sender, "Internal error: such a client already exists");
951958
return false;
952959
}
953-
if (!Config.GetWithoutConsumer()
954-
&& ConsumerName
955-
&& ConsumerName != ev->Get()->Record.GetSource().GetConsumerName()) {
956-
LOG_ROW_DISPATCHER_INFO("Different consumer, expected " << ConsumerName << ", actual " << ev->Get()->Record.GetSource().GetConsumerName() << ", send error");
960+
961+
const auto& source = ev->Get()->Record.GetSource();
962+
if (!Config.GetWithoutConsumer() && ConsumerName && ConsumerName != source.GetConsumerName()) {
963+
LOG_ROW_DISPATCHER_INFO("Different consumer, expected " << ConsumerName << ", actual " << source.GetConsumerName() << ", send error");
957964
SendSessionError(ev->Sender, TStringBuilder() << "Use the same consumer in all queries via RD (current consumer " << ConsumerName << ")");
958965
return false;
959966
}
967+
968+
Y_ENSURE(source.ColumnsSize() == source.ColumnTypesSize());
969+
for (size_t i = 0; i < source.ColumnsSize(); ++i) {
970+
const auto& name = source.GetColumns().Get(i);
971+
const auto& type = source.GetColumnTypes().Get(i);
972+
const auto it = FieldsIndexes.find(name);
973+
if (it != FieldsIndexes.end() && it->second.Type != type) {
974+
LOG_ROW_DISPATCHER_INFO("Different column `" << name << "` type, expected " << it->second.Type << ", actual " << type << ", send error");
975+
SendSessionError(ev->Sender, TStringBuilder() << "Use the same column type in all queries via RD, current type for column `" << name << "` is " << it->second.Type << " (requested type is " << type <<")");
976+
return false;
977+
}
978+
}
979+
960980
return true;
961981
}
962982

ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,28 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
435435
StopSession(ReadActorId1, source1);
436436
StopSession(ReadActorId2, source2);
437437
}
438+
439+
Y_UNIT_TEST_F(TwoSessionsWithDifferentColumnTypes, TFixture) {
440+
const TString topicName = "dif_types";
441+
PQCreateStream(topicName);
442+
Init(topicName);
443+
444+
auto source1 = BuildSource(topicName);
445+
source1.AddColumns("field1");
446+
source1.AddColumnTypes("[OptionalType; [DataType; String]]");
447+
StartSession(ReadActorId1, source1);
448+
449+
TString json1 = "{\"dt\":101,\"field1\":null,\"value\":\"value1\"}";
450+
PQWrite({ json1 }, topicName);
451+
ExpectNewDataArrived({ReadActorId1});
452+
ExpectMessageBatch(ReadActorId1, { json1 });
453+
454+
auto source2 = BuildSource(topicName);
455+
source2.AddColumns("field1");
456+
source2.AddColumnTypes("[DataType; String]");
457+
StartSession(ReadActorId2, source2);
458+
ExpectSessionError(ReadActorId2, "Use the same column type in all queries via RD, current type for column `field1` is [OptionalType; [DataType; String]] (requested type is [DataType; String])");
459+
}
438460
}
439461

440462
}

0 commit comments

Comments
 (0)