Skip to content

webrepl_cli.py: Add support for accessing REPL. #66

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 8, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 94 additions & 16 deletions webrepl_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
WEBREPL_PUT_FILE = 1
WEBREPL_GET_FILE = 2
WEBREPL_GET_VER = 3
WEBREPL_FRAME_TXT = 0x81
WEBREPL_FRAME_BIN = 0x82


def debugmsg(msg):
Expand All @@ -35,13 +37,12 @@ def __init__(self, s):
self.s = s
self.buf = b""

def write(self, data):
def write(self, data, frame=WEBREPL_FRAME_BIN):
l = len(data)
if l < 126:
# TODO: hardcoded "binary" type
hdr = struct.pack(">BB", 0x82, l)
hdr = struct.pack(">BB", frame, l)
else:
hdr = struct.pack(">BBH", 0x82, 126, l)
hdr = struct.pack(">BBH", frame, 126, l)
self.s.send(hdr)
self.s.send(data)

Expand Down Expand Up @@ -115,6 +116,71 @@ def get_ver(ws):
return d


def do_repl(ws):
import termios, select

class ConsolePosix:
def __init__(self):
self.infd = sys.stdin.fileno()
self.infile = sys.stdin.buffer.raw
self.outfile = sys.stdout.buffer.raw
self.orig_attr = termios.tcgetattr(self.infd)

def enter(self):
# attr is: [iflag, oflag, cflag, lflag, ispeed, ospeed, cc]
attr = termios.tcgetattr(self.infd)
attr[0] &= ~(
termios.BRKINT | termios.ICRNL | termios.INPCK | termios.ISTRIP | termios.IXON
)
attr[1] = 0
attr[2] = attr[2] & ~(termios.CSIZE | termios.PARENB) | termios.CS8
attr[3] = 0
attr[6][termios.VMIN] = 1
attr[6][termios.VTIME] = 0
termios.tcsetattr(self.infd, termios.TCSANOW, attr)

def exit(self):
termios.tcsetattr(self.infd, termios.TCSANOW, self.orig_attr)

def readchar(self):
res = select.select([self.infd], [], [], 0)
if res[0]:
return self.infile.read(1)
else:
return None

def write(self, buf):
self.outfile.write(buf)

print("Use Ctrl-] to exit this shell")
console = ConsolePosix()
console.enter()
try:
while True:
sel = select.select([console.infd, ws.s], [], [])
c = console.readchar()
if c:
if c == b"\x1d": # ctrl-], exit
break
else:
ws.write(c, WEBREPL_FRAME_TXT)
if ws.s in sel[0]:
c = ws.read(1, text_ok=True)
while c is not None:
# pass character through to the console
oc = ord(c)
if oc in (8, 9, 10, 13, 27) or oc >= 32:
console.write(c)
else:
console.write(b"[%02x]" % ord(c))
if ws.buf:
c = ws.read(1)
else:
c = None
finally:
console.exit()


def put_file(ws, local_file, remote_file):
sz = os.stat(local_file)[6]
dest_fname = (SANDBOX + remote_file).encode("utf-8")
Expand Down Expand Up @@ -164,11 +230,16 @@ def get_file(ws, local_file, remote_file):

def help(rc=0):
exename = sys.argv[0].rsplit("/", 1)[-1]
print("%s - Perform remote file operations using MicroPython WebREPL protocol" % exename)
print(
"%s - Access REPL, perform remote file operations via MicroPython WebREPL protocol"
% exename
)
print("Arguments:")
print(" [-p password] <host> - Access the remote REPL")
print(" [-p password] <host>:<remote_file> <local_file> - Copy remote file to local file")
print(" [-p password] <local_file> <host>:<remote_file> - Copy local file to remote file")
print("Examples:")
print(" %s 192.168.4.1" % exename)
print(" %s script.py 192.168.4.1:/another_name.py" % exename)
print(" %s script.py 192.168.4.1:/app/" % exename)
print(" %s -p password 192.168.4.1:/app/script.py ." % exename)
Expand Down Expand Up @@ -212,26 +283,30 @@ def client_handshake(sock):


def main():
if len(sys.argv) not in (3, 5):
help(1)

passwd = None
for i in range(len(sys.argv)):
if sys.argv[i] == '-p':
sys.argv.pop(i)
passwd = sys.argv.pop(i)
break

if not passwd:
if len(sys.argv) not in (2, 3):
help(1)

if passwd is None:
import getpass
passwd = getpass.getpass()

if ":" in sys.argv[1] and ":" in sys.argv[2]:
error("Operations on 2 remote files are not supported")
if ":" not in sys.argv[1] and ":" not in sys.argv[2]:
error("One remote file is required")
if len(sys.argv) > 2:
if ":" in sys.argv[1] and ":" in sys.argv[2]:
error("Operations on 2 remote files are not supported")
if ":" not in sys.argv[1] and ":" not in sys.argv[2]:
error("One remote file is required")

if ":" in sys.argv[1]:
if len(sys.argv) == 2:
op = "repl"
host, port, _ = parse_remote(sys.argv[1] + ":")
elif ":" in sys.argv[1]:
op = "get"
host, port, src_file = parse_remote(sys.argv[1])
dst_file = sys.argv[2]
Expand All @@ -248,7 +323,8 @@ def main():

if True:
print("op:%s, host:%s, port:%d, passwd:%s." % (op, host, port, passwd))
print(src_file, "->", dst_file)
if op in ("get", "put"):
print(src_file, "->", dst_file)

s = socket.socket()

Expand All @@ -267,7 +343,9 @@ def main():
# Set websocket to send data marked as "binary"
ws.ioctl(9, 2)

if op == "get":
if op == "repl":
do_repl(ws)
elif op == "get":
get_file(ws, dst_file, src_file)
elif op == "put":
put_file(ws, src_file, dst_file)
Expand Down