77from helpers .thread_helper import TestThread
88from ydb import PrimitiveType
99from typing import List , Dict , Any
10- from ydb .tests .olap .lib .utils import get_external_param
10+ from ydb .tests .olap .lib .utils import get_external_param , external_param_is_true
1111
1212
1313class TestInsert (BaseTestSet ):
@@ -24,19 +24,26 @@ class TestInsert(BaseTestSet):
2424 .with_key_columns ("key" )
2525 )
2626
27- def _loop_upsert (self , ctx : TestContext , data : list ):
27+ def _loop_upsert (self , ctx : TestContext , data : list , table : str ):
2828 sth = ScenarioTestHelper (ctx )
29+ table_name = "log" + table
2930 for batch in data :
30- sth .bulk_upsert_data ("log" , self .schema_log , batch )
31+ sth .bulk_upsert_data (table_name , self .schema_log , batch )
3132
32- def _loop_insert (self , ctx : TestContext , rows_count : int ):
33+ def _loop_insert (self , ctx : TestContext , rows_count : int , table : str , ignore_read_errors : bool ):
3334 sth = ScenarioTestHelper (ctx )
34- log : str = sth .get_full_path ("log" )
35- cnt : str = sth .get_full_path ("cnt" )
35+ log : str = sth .get_full_path ("log" + table )
36+ cnt : str = sth .get_full_path ("cnt" + table )
3637 for i in range (rows_count ):
37- sth .execute_query (
38- f'$cnt = SELECT CAST(COUNT(*) AS INT64) from `{ log } `; INSERT INTO `{ cnt } ` (key, c) values({ i } , $cnt)'
39- )
38+ try :
39+ sth .execute_query (
40+ yql = f'$cnt = SELECT CAST(COUNT(*) AS INT64) from `{ log } `; INSERT INTO `{ cnt } ` (key, c) values({ i } , $cnt)' , retries = 10
41+ )
42+ except Exception :
43+ if ignore_read_errors :
44+ pass
45+ else :
46+ raise
4047
4148 def scenario_read_data_during_bulk_upsert (self , ctx : TestContext ):
4249 sth = ScenarioTestHelper (ctx )
@@ -45,42 +52,59 @@ def scenario_read_data_during_bulk_upsert(self, ctx: TestContext):
4552 batches_count = int (get_external_param ("batches_count" , "10" ))
4653 rows_count = int (get_external_param ("rows_count" , "1000" ))
4754 inserts_count = int (get_external_param ("inserts_count" , "200" ))
48- sth .execute_scheme_query (
49- CreateTable (cnt_table_name ).with_schema (self .schema_cnt )
50- )
51- sth .execute_scheme_query (
52- CreateTable (log_table_name ).with_schema (self .schema_log )
53- )
55+ tables_count = int (get_external_param ("tables_count" , "1" ))
56+ ignore_read_errors = external_param_is_true ("ignore_read_errors" )
57+ for table in range (tables_count ):
58+ sth .execute_scheme_query (
59+ CreateTable (cnt_table_name + str (table )).with_schema (self .schema_cnt )
60+ )
61+ for table in range (tables_count ):
62+ sth .execute_scheme_query (
63+ CreateTable (log_table_name + str (table )).with_schema (self .schema_log )
64+ )
5465 data : List = []
5566 for i in range (batches_count ):
5667 batch : List [Dict [str , Any ]] = []
5768 for j in range (rows_count ):
5869 batch .append ({"key" : j + rows_count * i })
5970 data .append (batch )
6071
61- thread1 = TestThread (target = self ._loop_upsert , args = [ctx , data ])
62- thread2 = TestThread (target = self ._loop_insert , args = [ctx , inserts_count ])
72+ thread1 = []
73+ thread2 = []
74+ for table in range (tables_count ):
75+ thread1 .append (TestThread (target = self ._loop_upsert , args = [ctx , data , str (table )]))
76+ for table in range (tables_count ):
77+ thread2 .append (TestThread (target = self ._loop_insert , args = [ctx , inserts_count , str (table ), ignore_read_errors ]))
78+
79+ for thread in thread1 :
80+ thread .start ()
6381
64- thread1 . start ()
65- thread2 .start ()
82+ for thread in thread2 :
83+ thread .start ()
6684
67- thread2 . join ()
68- thread1 .join ()
85+ for thread in thread2 :
86+ thread .join ()
6987
70- rows : int = sth .get_table_rows_count (cnt_table_name )
71- assert rows == inserts_count
72- scan_result = sth .execute_scan_query (
73- f"SELECT key, c FROM `{ sth .get_full_path (cnt_table_name )} ` ORDER BY key"
74- )
75- for i in range (rows ):
76- if scan_result .result_set .rows [i ]["key" ] != i :
77- assert False , f"{ i } ?= { scan_result .result_set .rows [i ]['key' ]} "
88+ for thread in thread1 :
89+ thread .join ()
7890
79- rows : int = sth .get_table_rows_count (log_table_name )
80- assert rows == rows_count * batches_count
81- scan_result = sth .execute_scan_query (
82- f"SELECT key FROM `{ sth .get_full_path (log_table_name )} ` ORDER BY key"
83- )
84- for i in range (rows ):
85- if scan_result .result_set .rows [i ]["key" ] != i :
86- assert False , f"{ i } ?= { scan_result .result_set .rows [i ]['key' ]} "
91+ for table in range (tables_count ):
92+ cnt_table_name0 = cnt_table_name + str (table )
93+ rows : int = sth .get_table_rows_count (cnt_table_name0 )
94+ assert rows == inserts_count
95+ scan_result = sth .execute_scan_query (
96+ f"SELECT key, c FROM `{ sth .get_full_path (cnt_table_name0 )} ` ORDER BY key"
97+ )
98+ for i in range (rows ):
99+ if scan_result .result_set .rows [i ]["key" ] != i :
100+ assert False , f"{ i } ?= { scan_result .result_set .rows [i ]['key' ]} "
101+
102+ log_table_name0 = log_table_name + str (table )
103+ rows : int = sth .get_table_rows_count (log_table_name0 )
104+ assert rows == rows_count * batches_count
105+ scan_result = sth .execute_scan_query (
106+ f"SELECT key FROM `{ sth .get_full_path (log_table_name0 )} ` ORDER BY key"
107+ )
108+ for i in range (rows ):
109+ if scan_result .result_set .rows [i ]["key" ] != i :
110+ assert False , f"{ i } ?= { scan_result .result_set .rows [i ]['key' ]} "
0 commit comments