Skip to content

Commit 5607227

Browse files
authored
Merge be06b15 into bf4bc95
2 parents bf4bc95 + be06b15 commit 5607227

File tree

4 files changed

+360
-0
lines changed

4 files changed

+360
-0
lines changed

ydb/tests/olap/ttl_tiering/base.py

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
import yatest.common
2+
import os
3+
import time
4+
import ydb
5+
import logging
6+
import boto3
7+
import requests
8+
from library.recipes import common as recipes_common
9+
10+
from ydb.tests.library.harness.kikimr_runner import KiKiMR
11+
from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
12+
from ydb.tests.library.common.types import Erasure
13+
14+
15+
logger = logging.getLogger(__name__)  # module-level logger, shared by the classes below
16+
17+
18+
class S3Client:
    """Thin wrapper around boto3 for the S3-compatible storage used in tests.

    Holds both the high-level resource (`self.s3`) and the low-level client
    (`self.client`) built from the same credentials/endpoint.
    """

    def __init__(self, endpoint, region, key_id, key_secret):
        self.endpoint = endpoint
        self.region = region
        self.key_id = key_id
        self.key_secret = key_secret

        session = boto3.session.Session()
        # Single source of truth for connection kwargs so the resource and
        # the client cannot silently diverge (was duplicated inline before).
        conn_kwargs = dict(
            service_name="s3",
            aws_access_key_id=key_id,
            aws_secret_access_key=key_secret,
            region_name=region,
            endpoint_url=endpoint,
        )
        self.s3 = session.resource(**conn_kwargs)
        self.client = session.client(**conn_kwargs)

    def create_bucket(self, name: str):
        """Create bucket `name` on the configured endpoint."""
        self.client.create_bucket(Bucket=name)

    def get_bucket_stat(self, bucket_name: str) -> tuple[int, int]:
        """Return (object_count, total_size_in_bytes) for `bucket_name`.

        Fix: the annotation was `(int, int)` — a tuple literal, not a valid
        type hint; `tuple[int, int]` is the correct form.
        """
        bucket = self.s3.Bucket(bucket_name)
        count = 0
        size = 0
        for obj in bucket.objects.all():
            count += 1
            size += obj.size
        return (count, size)
52+
53+
54+
class YdbClient:
    """Small facade over the YDB driver plus a query session pool.

    Owns both objects for its whole lifetime; `stop()` must be called to
    release them.
    """

    def __init__(self, endpoint, database):
        self.database = database
        self.driver = ydb.Driver(endpoint=endpoint, database=database, oauth=None)
        self.session_pool = ydb.QuerySessionPool(self.driver)

    def stop(self):
        """Shut down the session pool first, then the underlying driver."""
        self.session_pool.stop()
        self.driver.stop()

    def wait_connection(self, timeout=5):
        """Block until the driver connects; raise fast on failure."""
        self.driver.wait(timeout, fail_fast=True)

    def query(self, statement):
        """Run `statement` through the pool's retry loop and return its result sets."""
        return self.session_pool.execute_with_retries(statement)
69+
70+
71+
class ColumnTableHelper:
    """Read-only helpers over a column table's `.sys` statistics views.

    `ydb_client` must provide `.query(stmt)` returning a list of result sets,
    each with a `.rows` list of dict-like rows (as `YdbClient.query` does).
    """

    def __init__(self, ydb_client: "YdbClient", path: str):
        # String annotation: lets this class be imported without YdbClient
        # being defined first; the runtime contract is unchanged.
        self.ydb_client = ydb_client
        self.path = path

    def get_row_count(self) -> int:
        """Total number of rows currently in the table."""
        return self.ydb_client.query(f"select count(*) as Rows from `{self.path}`")[0].rows[0]["Rows"]

    def get_portion_count(self) -> int:
        """Number of portions reported by the portion-stats system view."""
        return self.ydb_client.query(f"select count(*) as Rows from `{self.path}/.sys/primary_index_portion_stats`")[0].rows[0]["Rows"]

    def get_portion_stat_by_tier(self) -> dict[str, dict[str, int]]:
        """Map TierName -> {"Rows": ..., "Portions": ...} from portion stats."""
        results = self.ydb_client.query(f"select TierName, sum(Rows) as Rows, count(*) as Portions from `{self.path}/.sys/primary_index_portion_stats` group by TierName")
        return {row["TierName"]: {"Rows": row["Rows"], "Portions": row["Portions"]} for result_set in results for row in result_set.rows}

    def get_blob_stat_by_tier(self) -> dict[str, dict[str, int]]:
        """Map TierName -> {"Portions", "BlobSize", "BlobCount"} aggregated per tier.

        Fix: the annotation said `dict[str, (int, int)]` — a tuple literal,
        and wrong arity anyway; the method returns a dict of three counters.
        """
        stmt = f"""
            select TierName, count(*) as Portions, sum(BlobSize) as BlobSize, sum(BlobCount) as BlobCount from (
                select TabletId, PortionId, TierName, sum(BlobRangeSize) as BlobSize, count(*) as BlobCount from `{self.path}/.sys/primary_index_stats` group by TabletId, PortionId, TierName
            ) group by TierName
        """
        results = self.ydb_client.query(stmt)
        return {row["TierName"]: {"Portions": row["Portions"], "BlobSize": row["BlobSize"], "BlobCount": row["BlobCount"]} for result_set in results for row in result_set.rows}

    def get_stat(self):
        """Placeholder — not implemented yet; intentionally returns None."""
        pass
97+
98+
99+
class TllTieringTestBase(object):
    """Base class for TTL tiering tests.

    setup_class starts a KiKiMR cluster and a local moto S3 server, exposing
    `cls.ydb_client` and `cls.s3_client` to subclasses; teardown_class stops
    them in reverse dependency order.

    NOTE(review): "Tll" looks like a typo of "Ttl"; renaming would break
    subclasses, so it is left as is.
    """

    @classmethod
    def setup_class(cls):
        # Order matters: subclasses may use ydb_client and s3_client right
        # after super().setup_class() returns.
        cls._setup_ydb()
        cls._setup_s3()

    @classmethod
    def teardown_class(cls):
        # Stop the client before the cluster it talks to, then kill the
        # moto daemon started in _setup_s3.
        cls.ydb_client.stop()
        cls.cluster.stop()
        recipes_common.stop_daemon(cls.s3_pid)

    @classmethod
    def _setup_ydb(cls):
        """Start a KiKiMR cluster with tiering-related feature flags enabled."""
        ydb_path = yatest.common.build_path(os.environ.get("YDB_DRIVER_BINARY", "ydb/apps/ydbd/ydbd"))
        # Log the ydbd version for debuggability of CI failures.
        logger.info(yatest.common.execute([ydb_path, "-V"], wait=True).stdout.decode("utf-8"))
        config = KikimrConfigGenerator(
            erasure=Erasure.MIRROR_3_DC,
            extra_feature_flags={
                "enable_external_data_sources": True,
                "enable_tiering_in_column_shard": True
            },
            # Zeroed lags/limits make compaction and tiering actualization
            # kick in immediately, so the tests don't wait on timers.
            column_shard_config={
                "lag_for_compaction_before_tierings_ms": 0,
                "compaction_actualization_lag_ms": 0,
                "optimizer_freshness_check_duration_ms": 0,
                "small_portion_detect_size_limit": 0,
            }
        )
        cls.cluster = KiKiMR(config)
        cls.cluster.start()
        # Any node works for gRPC access; node 1 is always present.
        node = cls.cluster.nodes[1]
        cls.ydb_client = YdbClient(database=f"/{config.domain_name}", endpoint=f"grpc://{node.host}:{node.port}")
        cls.ydb_client.wait_connection()

    @classmethod
    def _setup_s3(cls):
        """Start a local moto S3 server as a daemon and build an S3Client for it."""
        s3_pid_file = "s3.pid"
        moto_server_path = os.environ["MOTO_SERVER_PATH"]

        port_manager = yatest.common.network.PortManager()
        port = port_manager.get_port()
        endpoint = f"http://localhost:{port}"
        command = [yatest.common.binary_path(moto_server_path), "s3", "--port", str(port)]

        def is_s3_ready():
            # Readiness probe: any successful HTTP response from the moto
            # endpoint means the server is accepting connections.
            try:
                response = requests.get(endpoint)
                response.raise_for_status()
                return True
            except requests.RequestException as err:
                logging.debug(err)
                return False

        recipes_common.start_daemon(
            command=command, environment=None, is_alive_check=is_s3_ready, pid_file_name=s3_pid_file
        )

        # Remember the daemon pid so teardown_class can stop it.
        with open(s3_pid_file, 'r') as f:
            cls.s3_pid = int(f.read())

        # moto accepts arbitrary credentials; these are test placeholders.
        cls.s3_client = S3Client(endpoint, "us-east-1", "fake_key_id", "fake_key_secret")

    @staticmethod
    def wait_for(condition_func, timeout_seconds):
        """Poll `condition_func` once a second until it is truthy or
        `timeout_seconds` elapse; return True on success, False on timeout."""
        t0 = time.time()
        while time.time() - t0 < timeout_seconds:
            if condition_func():
                return True
            time.sleep(1)
        return False
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
import time
2+
import logging
3+
from .base import TllTieringTestBase, ColumnTableHelper
4+
5+
logger = logging.getLogger(__name__)  # module-level logger for this test module
6+
7+
8+
class TestDeleteS3Ttl(TllTieringTestBase):
    ''' Implements https://github.com/ydb-platform/ydb/issues/13467 '''

    # Test parameters: a column table of `row_count` rows is filled in chunks
    # of `single_upsert_row_count`, then tiered to two buckets by row age.
    test_name = "delete_s3_ttl"
    row_count = 10 ** 7
    single_upsert_row_count = 10 ** 6
    cold_bucket = "cold"
    frozen_bucket = "frozen"
    days_to_cool = 1000
    # NOTE(review): "froze" reads like a typo of "freeze"; kept because the
    # attribute name is part of the class interface.
    days_to_froze = 3000

    @classmethod
    def setup_class(cls):
        super(TestDeleteS3Ttl, cls).setup_class()
        cls.s3_client.create_bucket(cls.cold_bucket)
        cls.s3_client.create_bucket(cls.frozen_bucket)

    def get_row_count_by_date(self, table_path: str, past_days: int) -> int:
        """Count rows whose `ts` is older than `past_days` days before now."""
        return self.ydb_client.query(f"SELECT count(*) as Rows from `{table_path}` WHERE ts < CurrentUtcTimestamp() - DateTime::IntervalFromDays({past_days})")[0].rows[0]["Rows"]

    def test(self):
        """End-to-end scenario: fill the table, set two-tier S3 TTL, wait for
        eviction to start, then drop the S3 tiers and wait for bucket cleanup."""
        test_dir = f"{self.ydb_client.database}/{self.test_name}"
        table_path = f"{test_dir}/table"
        secret_prefix = self.test_name
        access_key_id_secret_name = f"{secret_prefix}_key_id"
        access_key_secret_secret_name = f"{secret_prefix}_key_secret"
        cold_eds_path = f"{test_dir}/{self.cold_bucket}"
        frozen_eds_path = f"{test_dir}/{self.frozen_bucket}"

        # Expect empty buckets to avoid unintentional data deletion/modification
        if self.s3_client.get_bucket_stat(self.cold_bucket) != (0, 0):
            raise Exception("Bucket for cold data is not empty")
        if self.s3_client.get_bucket_stat(self.frozen_bucket) != (0, 0):
            raise Exception("Bucket for frozen data is not empty")

        self.ydb_client.query(f"""
            CREATE TABLE `{table_path}` (
                ts Timestamp NOT NULL,
                s String,
                val Uint64,
                PRIMARY KEY(ts),
            )
            WITH (STORE = COLUMN)
            """
        )

        logger.info(f"Table {table_path} created")

        # Secrets that the external data sources below reference by name.
        self.ydb_client.query(f"CREATE OBJECT {access_key_id_secret_name} (TYPE SECRET) WITH value='{self.s3_client.key_id}'")
        self.ydb_client.query(f"CREATE OBJECT {access_key_secret_secret_name} (TYPE SECRET) WITH value='{self.s3_client.key_secret}'")

        self.ydb_client.query(f"""
            CREATE EXTERNAL DATA SOURCE `{cold_eds_path}` WITH (
                SOURCE_TYPE="ObjectStorage",
                LOCATION="{self.s3_client.endpoint}/{self.cold_bucket}",
                AUTH_METHOD="AWS",
                AWS_ACCESS_KEY_ID_SECRET_NAME="{access_key_id_secret_name}",
                AWS_SECRET_ACCESS_KEY_SECRET_NAME="{access_key_secret_secret_name}",
                AWS_REGION="{self.s3_client.region}"
            )
        """)

        self.ydb_client.query(f"""
            CREATE EXTERNAL DATA SOURCE `{frozen_eds_path}` WITH (
                SOURCE_TYPE="ObjectStorage",
                LOCATION="{self.s3_client.endpoint}/{self.frozen_bucket}",
                AUTH_METHOD="AWS",
                AWS_ACCESS_KEY_ID_SECRET_NAME="{access_key_id_secret_name}",
                AWS_SECRET_ACCESS_KEY_SECRET_NAME="{access_key_secret_secret_name}",
                AWS_REGION="{self.s3_client.region}"
            )
        """)
        table = ColumnTableHelper(self.ydb_client, table_path)

        # Fill the table in chunks; each upsert generates rows with random
        # timestamps uniformly spread over 2010..2030 so both TTL thresholds
        # below have data on each side.
        cur_rows = 0
        while cur_rows < self.row_count:
            self.ydb_client.query("""
                $row_count = %i;
                $from_us = CAST(Timestamp('2010-01-01T00:00:00.000000Z') as Uint64);
                $to_us = CAST(Timestamp('2030-01-01T00:00:00.000000Z') as Uint64);
                $dt = $to_us - $from_us;
                $k = ((1ul << 64) - 1) / CAST($dt - 1 as Double);
                $rows= ListMap(ListFromRange(0, $row_count), ($i)->{
                    $us = CAST(RandomNumber($i) / $k as Uint64) + $from_us;
                    $ts = Unwrap(CAST($us as Timestamp));
                    return <|
                        ts: $ts,
                        s: 'some date:' || CAST($ts as String),
                        val: $us
                    |>;
                });
                upsert into `%s`
                select * FROM AS_TABLE($rows);
            """ % (min(self.row_count - cur_rows, self.single_upsert_row_count), table_path))
            cur_rows = table.get_row_count()
            logger.info(f"{cur_rows} rows inserted in total, portions: {table.get_portion_stat_by_tier()}, blobs: {table.get_blob_stat_by_tier()}")

        logger.info(f"Rows older than {self.days_to_cool} days: {self.get_row_count_by_date(table_path, self.days_to_cool)}")
        logger.info(f"Rows older than {self.days_to_froze} days: {self.get_row_count_by_date(table_path, self.days_to_froze)}")

        def portions_actualized_in_sys():
            # All data must be reported in the __DEFAULT tier before TTL is set.
            # NOTE(review): "teir" in the message below is a typo of "tier";
            # left unchanged here since it is a runtime string.
            portions = table.get_portion_stat_by_tier()
            logger.info(f"portions: {portions}, blobs: {table.get_blob_stat_by_tier()}")
            if len(portions) != 1 or "__DEFAULT" not in portions:
                raise Exception("Data not in __DEFAULT teir")
            return self.row_count <= portions["__DEFAULT"]["Rows"]

        if not self.wait_for(lambda: portions_actualized_in_sys(), 120):
            raise Exception(".sys reports incorrect data portions")

        # Set two-tier TTL: rows older than days_to_cool go to the cold
        # bucket, rows older than days_to_froze to the frozen bucket.
        t0 = time.time()
        stmt = f"""
            ALTER TABLE `{table_path}` SET (TTL =
                Interval("P{self.days_to_cool}D") TO EXTERNAL DATA SOURCE `{cold_eds_path}`,
                Interval("P{self.days_to_froze}D") TO EXTERNAL DATA SOURCE `{frozen_eds_path}`
                ON ts
            )
        """
        logger.info(stmt)
        self.ydb_client.query(stmt)
        logger.info(f"TTL set in {time.time() - t0} seconds")

        def data_distributes_across_tiers():
            cold_bucket_stat = self.s3_client.get_bucket_stat(self.cold_bucket)
            frozen_bucket_stat = self.s3_client.get_bucket_stat(self.frozen_bucket)
            logger.info(f"portions: {table.get_portion_stat_by_tier()}, blobs: {table.get_blob_stat_by_tier()}, cold bucket stat: {cold_bucket_stat}, frozen bucket stat: {frozen_bucket_stat}")
            # TODO FIXME
            # We can not expect proper distribution of data across tiers due to https://github.com/ydb-platform/ydb/issues/13525
            # So we wait until some data appears in any bucket
            return cold_bucket_stat[0] != 0 or frozen_bucket_stat[0] != 0

        if not self.wait_for(lambda: data_distributes_across_tiers(), 600):
            raise Exception("Data eviction has not been started")

        # Replace the tiered TTL with a plain delete-TTL; evicted data should
        # eventually be removed from both buckets.
        t0 = time.time()
        stmt = f"""
            ALTER TABLE `{table_path}` SET (TTL =
                Interval("P{self.days_to_cool}D")
                ON ts
            )
        """
        logger.info(stmt)
        self.ydb_client.query(stmt)
        logger.info(f"TTL set in {time.time() - t0} seconds")

        # TODO FIXME after https://github.com/ydb-platform/ydb/issues/13523
        def data_deleted_from_buckets():
            cold_bucket_stat = self.s3_client.get_bucket_stat(self.cold_bucket)
            frozen_bucket_stat = self.s3_client.get_bucket_stat(self.frozen_bucket)
            logger.info(
                f"portions: {table.get_portion_stat_by_tier()}, blobs: {table.get_blob_stat_by_tier()}, cold bucket stat: {cold_bucket_stat}, frozen bucket stat: {frozen_bucket_stat}")
            return cold_bucket_stat[0] == 0 and frozen_bucket_stat[0] == 0

        # Deliberately non-fatal: cleanup is known to be incomplete upstream.
        if not self.wait_for(lambda: data_deleted_from_buckets(), 120):
            # raise Exception("not all data deleted") TODO FIXME after https://github.com/ydb-platform/ydb/issues/13535
            pass

ydb/tests/olap/ttl_tiering/ya.make

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
PY3TEST()
# ydbd binary resolved via build_path in TllTieringTestBase._setup_ydb
ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd")
# moto S3 server binary started as a daemon by the test base class
ENV(MOTO_SERVER_PATH="contrib/python/moto/bin/moto_server")
# extra verbosity for the tiering subsystem under test
ENV(YDB_ADDITIONAL_LOG_CONFIGS="TX_TIERING:DEBUG")

TEST_SRCS(
    base.py
    ttl_delete_s3.py
)

SIZE(MEDIUM)

PEERDIR(
    ydb/tests/library
    ydb/public/sdk/python
    ydb/public/sdk/python/enable_v3_new_behavior
    contrib/python/boto3
    library/recipes/common
)

# binaries that must be built before the tests run
DEPENDS(
    ydb/apps/ydbd
    contrib/python/moto/bin
)

END()
27+

ydb/tests/olap/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ RECURSE(
33
scenario
44
docs
55
load
6+
ttl_tiering
67
)

0 commit comments

Comments
 (0)