-
Notifications
You must be signed in to change notification settings - Fork 13
/
oci-console
executable file
·186 lines (162 loc) · 7.55 KB
/
oci-console
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
#!/usr/bin/env python3
# oci-console -- attach to serial console of Oracle Cloud VM instance
#
# Requires python-oci. Visit the "Profile" page in Oracle Cloud Console to set
# up an API key and generate the ~/.oci/config file (or install oci-cli and use
# `oci setup bootstrap`).
#
# API reference:
# https://docs.oracle.com/en-us/iaas/tools/python/2.105.0/api/core.html
import argparse
import oci
import os
from pprint import pprint
import re
import subprocess
import tempfile
import time
def hprint(arg, **kwargs):
print(f"\033[2m{arg}\033[m", **kwargs)
def validate_safe_cmd(cmd):
pattern = r"^ssh -o ProxyCommand='ssh -W %h:%p -p 443 [\w@.-]+\.oraclecloud\.com' [\w@.-]+$"
if re.fullmatch(pattern, cmd):
return cmd
else:
raise Exception(f"Unsafe SSH command: {cmd!r}")
class OciRcons():
def __init__(self):
self.config = oci.config.from_file()
self.compartment_id = self.config["tenancy"]
self.signer = self._get_api_signer()
self.compute_api = oci.core.ComputeClient(self.config, signer=self.signer)
def _get_api_signer(self):
if "security_token_file" in self.config:
# Short-term auth tokens retrieved via `oci session authenticate`
# cli_util.py:get_session_token_signer()
path = os.path.expanduser(self.config["security_token_file"])
with open(path, "r") as fh:
token = fh.read()
stc = oci.auth.security_token_container.SecurityTokenContainer(None, token)
if not stc.valid():
exit(f"error: CLI token has expired")
priv_key = oci.signer.load_private_key_from_file(self.config["key_file"])
return oci.auth.signers.SecurityTokenSigner(token, priv_key)
return oci.signer.Signer(self.config["tenancy"],
self.config["user"],
self.config["fingerprint"],
self.config["key_file"])
def do_list_instances(self):
hprint(f"Listing instances...")
resp = self.compute_api.list_instances(self.compartment_id)
for x in resp.data:
print(f"{x.display_name} ({x.lifecycle_state})")
def _create_console(self, instance_id, ssh_pubkey):
hprint(f"Creating console connection...", end=" ", flush=True)
parv = oci.core.models.CreateInstanceConsoleConnectionDetails(instance_id=instance_id,
public_key=ssh_pubkey)
resp = self.compute_api.create_instance_console_connection(parv)
conn = resp.data
while conn.lifecycle_state != "ACTIVE":
time.sleep(1)
resp = self.compute_api.get_instance_console_connection(conn.id)
conn = resp.data
#print(f" - Connection '...{conn.id[-20:]}' is {conn.lifecycle_state}")
hprint(f"done.")
return conn
def _delete_console(self, conn):
hprint(f"Deleting console connection...", end=" ", flush=True)
self.compute_api.delete_instance_console_connection(conn.id)
while conn.lifecycle_state != "DELETED":
time.sleep(1)
resp = self.compute_api.get_instance_console_connection(conn.id)
conn = resp.data
#print(f" - Connection '...{conn.id[-20:]}' is {conn.lifecycle_state}")
hprint(f"done.")
def _run_console(self, inst, conn):
ssh_config = ("Host *\n"
" EscapeChar &\n"
" ControlPath none\n"
" HostKeyAlgorithms +ssh-rsa\n"
" PubkeyAcceptedAlgorithms +ssh-rsa\n"
" UserKnownHostsFile {known_hosts_file}\n"
" StrictHostKeyChecking accept-new\n"
" PreferredAuthentications publickey\n"
"\n"
"Host instance-console.{region}.oci.oraclecloud.com\n"
" Port 443\n"
" User {connection_id}\n"
" # Hide the initial SSH banner\n"
" LogLevel QUIET\n"
"\n"
"Host {instance_id}\n"
" ProxyJump instance-console.{region}.oci.oraclecloud.com\n"
"\n"
"Include ~/.ssh/config\n")
with tempfile.NamedTemporaryFile() as known_hosts_file:
ssh_config = ssh_config.format(known_hosts_file=known_hosts_file.name,
region=inst.region,
instance_id=inst.id,
connection_id=conn.id)
with tempfile.NamedTemporaryFile() as ssh_config_file:
ssh_config_file.write(ssh_config.encode())
ssh_config_file.flush()
# TODO: validate conn.service_host_key_fingerprint
cmd = validate_safe_cmd(conn.connection_string)
# cmd = cmd.removeprefix("ssh ")
# cmd = f"ssh -F '{ssh_config_file.name}' {cmd}"
# print(f"Running: {cmd}")
# subprocess.run(cmd, shell=True)
print("\033[1mStarting console connection (use &. to exit)\033[m")
try:
subprocess.run(["ssh", "-F", ssh_config_file.name, inst.id])
except KeyboardInterrupt:
pass
def do_connect_instance(self, instance_name, ssh_pubkey):
hprint(f"Querying instance...", end=" ", flush=True)
resp = self.compute_api.list_instances(self.compartment_id,
display_name=instance_name)
for x in resp.data:
if x.display_name == instance_name:
inst = x
hprint(f"instance {inst.display_name!r} is {inst.lifecycle_state}")
if inst.lifecycle_state != "RUNNING":
exit(f"error: Instance {inst.display_name!r} is not running")
break
else:
hprint(f"not found")
exit(f"error: Instance {instance_name} not found")
hprint(f"Querying console connection...", end=" ", flush=True)
resp = self.compute_api.list_instance_console_connections(self.compartment_id,
instance_id=inst.id)
#for x in resp.data:
# print(f" - Connection '...{x.id[-20:]}' is {x.lifecycle_state}")
for x in resp.data:
if x.lifecycle_state == "ACTIVE":
# XXX: compare public key
conn = x
hprint(f"found existing")
break
else:
hprint(f"not found")
conn = self._create_console(inst.id, ssh_pubkey)
self._run_console(inst, conn)
self._delete_console(conn)
parser = argparse.ArgumentParser()
parser.description = "Connect to the serial console of an Oracle Cloud VM."
parser.add_argument("-i",
metavar="<path>",
dest="ssh_key",
default="~/.ssh/id_rsa",
help="specify path to SSH keypair")
parser.add_argument("instance",
nargs="?",
help="instance name to connect")
args = parser.parse_args()
rc = OciRcons()
if args.instance:
ssh_privkey = os.path.expanduser(args.ssh_key)
with open(f"{ssh_privkey}.pub", "r") as fh:
ssh_pubkey = fh.read()
rc.do_connect_instance(args.instance, ssh_pubkey)
else:
rc.do_list_instances()