Skip to content

Commit d202fd8

Browse files
authored
Merge 7da354e into 1787dd7
2 parents 1787dd7 + 7da354e commit d202fd8

File tree

2 files changed

+148
-1
lines changed

2 files changed

+148
-1
lines changed

ydb/core/kqp/gateway/behaviour/streaming_query/queries.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2523,7 +2523,10 @@ class TCreateStreamingQueryActor final : public TRequestHandlerWithSync<TCreateS
25232523
CHECK_STATUS(validator.SaveRequired(ESqlSettings::QUERY_TEXT_FEATURE, &TPropertyValidator::ValidateNotEmpty));
25242524
CHECK_STATUS(validator.SaveDefault(EName::Run, "true", &TPropertyValidator::ValidateBool));
25252525
CHECK_STATUS(validator.SaveDefault(EName::ResourcePool, NResourcePool::DEFAULT_POOL_ID));
2526-
CHECK_STATUS(validator.Save(EName::QueryTextRevision, ToString(1)));
2526+
CHECK_STATUS(validator.Save(
2527+
EName::QueryTextRevision,
2528+
ToString(SchemeInfo ? TStreamingQuerySettings().FromProto(SchemeInfo->Properties).QueryTextRevision + 1 : 1)
2529+
));
25272530

25282531
return validator.Finish();
25292532
}

ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1892,6 +1892,111 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) {
18921892
ReadTopicMessage(outputTopicName, "test message");
18931893
}
18941894

1895+
Y_UNIT_TEST_F(StreamingQueryTextChangeWithCreateOrReplace, TStreamingTestFixture) {
1896+
constexpr char inputTopicName[] = "createAndReplaceStreamingQueryInputTopic";
1897+
constexpr char outputTopicName[] = "createAndReplaceStreamingQueryOutputTopic";
1898+
CreateTopic(inputTopicName);
1899+
CreateTopic(outputTopicName);
1900+
1901+
constexpr char pqSourceName[] = "sourceName";
1902+
CreatePqSource(pqSourceName);
1903+
1904+
constexpr char queryName[] = "streamingQuery";
1905+
ExecQuery(fmt::format(R"(
1906+
CREATE STREAMING QUERY `{query_name}` AS
1907+
DO BEGIN
1908+
INSERT INTO `{pq_source}`.`{output_topic}`
1909+
SELECT key || value FROM `{pq_source}`.`{input_topic}` WITH (
1910+
FORMAT = "json_each_row",
1911+
SCHEMA (
1912+
key String NOT NULL,
1913+
value String NOT NULL
1914+
)
1915+
)
1916+
END DO;)",
1917+
"query_name"_a = queryName,
1918+
"pq_source"_a = pqSourceName,
1919+
"input_topic"_a = inputTopicName,
1920+
"output_topic"_a = outputTopicName
1921+
));
1922+
1923+
CheckScriptExecutionsCount(1, 1);
1924+
Sleep(TDuration::Seconds(1));
1925+
1926+
WriteTopicMessage(inputTopicName, R"({"key": "key1", "value": "value1"})");
1927+
ReadTopicMessages(outputTopicName, {"key1value1"});
1928+
1929+
ExecQuery(fmt::format(R"(
1930+
CREATE OR REPLACE STREAMING QUERY `{query_name}` AS
1931+
DO BEGIN
1932+
INSERT INTO `{pq_source}`.`{output_topic}`
1933+
SELECT value || key FROM `{pq_source}`.`{input_topic}` WITH (
1934+
FORMAT = "json_each_row",
1935+
SCHEMA (
1936+
key String NOT NULL,
1937+
value String NOT NULL
1938+
)
1939+
)
1940+
END DO;)",
1941+
"query_name"_a = queryName,
1942+
"pq_source"_a = pqSourceName,
1943+
"input_topic"_a = inputTopicName,
1944+
"output_topic"_a = outputTopicName
1945+
));
1946+
1947+
CheckScriptExecutionsCount(2, 1);
1948+
Sleep(TDuration::Seconds(1));
1949+
1950+
WriteTopicMessage(inputTopicName, R"({"key": "key2", "value": "value2"})");
1951+
ReadTopicMessages(outputTopicName, {"key1value1", "value2key2"});
1952+
}
1953+
1954+
Y_UNIT_TEST_F(StreamingQueryCreateOrReplaceFailure, TStreamingTestFixture) {
1955+
constexpr char inputTopicName[] = "createOrReplaceStreamingQueryFailInputTopic";
1956+
constexpr char outputTopicName[] = "createOrReplaceStreamingQueryFailOutputTopic";
1957+
CreateTopic(inputTopicName);
1958+
CreateTopic(outputTopicName);
1959+
1960+
constexpr char pqSourceName[] = "sourceName";
1961+
CreatePqSource(pqSourceName);
1962+
1963+
constexpr char queryName[] = "streamingQuery";
1964+
ExecQuery(fmt::format(R"(
1965+
CREATE STREAMING QUERY `{query_name}` AS
1966+
DO BEGIN
1967+
INSERT INTO `{pq_source}`.`{output_topic}`
1968+
SELECT * FROM `{pq_source}`.`{input_topic}`
1969+
END DO;)",
1970+
"query_name"_a = queryName,
1971+
"pq_source"_a = pqSourceName,
1972+
"input_topic"_a = inputTopicName,
1973+
"output_topic"_a = outputTopicName
1974+
));
1975+
1976+
CheckScriptExecutionsCount(1, 1);
1977+
Sleep(TDuration::Seconds(1));
1978+
1979+
WriteTopicMessage(inputTopicName, "key1value1");
1980+
ReadTopicMessages(outputTopicName, {"key1value1"});
1981+
1982+
ExecQuery(fmt::format(R"(
1983+
CREATE OR REPLACE STREAMING QUERY `{query_name}` AS
1984+
DO BEGIN
1985+
PRAGMA ydb.OverridePlanner = @@ [
1986+
{{ "tx": 0, "stage": 10, "tasks": 1 }}
1987+
] @@;
1988+
INSERT INTO `{pq_source}`.`{output_topic}`
1989+
SELECT * FROM `{pq_source}`.`{input_topic}`
1990+
END DO;)",
1991+
"query_name"_a = queryName,
1992+
"pq_source"_a = pqSourceName,
1993+
"input_topic"_a = inputTopicName,
1994+
"output_topic"_a = outputTopicName
1995+
), EStatus::GENERIC_ERROR, "Invalid override planner settings");
1996+
1997+
CheckScriptExecutionsCount(2, 0);
1998+
}
1999+
18952000
Y_UNIT_TEST_F(StreamingQueryWithSolomonInsert, TStreamingTestFixture) {
18962001
const auto pqGateway = SetupMockPqGateway();
18972002

@@ -2638,6 +2743,45 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) {
26382743
ReadTopicMessage(info.OutputTopicName, "B-2025-08-25T00:00:00.000000Z-1", readDisposition);
26392744
}
26402745

2746+
Y_UNIT_TEST_F(OffsetsRecoveryOnQueryTextChangeCreateOrReplace, TStreamingTestFixture) {
2747+
const auto info = SetupCheckpointRecoveryTest(*this);
2748+
2749+
WriteTopicMessages(info.InputTopicName, {
2750+
R"({"time": "2025-08-24T00:00:00.000000Z", "event": "A"})",
2751+
R"({"time": "2025-08-25T00:00:00.000000Z", "event": "A"})",
2752+
});
2753+
ReadTopicMessage(info.OutputTopicName, "A-2025-08-24T00:00:00.000000Z-1");
2754+
2755+
ExecQuery(fmt::format(R"(
2756+
CREATE OR REPLACE STREAMING QUERY `{query_name}` WITH (
2757+
RUN = FALSE
2758+
) AS
2759+
DO BEGIN
2760+
/* some comment */
2761+
{text}
2762+
END DO;)",
2763+
"query_name"_a = info.QueryName,
2764+
"text"_a = info.QueryText
2765+
));
2766+
CheckScriptExecutionsCount(1, 0);
2767+
2768+
Sleep(TDuration::Seconds(1));
2769+
WriteTopicMessage(info.InputTopicName, R"({"time": "2025-08-25T00:00:00.000000Z", "event": "B"})");
2770+
const auto readDisposition = TInstant::Now();
2771+
2772+
ExecQuery(fmt::format(R"(
2773+
ALTER STREAMING QUERY `{query_name}` SET (
2774+
RUN = TRUE
2775+
);)",
2776+
"query_name"_a = info.QueryName
2777+
));
2778+
CheckScriptExecutionsCount(2, 1);
2779+
Sleep(TDuration::Seconds(1));
2780+
2781+
WriteTopicMessage(info.InputTopicName, R"({"time": "2025-08-26T00:00:00.000000Z", "event": "A"})");
2782+
ReadTopicMessage(info.OutputTopicName, "B-2025-08-25T00:00:00.000000Z-1", readDisposition);
2783+
}
2784+
26412785
Y_UNIT_TEST_F(OffsetsRecoveryOnQueryTextChangeWithFail, TStreamingTestFixture) {
26422786
const auto info = SetupCheckpointRecoveryTest(*this);
26432787

0 commit comments

Comments
 (0)