Skip to content

Commit c08a157

Browse files
authored
Merge 8233675 into 514a0c4
2 parents 514a0c4 + 8233675 commit c08a157

File tree

2 files changed

+30
-2
lines changed

2 files changed

+30
-2
lines changed

ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -797,7 +797,7 @@ void TDqPqRdReadActor::AddMessageBatch(TRope&& messageBatch, NKikimr::NMiniKQL::
797797
*(itemPtr++) = parsedRow[*index];
798798
} else {
799799
// TODO: support metadata fields here
800-
*(itemPtr++) = NUdf::TUnboxedValue();
800+
*(itemPtr++) = NUdf::TUnboxedValuePod::Zero();
801801
}
802802
}
803803

ydb/tests/fq/yds/test_row_dispatcher.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,6 @@ def test_simple_not_null(self, kikimr, client):
146146

147147
query_id = start_yds_query(kikimr, client, sql)
148148
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
149-
time.sleep(10)
150149

151150
data = [
152151
'{"time": 101, "data": "hello1", "event": "event1"}',
@@ -167,6 +166,35 @@ def test_simple_not_null(self, kikimr, client):
167166
assert len(read_rules) == 0, read_rules
168167
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)
169168

169+
@yq_v1
170+
def test_metadatafields(self, kikimr, client):
171+
client.create_yds_connection(
172+
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
173+
)
174+
self.init_topics("test_metadatafields")
175+
176+
# Its not completely clear why metadatafields appear in this request(
177+
sql = Rf'''
178+
PRAGMA FeatureR010="prototype";
179+
PRAGMA config.flags("TimeOrderRecoverDelay", "-10");
180+
PRAGMA config.flags("TimeOrderRecoverAhead", "10");
181+
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
182+
SELECT ToBytes(Unwrap(Json::SerializeJson(Yson::From(TableRow())))) FROM {YDS_CONNECTION}.`{self.input_topic}`
183+
WITH (format=json_each_row, SCHEMA (time Int32 NOT NULL))
184+
MATCH_RECOGNIZE(
185+
ORDER BY CAST(time as Timestamp)
186+
MEASURES LAST(A.time) as b_key
187+
PATTERN (A )
188+
DEFINE A as A.time > 4
189+
);'''
190+
191+
query_id = start_yds_query(kikimr, client, sql)
192+
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
193+
194+
self.write_stream(['{"time": 100}', '{"time": 120}'])
195+
assert len(self.read_stream(1, topic_path=self.output_topic)) == 1
196+
stop_yds_query(client, query_id)
197+
170198
@yq_v1
171199
def test_simple_optional(self, kikimr, client):
172200
client.create_yds_connection(

0 commit comments

Comments
 (0)