Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions ydb/tests/olap/lib/allure_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@ def _set_results_plot(test_info: dict[str, str], suite: str, test: str, refferen
def _set_logs_command(test_info: dict[str, str], start_time: float, end_time: float):
hosts = []
for node in YdbCluster.get_cluster_nodes():
ss = node.get('SystemState', {})
if 'Storage' in ss.get('Roles', []):
hosts.append(ss.get('Host'))
if node.role == YdbCluster.Node.Role.STORAGE:
hosts.append(node.host)
hosts_cmd = ' '.join([f'-H {h}' for h in hosts])
start = datetime.fromtimestamp(start_time, UTC).isoformat()
end = datetime.fromtimestamp(end_time, UTC).isoformat()
Expand Down
10 changes: 4 additions & 6 deletions ydb/tests/olap/lib/ydb_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from ydb.tests.olap.lib.ydb_cluster import YdbCluster
from ydb.tests.olap.lib.utils import get_external_param
from enum import StrEnum
from time import time
from types import TracebackType


Expand Down Expand Up @@ -152,13 +151,12 @@ def _load_query_out(self) -> None:
self.result.query_out = r.read()

@staticmethod
def _get_nodes_info() -> dict[str, dict[str, int]]:
nodes = YdbCluster.get_cluster_nodes(db_only=True)
def _get_nodes_info() -> dict[str, dict[str, Any]]:
return {
n['SystemState']['Host']: {
'start_time': int(int(n['SystemState'].get('StartTime', time() * 1000)) / 1000)
n.host: {
'start_time': n.start_time
}
for n in nodes
for n in YdbCluster.get_cluster_nodes(db_only=True)
}

def _check_nodes(self):
Expand Down
106 changes: 69 additions & 37 deletions ydb/tests/olap/lib/ydb_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from copy import deepcopy
from time import sleep, time
from typing import List, Optional
from enum import Enum

LOGGER = logging.getLogger()

Expand All @@ -20,6 +21,33 @@ def __init__(self, url: str, caption: str = 'link') -> None:
self.url = f'https://{self.url}'
self.caption = caption

class Node:
class Role(Enum):
UNKNOWN = 0
STORAGE = 1
COMPUTE = 2

class Tablet:
def __init__(self, desc: dict):
self.state: str = desc.get('State', 'Red')
self.type: str = desc.get('Type', 'Unknown')
self.count: int = desc.get('Count', 0)

def __init__(self, desc: dict):
ss = desc.get('SystemState', {})
self.host: str = ss.get('Host', '')
self.disconnected: bool = desc.get('Disconnected', False)
self.cluster_name: str = ss.get('ClusterName', '')
self.version: str = ss.get('Version', '')
self.start_time: float = 0.001 * int(ss.get('StartTime', time() * 1000))
if 'Storage' in ss.get('Roles', []):
self.role = YdbCluster.Node.Role.STORAGE
elif 'Tenants' in ss.get('Roles', []):
self.role = YdbCluster.Node.Role.COMPUTE
else:
self.role = YdbCluster.Node.Role.UNKNOWN
self.tablets = [YdbCluster.Node.Tablet(t) for t in desc.get('Tablets', [])]

_ydb_driver = None
_results_driver = None
_cluster_info = None
Expand Down Expand Up @@ -53,7 +81,7 @@ def _get_service_url(cls):
return f'http://{host}:{port}'

@classmethod
def get_cluster_nodes(cls, path: Optional[str] = None, db_only: bool = False) -> list[dict[str:any]]:
def get_cluster_nodes(cls, path: Optional[str] = None, db_only: bool = False) -> list[YdbCluster.Node]:
try:
url = f'{cls._get_service_url()}/viewer/json/nodes?'
if db_only or path is not None:
Expand All @@ -64,27 +92,32 @@ def get_cluster_nodes(cls, path: Optional[str] = None, db_only: bool = False) ->
# token = os.getenv('OLAP_YDB_OAUTH', None)
# if token is not None:
# headers['Authorization'] = token
data = requests.get(url, headers=headers).json()
nodes = data.get('Nodes', [])
return nodes
response = requests.get(url, headers=headers)
response.raise_for_status()
data = response.json()
if not isinstance(data, dict):
raise Exception(f'Incorrect response type: {data}')
return [YdbCluster.Node(n) for n in data.get('Nodes', [])]
except requests.HTTPError as e:
LOGGER.error(f'{e.strerror}: {e.response.content}')
except Exception as e:
LOGGER.error(e)
return [], 0
return []

@classmethod
def get_cluster_info(cls):
if cls._cluster_info is None:
version = ''
cluster_name = ''
nodes_wilcard = ''
nodes = cls.get_cluster_nodes()
nodes = cls.get_cluster_nodes(db_only=True)
for node in nodes:
n = node.get('SystemState', {})
cluster_name = n.get('ClusterName', cluster_name)
version = n.get('Version', version)
for tenant in n.get('Tenants', []):
if tenant.endswith(cls.ydb_database):
nodes_wilcard = n.get('Host', nodes_wilcard).split('.')[0].rstrip('0123456789')
if not cluster_name:
cluster_name = node.cluster_name
if not version:
version = node.version
if not nodes_wilcard and node.role == YdbCluster.Node.Role.COMPUTE:
nodes_wilcard = node.host.split('.')[0].rstrip('0123456789')
cls._cluster_info = {
'database': cls.ydb_database,
'version': version,
Expand Down Expand Up @@ -162,6 +195,14 @@ def _get_tables(cls, path):
result.append(full_path)
return result

@staticmethod
def _join_errors(log_level: int, errors: list[str]):
if len(errors) > 0:
error = ', '.join(errors)
LOGGER.log(log_level, error)
return error
return None

@classmethod
@allure.step('Execute scan query')
def execute_single_result_query(cls, query, timeout=10):
Expand All @@ -180,21 +221,14 @@ def execute_single_result_query(cls, query, timeout=10):
@classmethod
@allure.step('Check if YDB alive')
def check_if_ydb_alive(cls, timeout=10, balanced_paths=None) -> tuple[str, str]:
def _check_node(n):
name = 'UnknownNode'
error = None
try:
ss = n.get('SystemState', {})
name = ss.get("Host")
start_time = int(ss.get('StartTime', int(time()) * 1000)) / 1000
uptime = int(time()) - start_time
if uptime < 15:
error = f'Node {name} too yong: {uptime}'
except BaseException as ex:
error = f"Error while process node {name}: {ex}"
if error:
LOGGER.error(error)
return error
def _check_node(n: YdbCluster.Node):
errors = []
if n.disconnected:
errors.append(f'Node {n.host} disconnected')
uptime = time() - n.start_time
if uptime < 15:
errors.append(f'Node {n.host} too yong: {uptime}')
return cls._join_errors(logging.ERROR, errors)

errors = []
warnings = []
Expand All @@ -216,7 +250,7 @@ def _check_node(n):
else:
ok_node_count += 1
if ok_node_count < nodes_count:
errors.append(f'Only {ok_node_count} from {ok_node_count} dynnodes are ok: {",".join(node_errors)}')
errors.append(f'Only {ok_node_count} from {nodes_count} dynnodes are ok: {",".join(node_errors)}')
paths_to_balance = []
if isinstance(balanced_paths, str):
paths_to_balance += cls._get_tables(balanced_paths)
Expand All @@ -232,11 +266,11 @@ def _check_node(n):
min = 0
for tn in table_nodes:
tablet_count = 0
for tablet in tn.get("Tablets", []):
if tablet.get("State") != "Green":
warnings.append(f'Node {tn.get("SystemState", {}).get("Host")}: {tablet.get("Count")} tablets of type {tablet.get("Type")} in {tablet.get("State")} state')
if tablet.get("Type") in {"ColumnShard", "DataShard"}:
tablet_count += tablet.get("Count")
for tablet in tn.tablets:
if tablet.count > 0 and tablet.state != "Green":
warnings.append(f'Node {tn.host}: {tablet.count} tablets of type {tablet.type} in {tablet.state} state')
if tablet.type in {"ColumnShard", "DataShard"}:
tablet_count += tablet.count
if tablet_count > 0:
if min is None or tablet_count < min:
min = tablet_count
Expand All @@ -251,10 +285,8 @@ def _check_node(n):
cls.execute_single_result_query("select 1", timeout)
except BaseException as ex:
errors.append(f"Cannot connect to YDB: {ex}")
error = ', '.join(errors) if len(errors) > 0 else None
warning = ', '.join(warnings) if len(warnings) > 0 else None
LOGGER.error(f'Errors: {error}, warnings: {warning}')
return error, warning

return cls._join_errors(logging.ERROR, errors), cls._join_errors(logging.WARNING, warnings)

@classmethod
@allure.step('Wait YDB alive')
Expand Down