from conftest import BaseTestSet
from ydb.tests.olap.scenario.helpers import (
    ScenarioTestHelper,
    TestContext,
    CreateTable,
)

from ydb import PrimitiveType
from typing import List, Dict, Any
from ydb.tests.olap.lib.utils import get_external_param
import threading


class TestInsert(BaseTestSet):
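    """Concurrent insert/bulk-upsert scenario.

    One worker bulk-upserts batches of keys into the log table while another
    runs INSERT INTO cnt ... SELECT COUNT(*) FROM log queries; afterwards both
    tables are checked for the expected row counts and key sequences.
    """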
    schema_cnt = (
        ScenarioTestHelper.Schema()
        .with_column(name="key", type=PrimitiveType.Int32, not_null=True)
        .with_column(name="c", type=PrimitiveType.Int64)
        .with_key_columns("key")
    )

    schema_log = (
        ScenarioTestHelper.Schema()
        .with_column(name="key", type=PrimitiveType.Int32, not_null=True)
        .with_key_columns("key")
    )

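    # Writer worker: bulk-upserts the prepared key batches into the log table.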
    def _loop_upsert(self, ctx: TestContext, data: list):
        sth = ScenarioTestHelper(ctx)
        for batch in data:
            sth.bulk_upsert_data("log", self.schema_log, batch)

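    # Inserter worker: each iteration snapshots COUNT(*) from log and records it as a new row in cnt.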
    def _loop_insert(self, ctx: TestContext, rows_count: int):
        sth = ScenarioTestHelper(ctx)
        log: str = sth.get_full_path("log")
        cnt: str = sth.get_full_path("cnt")
        for i in range(rows_count):
            sth.execute_query(
                f'$cnt = SELECT CAST(COUNT(*) AS INT64) FROM `{log}`; INSERT INTO `{cnt}` (key, c) VALUES ({i}, $cnt)'
            )

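    # Scenario entry point: run both workers concurrently, then verify that neither table lost or duplicated rows.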
    def scenario_read_data_during_bulk_upsert(self, ctx: TestContext):
        sth = ScenarioTestHelper(ctx)
        cnt_table_name: str = "cnt"
        log_table_name: str = "log"
        batches_count = int(get_external_param("batches_count", "10"))
        rows_count = int(get_external_param("rows_count", "1000"))
        inserts_count = int(get_external_param("inserts_count", "200"))
        sth.execute_scheme_query(
            CreateTable(cnt_table_name).with_schema(self.schema_cnt)
        )
        sth.execute_scheme_query(
            CreateTable(log_table_name).with_schema(self.schema_log)
        )
        data: List[List[Dict[str, Any]]] = []
        for i in range(batches_count):
            batch: List[Dict[str, Any]] = []
            for j in range(rows_count):
                batch.append({"key": j + rows_count * i})
            data.append(batch)

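        # Run the bulk-upsert writer and the INSERT ... SELECT reader concurrently.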
        thread1 = threading.Thread(target=self._loop_upsert, args=[ctx, data])
        thread2 = threading.Thread(target=self._loop_insert, args=[ctx, inserts_count])

        thread1.start()
        thread2.start()

        thread2.join()
        thread1.join()

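        # The cnt table must contain one row per completed INSERT, with keys 0..inserts_count-1.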
        rows: int = sth.get_table_rows_count(cnt_table_name)
        assert rows == inserts_count
        scan_result = sth.execute_scan_query(
            f"SELECT key, c FROM `{sth.get_full_path(cnt_table_name)}` ORDER BY key"
        )
        for i in range(rows):
            key = scan_result.result_set.rows[i]["key"]
            assert key == i, f"expected key {i}, got {key}"

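        # The log table must contain every bulk-upserted key exactly once.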
        rows: int = sth.get_table_rows_count(log_table_name)
        assert rows == rows_count * batches_count
        scan_result = sth.execute_scan_query(
            f"SELECT key FROM `{sth.get_full_path(log_table_name)}` ORDER BY key"
        )
        for i in range(rows):
            key = scan_result.result_set.rows[i]["key"]
            assert key == i, f"expected key {i}, got {key}"