forked from red-hat-storage/cephci
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbaremetal.py
158 lines (128 loc) · 4.64 KB
/
baremetal.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
"""Collects the Baremetal information and creates the cephNode object."""
from copy import deepcopy
from os.path import expanduser
from typing import List, Optional
from ceph.ceph import SSHConnectionManager
from utility.log import Log
# Module-level logger, created with this module's name.
LOG = Log(__name__)
class CephBaremetalNode:
    """Represent a baremetal node that is managed over a root SSH connection.

    The node description is kept verbatim in ``self.params``; most of the
    public properties are thin read-only views over that mapping.
    """

    def __init__(self, **params) -> None:
        """
        Initialize the node and prepare the ``cephuser`` account on it.

        Keys read from ``params`` by this class:
            ip                  Address used for the SSH connection.
            root_password       Password of the root account.
            root_private_key    Optional path to a private key file; ``~`` is
                                expanded locally.
            hostname            Hostname of the node.
            root_login          Root login name (defaults to ``root``).
            volumes             Storage volumes attached to the node.
            subnet              Subnet information.
            id                  Node identifier.
            location            Location of the node.

        Side effects on the remote host: the ``cephuser`` account is created
        when it cannot be found, a passwordless sudoers entry is written for
        it and ``/ceph-qa-ready`` is touched.
        """
        # CephVM attributes
        self._roles: list = list()
        self.osd_scenario: Optional[int] = None
        self.keypair: Optional[str] = None
        self.params = params
        self.location = params.get("location")
        self.private_key = params.get("root_private_key")

        # Key based authentication is only requested when a key file is given;
        # otherwise the connection manager keeps its own default for the path.
        key_options = {"look_for_keys": False}
        if self.private_key:
            self.private_key = expanduser(self.private_key)
            key_options = {
                "look_for_keys": True,
                "private_key_file_path": self.private_key,
            }

        self.root_connection = SSHConnectionManager(
            self.params.get("ip"),
            "root",
            self.params.get("root_password"),
            **key_options,
        )
        self.rssh = self.root_connection.get_client

        # Check if user exists. The lookup is best effort: when it cannot be
        # performed at all the account is created anyway.
        try:
            lookup_errors = self._lookup_user_errors("cephuser")
        except Exception as exc:  # noqa
            LOG.debug(f"Unable to look up the cephuser account: {exc}")
            self._create_user(name="cephuser")
        else:
            if lookup_errors:
                LOG.debug(lookup_errors)
                self._create_user(name="cephuser")
            else:
                LOG.debug("Reusing existing user account of cephuser.")

        self.rssh().exec_command(
            command='echo "cephuser ALL=(ALL) NOPASSWD:ALL" > /etc/sudoers.d/cephuser'
        )
        self.rssh().exec_command(command="touch /ceph-qa-ready")

    def _lookup_user_errors(self, name: str) -> list:
        """
        Run ``id -u <name>`` on the node and return what it wrote to stderr.

        Args:
            name (str): Name of the user account

        Returns:
            list: stderr lines of the lookup; empty when nothing was reported,
                  which the caller treats as "the account exists".
        """
        _, _out, err = self.rssh().exec_command(command=f"id -u {name}")
        # The stream can only be consumed once, hence a single read whose
        # result is handed back to the caller for both testing and logging.
        # NOTE(review): assumes a Paramiko-style file object - confirm.
        return err.readlines()

    def _create_user(self, name: str = "cephuser") -> None:
        """
        Create a Linux user account using the provided name.

        When root has an ``authorized_keys`` file it is copied into the new
        account's ``.ssh`` directory.

        Args:
            name (str): Name of the user account

        Raises:
            CommandError
        """
        LOG.info(f"Creating user account with {name} ...")
        # The value given to -p is an already hashed password, not clear text.
        self.rssh().exec_command(
            command=f"useradd {name} -p '$1$1fsNAJ7G$bx4Sz9VnpOnIygVKVaGCT.'"
        )

        # "~" is left for the remote shell to expand - presumably to root's
        # home, since the connection logs in as root.
        key_file = "~/.ssh/authorized_keys"
        create_dir = f"install -d -m 700 -o {name} -g {name} /home/{name}/.ssh"
        copy_file = f"install -m 600 -o {name} -g {name} {key_file} /home/{name}/.ssh/"
        bash_script = f"if [ -f {key_file} ] ; then {create_dir}; {copy_file} ; fi"

        # NOTE(review): if exec_command does not wait for completion, these
        # commands may race with useradd - confirm against the client in use.
        self.rssh().exec_command(command=f"chown {name}:{name} /home/{name}")
        self.rssh().exec_command(command=bash_script)

    @property
    def ip_address(self) -> str:
        """Return the private IP address of the node."""
        return self.params.get("ip")

    @property
    def node_type(self) -> str:
        """Return the node type, always ``baremetal``."""
        return "baremetal"

    @property
    def hostname(self) -> str:
        """Return the hostname of the node."""
        return self.params.get("hostname")

    @property
    def root_password(self) -> str:
        """Return root password for the machine (``passwd`` when not given)."""
        return self.params.get("root_password", "passwd")

    @property
    def root_login(self) -> str:
        """Return the root login name (``root`` when not given)."""
        return self.params.get("root_login", "root")

    @property
    def volumes(self):
        """Return the list of storage volumes attached to the node."""
        return self.params.get("volumes", [])

    @property
    def no_of_volumes(self) -> int:
        """Return the number of volumes attached to the node."""
        attached = self.volumes
        return len(attached) if attached else 0

    @property
    def subnet(self) -> str:
        """Return the subnet information."""
        return self.params.get("subnet")

    @property
    def role(self) -> List:
        """Return the Ceph roles of the instance."""
        return self._roles

    @role.setter
    def role(self, roles: list) -> None:
        """Set the roles for the node; a deep copy of ``roles`` is stored."""
        self._roles = deepcopy(roles)

    @property
    def shortname(self) -> str:
        """Return the hostname up to, but excluding, the first dot."""
        return self.hostname.split(".")[0]

    @property
    def id(self) -> int:
        """Return the node id."""
        return self.params.get("id")