Skip to content

Commit 2371c48

Browse files
authored
Merge 812eddf into adbecd5
2 parents adbecd5 + 812eddf commit 2371c48

File tree

17 files changed

+437
-112
lines changed

17 files changed

+437
-112
lines changed
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
import logging
2+
from .base import ColumnFamilyTestBase
3+
from typing import Callable
4+
from ydb.tests.library.common.helpers import plain_or_under_sanitizer
5+
from ydb.tests.olap.common.thread_helper import TestThread, TestThreads
6+
from ydb.tests.olap.common.column_table_helper import ColumnTableHelper
7+
8+
logger = logging.getLogger(__name__)
9+
10+
11+
class TestAlterCompression(ColumnFamilyTestBase):
12+
class_name = "alter_compression"
13+
14+
@classmethod
15+
def setup_class(cls):
16+
super(TestAlterCompression, cls).setup_class()
17+
18+
def upsert_and_wait_portions(self, table: ColumnTableHelper, number_rows_for_insert: int, count_upsert: int):
19+
prev_number_rows: int = table.get_row_count()
20+
for _ in range(count_upsert):
21+
self.ydb_client.query(
22+
"""
23+
$row_count = %i;
24+
$prev_count = %i;
25+
$rows= ListMap(ListFromRange(0, $row_count), ($i) -> {
26+
return <|
27+
value: $i + $prev_count,
28+
value1: $i + $prev_count,
29+
|>;
30+
});
31+
UPSERT INTO `%s`
32+
SELECT * FROM AS_TABLE($rows);
33+
"""
34+
% (number_rows_for_insert, prev_number_rows, table.path)
35+
)
36+
prev_number_rows += number_rows_for_insert
37+
logger.info(
38+
f"{prev_number_rows} rows inserted in {table.path}. portions: {table.get_portion_stat_by_tier()}, blobs: {table.get_blob_stat_by_tier()}"
39+
)
40+
assert table.get_row_count() == prev_number_rows
41+
42+
if not self.wait_for(
43+
lambda: len(table.get_portion_stat_by_tier()) != 0, plain_or_under_sanitizer(70, 140)
44+
):
45+
raise Exception("not all portions have been updated")
46+
47+
if not self.wait_for(
48+
lambda: table.get_portion_stat_by_tier()['__DEFAULT']['Rows'] == number_rows_for_insert * count_upsert, plain_or_under_sanitizer(70, 140)
49+
):
50+
raise Exception("not all portions have been updated")
51+
52+
def add_family_in_create(self, name: str, settings: str):
53+
return f"FAMILY {name} ({settings})"
54+
55+
def test_all_supported_compression(self):
56+
''' Implements https://github.com/ydb-platform/ydb/issues/13640 '''
57+
58+
single_upsert_rows_count: int = 10**5
59+
upsert_count: int = 10
60+
test_name: str = "all_supported_compression"
61+
test_dir: str = f"{self.ydb_client.database}/{self.class_name}/{test_name}"
62+
tables_path: list[str] = [
63+
f"{test_dir}/off_compression",
64+
f"{test_dir}/lz4_compression",
65+
f"{test_dir}/zstd_compression",
66+
]
67+
add_defaut_family: Callable[[str], str] = lambda settings: self.add_family_in_create(name='default', settings=settings)
68+
tables_family: list[str] = [
69+
add_defaut_family('COMPRESSION = "off"'),
70+
add_defaut_family('COMPRESSION = "lz4"'),
71+
add_defaut_family('COMPRESSION = "zstd"'),
72+
]
73+
74+
for i in range(2, 22):
75+
tables_path.append(f"{test_dir}/zstd_{i}_compression")
76+
tables_family.append(add_defaut_family(f'COMPRESSION = "zstd", COMPRESSION_LEVEL = {i}'))
77+
78+
assert len(tables_path) == len(tables_family)
79+
80+
tables: list[ColumnTableHelper] = []
81+
for table_path, table_family in zip(tables_path, tables_family):
82+
self.ydb_client.query(
83+
f"""
84+
CREATE TABLE `{table_path}` (
85+
value Uint64 NOT NULL,
86+
value1 Uint64,
87+
PRIMARY KEY(value),
88+
{table_family}
89+
)
90+
WITH (STORE = COLUMN)
91+
"""
92+
)
93+
logger.info(f"Table {table_path} created")
94+
tables.append(ColumnTableHelper(self.ydb_client, table_path))
95+
96+
assert len(tables) == len(tables_path)
97+
98+
tasks: TestThreads = TestThreads()
99+
for table in tables:
100+
tasks.append(TestThread(target=self.upsert_and_wait_portions, args=[table, single_upsert_rows_count, upsert_count]))
101+
102+
tasks.start_and_wait_all()
103+
104+
volumes_without_compression: tuple[int, int] = tables[0].get_volumes_column("value")
105+
for table in tables:
106+
assert table.get_portion_stat_by_tier()['__DEFAULT']['Rows'] == single_upsert_rows_count * upsert_count
107+
assert upsert_count * single_upsert_rows_count * 8 == volumes_without_compression[0]
108+
109+
for i in range(1, len(tables_path)):
110+
volumes: tuple[int, int] = tables[i].get_volumes_column("value")
111+
koef: float = volumes_without_compression[1] / volumes[1]
112+
logging.info(
113+
f"compression in `{tables[i].path}` {volumes_without_compression[1]} / {volumes[1]}: {koef}"
114+
)
115+
assert koef > 1
116+
117+
def test_availability_data(self):
118+
''' Implements https://github.com/ydb-platform/ydb/issues/13643 '''
119+
120+
single_upsert_rows_count: int = 10 ** 2
121+
upsert_rows_count: int = 10
122+
rows_count: int = 0
123+
test_name: str = "availability_data"
124+
test_dir: str = f"{self.ydb_client.database}/{self.class_name}/{test_name}"
125+
tables_path: str = f"{test_dir}/test_table"
126+
127+
tables_family: list[str] = [
128+
self.add_family_in_create(name = 'default', settings = 'COMPRESSION = "off"'),
129+
self.add_family_in_create(name = 'family_lz4', settings = 'COMPRESSION = "lz4"'),
130+
self.add_family_in_create(name = 'family_zstd', settings = 'COMPRESSION = "zstd"'),
131+
self.add_family_in_create(name = 'family_zstd_10', settings = 'COMPRESSION = "zstd", COMPRESSION_LEVEL = 10'),
132+
]
133+
134+
self.ydb_client.query(
135+
f"""
136+
CREATE TABLE `{tables_path}` (
137+
value Uint64 NOT NULL,
138+
value1 Uint64,
139+
PRIMARY KEY(value),
140+
{','.join(tables_family)}
141+
)
142+
WITH (STORE = COLUMN)
143+
"""
144+
)
145+
logger.info(f"Table {tables_path} created")
146+
test_table: ColumnTableHelper = ColumnTableHelper(self.ydb_client, tables_path)
147+
148+
def check_data(table: ColumnTableHelper, rows_cont: int):
149+
assert table.get_row_count() == rows_cont
150+
count_row: int = 0
151+
result_set = self.ydb_client.query(f"SELECT * FROM `{table.path}` ORDER BY `value`")
152+
for result in result_set:
153+
logging.info(result)
154+
for row in result.rows:
155+
assert row['value'] == count_row and row['value1'] == count_row
156+
count_row += 1
157+
assert count_row == rows_cont
158+
159+
self.upsert_and_wait_portions(test_table, single_upsert_rows_count, upsert_rows_count)
160+
rows_count += single_upsert_rows_count * upsert_rows_count
161+
check_data(table=test_table, rows_cont=rows_count)
162+
163+
self.ydb_client.query(f"ALTER TABLE `{tables_path}` ALTER COLUMN `value` SET FAMILY family_lz4")
164+
self.upsert_and_wait_portions(test_table, single_upsert_rows_count, upsert_rows_count)
165+
rows_count += single_upsert_rows_count * upsert_rows_count
166+
check_data(table=test_table, rows_cont=rows_count)
167+
168+
self.ydb_client.query(f"ALTER TABLE `{tables_path}` ALTER COLUMN `value1` SET FAMILY family_zstd")
169+
self.upsert_and_wait_portions(test_table, single_upsert_rows_count, upsert_rows_count)
170+
rows_count += single_upsert_rows_count * upsert_rows_count
171+
check_data(table=test_table, rows_cont=rows_count)
172+
173+
self.ydb_client.query(f"ALTER TABLE `{tables_path}` ALTER COLUMN `value` SET FAMILY family_zstd_10")
174+
self.ydb_client.query(f"ALTER TABLE `{tables_path}` ALTER COLUMN `value1` SET FAMILY family_zstd_10")
175+
self.upsert_and_wait_portions(test_table, single_upsert_rows_count, upsert_rows_count)
176+
rows_count += single_upsert_rows_count * upsert_rows_count
177+
check_data(table=test_table, rows_cont=rows_count)
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import yatest.common
2+
import os
3+
import time
4+
import logging
5+
6+
from ydb.tests.library.harness.kikimr_runner import KiKiMR
7+
from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
8+
from ydb.tests.olap.common.ydb_client import YdbClient
9+
10+
11+
logger = logging.getLogger(__name__)
12+
13+
14+
class ColumnFamilyTestBase(object):
15+
@classmethod
16+
def setup_class(cls):
17+
cls._setup_ydb()
18+
19+
@classmethod
20+
def teardown_class(cls):
21+
cls.ydb_client.stop()
22+
cls.cluster.stop()
23+
24+
@classmethod
25+
def _setup_ydb(cls):
26+
ydb_path = yatest.common.build_path(os.environ.get("YDB_DRIVER_BINARY"))
27+
logger.info(yatest.common.execute([ydb_path, "-V"], wait=True).stdout.decode("utf-8"))
28+
config = KikimrConfigGenerator(
29+
extra_feature_flags={"enable_olap_compression": True},
30+
column_shard_config={
31+
"lag_for_compaction_before_tierings_ms": 0,
32+
"compaction_actualization_lag_ms": 0,
33+
"optimizer_freshness_check_duration_ms": 0,
34+
"small_portion_detect_size_limit": 0,
35+
"max_read_staleness_ms": 5000,
36+
},
37+
)
38+
cls.cluster = KiKiMR(config)
39+
cls.cluster.start()
40+
node = cls.cluster.nodes[1]
41+
cls.ydb_client = YdbClient(database=f"/{config.domain_name}", endpoint=f"grpc://{node.host}:{node.port}")
42+
cls.ydb_client.wait_connection()
43+
44+
@staticmethod
45+
def wait_for(condition_func, timeout_seconds):
46+
t0 = time.time()
47+
while time.time() - t0 < timeout_seconds:
48+
if condition_func():
49+
return True
50+
time.sleep(1)
51+
return False
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
PY3TEST()
2+
ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd")
3+
4+
TEST_SRCS(
5+
base.py
6+
alter_compression.py
7+
)
8+
9+
SIZE(MEDIUM)
10+
11+
PEERDIR(
12+
ydb/tests/library
13+
ydb/public/sdk/python
14+
ydb/public/sdk/python/enable_v3_new_behavior
15+
ydb/tests/olap/scenario/helpers
16+
ydb/tests/olap/common
17+
)
18+
19+
DEPENDS(
20+
ydb/apps/ydbd
21+
)
22+
23+
END()
24+
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
RECURSE(
2+
compression
3+
)
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import logging
2+
import time
3+
from .ydb_client import YdbClient
4+
5+
6+
logger = logging.getLogger(__name__)
7+
8+
9+
class ColumnTableHelper:
10+
def __init__(self, ydb_client: YdbClient, path: str):
11+
self.ydb_client = ydb_client
12+
self.path = path
13+
14+
def get_row_count(self) -> int:
15+
count_row: int = 0
16+
result_set = self.ydb_client.query(f"SELECT COUNT(*) AS Rows FROM `{self.path}`")
17+
for result in result_set:
18+
for row in result.rows:
19+
count_row += row["Rows"]
20+
return count_row
21+
22+
def get_portion_count(self) -> int:
23+
return self.ydb_client.query(f"select count(*) as Rows from `{self.path}/.sys/primary_index_portion_stats`")[0].rows[0]["Rows"]
24+
25+
def get_portion_stat_by_tier(self) -> dict[str, dict[str, int]]:
26+
results = self.ydb_client.query(
27+
f"select TierName, sum(Rows) as Rows, count(*) as Portions from `{self.path}/.sys/primary_index_portion_stats` group by TierName"
28+
)
29+
return {
30+
row["TierName"]: {"Rows": row["Rows"], "Portions": row["Portions"]}
31+
for result_set in results
32+
for row in result_set.rows
33+
}
34+
35+
def get_blob_stat_by_tier(self) -> dict[str, (int, int)]:
36+
stmt = f"""
37+
select TierName, count(*) as Portions, sum(BlobSize) as BlobSize, sum(BlobCount) as BlobCount from (
38+
select TabletId, PortionId, TierName, sum(BlobRangeSize) as BlobSize, count(*) as BlobCount from `{self.path}/.sys/primary_index_stats` group by TabletId, PortionId, TierName
39+
) group by TierName
40+
"""
41+
results = self.ydb_client.query(stmt)
42+
return {
43+
row["TierName"]: {"Portions": row["Portions"], "BlobSize": row["BlobSize"], "BlobCount": row["BlobCount"]}
44+
for result_set in results
45+
for row in result_set.rows
46+
}
47+
48+
def set_fast_compaction(self):
49+
self.ydb_client.query(
50+
f"""
51+
ALTER OBJECT `{self.path}` (TYPE TABLE) SET (ACTION=UPSERT_OPTIONS, `COMPACTION_PLANNER.CLASS_NAME`=`lc-buckets`, `COMPACTION_PLANNER.FEATURES`=`
52+
{{"levels" : [{{"class_name" : "Zero", "portions_live_duration" : "5s", "expected_blobs_size" : 1000000000000, "portions_count_available" : 2}},
53+
{{"class_name" : "Zero"}}]}}`);
54+
"""
55+
)
56+
57+
def _coollect_volumes_column(self, column_name: str) -> tuple[int, int]:
58+
query = f'SELECT * FROM `{self.path}/.sys/primary_index_stats` WHERE Activity == 1 AND EntityName = \"{column_name}\"'
59+
result_set = self.ydb_client.query(query)
60+
raw_bytes, bytes = 0, 0
61+
for result in result_set:
62+
for rows in result.rows:
63+
raw_bytes += rows["RawBytes"]
64+
bytes += rows["BlobRangeSize"]
65+
return raw_bytes, bytes
66+
67+
def get_volumes_column(self, column_name: str) -> tuple[int, int]:
68+
pred_raw_bytes, pred_bytes = 0, 0
69+
raw_bytes, bytes = self._coollect_volumes_column(column_name)
70+
while pred_raw_bytes != raw_bytes and pred_bytes != bytes:
71+
pred_raw_bytes = raw_bytes
72+
pred_bytes = bytes
73+
time.sleep(10)
74+
raw_bytes, bytes = self._coollect_volumes_column(column_name)
75+
logging.info(f"Table `{self.path}`, volumes `{column_name}` ({raw_bytes}, {bytes})")
76+
return raw_bytes, bytes
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import threading
2+
3+
4+
class TestThread(threading.Thread):
5+
def run(self) -> None:
6+
self.exc = None
7+
try:
8+
self.ret = self._target(*self._args, **self._kwargs)
9+
except BaseException as e:
10+
self.exc = e
11+
12+
def join(self, timeout=None):
13+
super().join(timeout)
14+
if self.exc:
15+
raise self.exc
16+
return self.ret
17+
18+
class TestThreads():
19+
def __init__(self):
20+
self.threads: list[TestThread] = list()
21+
22+
def append(self, thread: TestThread) -> int:
23+
self.threads.append(thread)
24+
return len(self.threads) - 1
25+
26+
def start_thread(self, thread_index: int):
27+
self.threads[thread_index].start()
28+
29+
def start_all(self) -> None:
30+
for thread in self.threads:
31+
thread.start()
32+
33+
def join_thread(self, thread_index: int):
34+
self.threads[thread_index].join()
35+
36+
def join_all(self, timeout=None) -> None:
37+
for thread in self.threads:
38+
thread.join(timeout=timeout)
39+
40+
def start_and_wait_all(self):
41+
self.start_all()
42+
self.join_all()

ydb/tests/olap/common/ya.make

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
PY3_LIBRARY()
2+
3+
PY_SRCS (
4+
column_table_helper.py
5+
thread_helper.py
6+
ydb_client.py
7+
)
8+
9+
PEERDIR(
10+
ydb/public/sdk/python
11+
)
12+
13+
END()

0 commit comments

Comments
 (0)