@@ -83,29 +83,36 @@ def wait_row_dispatcher_sensor_value(kikimr, sensor, expected_count, exact_match
8383class TestPqRowDispatcher (TestYdsBase ):
8484
8585 @yq_v1
86- def test_read_raw_format_without_row_dispatcher (self , kikimr , client ):
86+ def test_read_raw_format_with_row_dispatcher (self , kikimr , client ):
8787 client .create_yds_connection (
8888 YDS_CONNECTION , os .getenv ("YDB_DATABASE" ), os .getenv ("YDB_ENDPOINT" ), shared_reading = True
8989 )
9090 self .init_topics ("test_read_raw_format_without_row_dispatcher" , create_output = False )
91-
9291 output_topic = "pq_test_pq_read_write_output"
93-
9492 create_stream (output_topic , partitions_count = 1 )
9593 create_read_rule (output_topic , self .consumer_name )
9694
97- sql = Rf'''
98- INSERT INTO { YDS_CONNECTION } .`{ output_topic } `
99- SELECT * FROM { YDS_CONNECTION } .`{ self .input_topic } `;'''
95+ sql1 = Rf'''INSERT INTO { YDS_CONNECTION } .`{ output_topic } `
96+ SELECT * FROM { YDS_CONNECTION } .`{ self .input_topic } ` WITH (format=raw, SCHEMA (data String NOT NULL));'''
10097
101- query_id = start_yds_query (kikimr , client , sql )
98+ query_id = start_yds_query (kikimr , client , sql1 )
10299 data = ['{"time" = 101;}' , '{"time" = 102;}' ]
103100
104101 self .write_stream (data )
105- expected = data
106- assert self .read_stream (len (expected ), topic_path = output_topic ) == expected
102+ assert self .read_stream (len (data ), topic_path = output_topic ) == data
103+ wait_actor_count (kikimr , "FQ_ROW_DISPATCHER_SESSION" , 1 )
104+ stop_yds_query (client , query_id )
105+
106+ sql2 = Rf'''INSERT INTO { YDS_CONNECTION } .`{ output_topic } `
107+ SELECT * FROM { YDS_CONNECTION } .`{ self .input_topic } ` WITH (format=raw, SCHEMA (data String NOT NULL))
108+ WHERE data != "romashka";'''
107109
108- wait_actor_count (kikimr , "FQ_ROW_DISPATCHER_SESSION" , 0 )
110+ query_id = start_yds_query (kikimr , client , sql2 )
111+ data = ['{"time" = 103;}' , '{"time" = 104;}' ]
112+
113+ self .write_stream (data )
114+ assert self .read_stream (len (data ), topic_path = output_topic ) == data
115+ wait_actor_count (kikimr , "FQ_ROW_DISPATCHER_SESSION" , 1 )
109116 stop_yds_query (client , query_id )
110117
111118 @yq_v1
@@ -193,7 +200,7 @@ def test_scheme_error(self, kikimr, client):
193200
194201 client .wait_query_status (query_id , fq .QueryMeta .FAILED )
195202 issues = str (client .describe_query (query_id ).result .query .issue )
196- assert "Failed to unwrap empty optional " in issues , "Incorrect Issues: " + issues
203+ assert "Cannot parse JSON string " in issues , "Incorrect Issues: " + issues
197204
198205 wait_actor_count (kikimr , "DQ_PQ_READ_ACTOR" , 0 )
199206 wait_actor_count (kikimr , "FQ_ROW_DISPATCHER_SESSION" , 0 )
0 commit comments