@@ -285,6 +285,34 @@ def test_filter(self, kikimr, client):
285285 issues = str (client .describe_query (query_id ).result .query .transient_issue )
286286 assert "Row dispatcher will use the predicate: WHERE (`time` > 101" in issues , "Incorrect Issues: " + issues
287287
288+ @yq_v1
289+ def test_filter_missing_fields (self , kikimr , client ):
290+ client .create_yds_connection (
291+ YDS_CONNECTION , os .getenv ("YDB_DATABASE" ), os .getenv ("YDB_ENDPOINT" ), shared_reading = True
292+ )
293+ self .init_topics ("test_filter" )
294+
295+ sql = Rf'''
296+ INSERT INTO { YDS_CONNECTION } .`{ self .output_topic } `
297+ SELECT Cast(time as String) FROM { YDS_CONNECTION } .`{ self .input_topic } `
298+ WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data String, event String NOT NULL))
299+ WHERE data = "";'''
300+
301+ query_id = start_yds_query (kikimr , client , sql )
302+ wait_actor_count (kikimr , "FQ_ROW_DISPATCHER_SESSION" , 1 )
303+
304+ data = [
305+ '{"time": 101, "event": "event1"}' ,
306+ '{"time": 102, "data": null, "event": "event2"}' ,
307+ ]
308+
309+ self .write_stream (data )
310+ expected = ['101' , '102' ]
311+ assert self .read_stream (len (expected ), topic_path = self .output_topic ) == expected
312+
313+ wait_actor_count (kikimr , "DQ_PQ_READ_ACTOR" , 1 )
314+ stop_yds_query (client , query_id )
315+
288316 @yq_v1
289317 def test_filter_use_unsupported_predicate (self , kikimr , client ):
290318 client .create_yds_connection (
0 commit comments