-
Notifications
You must be signed in to change notification settings - Fork 105
/
Copy pathnodes.py
222 lines (168 loc) · 7.03 KB
/
nodes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
import json
import random
from typing import Dict, Iterator, List, Optional

import waiting
from munch import Munch

import consts
from assisted_test_infra.test_infra.controllers.node_controllers import Node
from assisted_test_infra.test_infra.controllers.node_controllers.node_controller import NodeController
from assisted_test_infra.test_infra.tools import run_concurrently
from service_client.logger import SuppressAndLog, log
class NodeMapping:
    """Pair a node object with the cluster host record that describes it.

    Attributes are plain references; nothing is copied or validated.
    """

    def __init__(self, node, cluster_host):
        """Store ``node``, its ``name`` and the matching ``cluster_host``.

        Args:
            node: Object exposing a ``name`` attribute.
            cluster_host: Cluster-side representation of the same host.
        """
        # Name is duplicated on the mapping so it can be read without going through ``node``.
        self.name = node.name
        self.node = node
        self.cluster_host = cluster_host

    def __repr__(self) -> str:
        """Return a short debug representation identifying the mapped node."""
        return f"{type(self).__name__}(name={self.name!r})"
class Nodes:
    """Facade over a ``NodeController`` and the nodes it lists.

    The node list (and a name -> node dict derived from it) is fetched from the
    controller lazily and cached on the instance. Use ``drop_cache()`` or
    ``get_nodes(refresh=True)`` to force a fresh listing.
    """

    DEFAULT_STATIC_IPS_CONFIG = False

    def __init__(self, node_controller: NodeController):
        """Wrap ``node_controller``; no nodes are listed until first needed."""
        self.controller = node_controller
        # Lazily populated caches; both are reset by drop_cache().
        self._nodes = None
        self._nodes_as_dict = None
        # Set by prepare_nodes(), cleared by destroy_all_nodes().
        self.__is_prepared = False

    @property
    def is_prepared(self) -> bool:
        """Whether ``prepare_nodes()`` ran since the last ``destroy_all_nodes()``."""
        return self.__is_prepared

    @property
    def masters_count(self):
        """Masters count as reported by the controller."""
        return self.controller.masters_count

    @property
    def workers_count(self):
        """Workers count as reported by the controller."""
        return self.controller.workers_count

    @property
    def nodes_count(self):
        """Sum of the controller's workers and masters counts."""
        return self.workers_count + self.masters_count

    @property
    def nodes(self) -> List[Node]:
        """Return the cached node list, listing from the controller if needed.

        An empty (falsy) cached value is not retained: the controller is asked
        again on every access until it returns a non-empty list.
        """
        if not self._nodes:
            self._nodes = self.controller.list_nodes()
        return self._nodes

    @property
    def is_ipv4(self):
        """The controller's ``is_ipv4`` value."""
        return self.controller.is_ipv4

    @property
    def is_ipv6(self):
        """The controller's ``is_ipv6`` value."""
        return self.controller.is_ipv6

    def __getitem__(self, i):
        """Index (or slice) into the node list."""
        return self.nodes[i]

    def __len__(self):
        """Number of nodes currently listed."""
        return len(self.nodes)

    def __iter__(self) -> Iterator[Node]:
        """Iterate over the node list."""
        yield from self.nodes

    def drop_cache(self):
        """Forget the cached node list and name -> node dict."""
        self._nodes = None
        self._nodes_as_dict = None

    def get_nodes(self, refresh=False) -> List[Node]:
        """Return the node list, dropping the cache first when ``refresh`` is true."""
        if refresh:
            self.drop_cache()
        return self.nodes

    def get_masters(self):
        """Return the nodes whose ``is_master_in_name()`` is truthy."""
        return [node for node in self.nodes if node.is_master_in_name()]

    def get_workers(self):
        """Return the nodes whose ``is_worker_in_name()`` is truthy."""
        return [node for node in self.nodes if node.is_worker_in_name()]

    @property
    def nodes_as_dict(self):
        """Return a cached ``{node.name: node}`` dict built from ``nodes``.

        As with ``nodes``, an empty dict is rebuilt on every access.
        """
        if not self._nodes_as_dict:
            self._nodes_as_dict = {node.name: node for node in self.nodes}
        return self._nodes_as_dict

    @property
    def setup_time(self):
        """The controller's ``setup_time`` value."""
        return self.controller.setup_time

    def get_random_node(self):
        """Return one node picked with ``random.choice``.

        Raises:
            IndexError: If the node list is empty.
        """
        return random.choice(self.nodes)

    def shutdown_all(self):
        """Invoke ``shutdown`` on every node."""
        self.run_for_all_nodes("shutdown")

    def notify_iso_ready(self):
        """Forward the ISO-ready notification to the controller."""
        self.controller.notify_iso_ready()

    def start_all(self, check_ips=True):
        """Invoke ``start(check_ips)`` on every node."""
        self.run_for_all_nodes("start", check_ips)

    def start_given(self, nodes):
        """Invoke ``start`` (no arguments) on each of ``nodes``."""
        self.run_for_given_nodes(nodes, "start")

    def shutdown_given(self, nodes):
        """Invoke ``shutdown`` on each of ``nodes``."""
        self.run_for_given_nodes(nodes, "shutdown")

    def format_all_disks(self):
        """Invoke ``format_disk`` on every node."""
        self.run_for_all_nodes("format_disk")

    def destroy_all(self):
        """Invoke ``shutdown`` on every node.

        NOTE(review): despite the name this only shuts nodes down, exactly like
        ``shutdown_all``; use ``destroy_all_nodes`` to go through the controller.
        """
        self.run_for_all_nodes("shutdown")

    def destroy_all_nodes(self):
        """Ask the controller to destroy all nodes and mark them unprepared.

        The cached node list is left as is; call ``drop_cache()`` if a fresh
        listing is needed afterwards.
        """
        self.controller.destroy_all_nodes()
        self.__is_prepared = False

    def prepare_nodes(self):
        """Ask the controller to prepare nodes, at most once until destroyed."""
        if not self.__is_prepared:
            self.controller.prepare_nodes()
            self.__is_prepared = True

    def reboot_all(self):
        """Invoke ``restart`` on every node."""
        self.run_for_all_nodes("restart")

    def reboot_given(self, nodes):
        """Invoke ``restart`` on each of ``nodes``."""
        self.run_for_given_nodes(nodes, "restart")

    def get_cluster_network(self):
        """Return the controller's cluster network."""
        return self.controller.get_cluster_network()

    def set_correct_boot_order(self, nodes=None, start_nodes=False):
        """Invoke ``set_boot_order_flow(False, start_nodes)`` on the nodes.

        Args:
            nodes: Nodes to act on; a falsy value means all nodes.
            start_nodes: Forwarded as the second argument of the node call.
        """
        nodes = nodes or self.nodes
        log.info("Going to set correct boot order to nodes: %s", nodes)
        self.run_for_given_nodes(nodes, "set_boot_order_flow", False, start_nodes)

    def run_for_all_nodes(self, func_name, *args):
        """Call ``run_for_given_nodes`` with every node."""
        return self.run_for_given_nodes(self.nodes, func_name, *args)

    def run_for_given_nodes(self, nodes, func_name, *args):
        """Call the method named ``func_name`` with ``*args`` on each node.

        Args:
            nodes: Nodes to act on; iterated more than once, so pass a
                re-iterable collection rather than a one-shot iterator.
            func_name: Name of the node method to look up with ``getattr``.
            *args: Positional arguments forwarded to every call.

        Returns:
            A list of per-node results when the platform is Nutanix, otherwise
            whatever ``run_concurrently`` returns.
        """
        log.info("Running <%s> on nodes: %s", func_name, [node.name for node in nodes])

        if self.controller._config.tf_platform == consts.Platforms.NUTANIX:
            # nutanix doesn't allow concurrent requests
            return [getattr(node, func_name)(*args) for node in nodes]

        return run_concurrently([(getattr(node, func_name), *args) for node in nodes])

    def run_for_given_nodes_by_cluster_hosts(self, cluster_hosts, func_name, *args):
        """Resolve each cluster host to its node, then run ``func_name`` on them."""
        return self.run_for_given_nodes(
            [self.get_node_from_cluster_host(host) for host in cluster_hosts], func_name, *args
        )

    @staticmethod
    def run_ssh_command_on_given_nodes(nodes, command) -> Dict:
        """Run ``node.run_command(command)`` for each node via ``run_concurrently``.

        Jobs are keyed by ``node.name``.
        """
        return run_concurrently({node.name: (node.run_command, command) for node in nodes})

    def set_wrong_boot_order(self, nodes=None, start_nodes=True):
        """Invoke ``set_boot_order_flow(True, start_nodes)`` on the nodes.

        Args:
            nodes: Nodes to act on; a falsy value means all nodes.
            start_nodes: Forwarded as the second argument of the node call.
        """
        nodes = nodes or self.nodes
        # Log the nodes actually targeted, not every known node.
        log.info("Setting wrong boot order for %s", [node.name for node in nodes])
        self.run_for_given_nodes(nodes, "set_boot_order_flow", True, start_nodes)

    def get_bootstrap_node(self, cluster) -> Optional[Node]:
        """Return the node of the first cluster host flagged ``bootstrap``.

        Returns:
            The matching node, or ``None`` when no host from
            ``cluster.get_hosts()`` has a truthy ``bootstrap`` entry.
        """
        for cluster_host_object in cluster.get_hosts():
            if cluster_host_object.get("bootstrap", False):
                node = self.get_node_from_cluster_host(cluster_host_object)
                log.info("Bootstrap node is %s", node.name)
                return node
        return None

    def create_nodes_cluster_hosts_mapping(self, cluster):
        """Build ``{hostname: NodeMapping}`` for every host of ``cluster``.

        Raises:
            KeyError: If a host's inventory hostname is not a known node name.
        """
        node_mapping_dict = {}
        for cluster_host_object in cluster.get_hosts():
            name = self.get_cluster_hostname(cluster_host_object)
            node_mapping_dict[name] = NodeMapping(self.nodes_as_dict[name], Munch.fromDict(cluster_host_object))
        return node_mapping_dict

    def get_node_from_cluster_host(self, cluster_host_object):
        """Return the node whose name equals the host's inventory hostname."""
        hostname = self.get_cluster_hostname(cluster_host_object)
        return self.get_node_by_hostname(hostname)

    # The parameter shares the method's name; kept unchanged so keyword callers keep working.
    def get_node_by_hostname(self, get_node_by_hostname):
        """Look up a node by name in ``nodes_as_dict``.

        Raises:
            KeyError: If no node has that name.
        """
        return self.nodes_as_dict[get_node_by_hostname]

    def get_cluster_host_obj_from_node(self, cluster, node):
        """Return the cluster host mapped to ``node`` (matched by ``node.name``)."""
        mapping = self.create_nodes_cluster_hosts_mapping(cluster=cluster)
        return mapping[node.name].cluster_host

    @staticmethod
    def get_cluster_hostname(cluster_host_object):
        """Return ``hostname`` from the host's JSON-encoded ``inventory`` string."""
        inventory = json.loads(cluster_host_object["inventory"])
        return inventory["hostname"]

    def set_single_node_ip(self, ip):
        """Forward ``ip`` to the controller's ``set_single_node_ip``."""
        self.controller.set_single_node_ip(ip)

    def wait_for_networking(
        self,
        timeout=3 * consts.MINUTE,
        interval=consts.DEFAULT_CHECK_STATUSES_INTERVAL,
    ):
        """Wait, best effort, until every node reports MAC and IP addresses.

        Args:
            timeout: Passed to ``waiting.wait`` as ``timeout_seconds``.
            interval: Passed to ``waiting.wait`` as ``sleep_seconds``.
        """
        log.info("Wait till %s nodes have MAC and IP address", len(self.nodes))

        # Best effort: a timeout is handed to SuppressAndLog instead of
        # propagating (presumably it is logged there - confirm in service_client.logger).
        with SuppressAndLog(waiting.TimeoutExpired):
            waiting.wait(
                self._are_nodes_network_prepared,
                timeout_seconds=timeout,
                sleep_seconds=interval,
                waiting_for="nodes to have IP and MAC addresses",
            )

    def _are_nodes_network_prepared(self):
        """Return True when every node has truthy ``macs`` and ``ips``."""
        return all(node.macs and node.ips for node in self.nodes)