Skip to content

Commit

Permalink
tn-cli: handle out of band uploads over http(s)
Browse files Browse the repository at this point in the history
  • Loading branch information
or-else committed Jan 26, 2019
1 parent 0d37318 commit 21cba3c
Showing 1 changed file with 107 additions and 46 deletions.
153 changes: 107 additions & 46 deletions tn-cli/tn-cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@
# Outstanding request for a synchronous message.
WaitingFor = None

# Last obtained authentication token
AuthToken = ''

# IO queues and a thread for asynchronous input/output
InputQueue = queue.Queue()
OutputQueue = queue.Queue()
Expand All @@ -86,6 +89,13 @@ def printerr(*args):
if text:
sys.stderr.write(text + "\n")

# Python is retarded.
class dotdict(dict):
"""dot.notation access to dictionary attributes"""
__getattr__ = dict.get
__setattr__ = dict.__setitem__
__delattr__ = dict.__delitem__

# Default values for user and topic
DefaultUser = None
DefaultTopic = None
Expand Down Expand Up @@ -163,6 +173,28 @@ def inline_image(filename):
stdoutln("Failed processing image '" + filename + "':", err)
return None

# Create a drafty message with an *in-band* attachment.
def attachment(filename):
try:
f = open(filename, 'rb')
# Try to guess the mime type.
mimetype = mimetypes.guess_type(filename)
data = base64.b64encode(f.read())
# python3 fix.
if type(data) is not str:
data = data.decode()
result = {
'fmt': [{'at': -1}],
'ent': [{'tp': 'EX', 'data':{
'val': data, 'mime': mimetype, 'name':os.path.basename(filename)
}}]
}
f.close()
return result
except IOError as err:
stdoutln("Error processing attachment '" + filename + "':", err)
return None

# encode_to_bytes takes an object/dictionary and converts it to json-formatted byte array.
def encode_to_bytes(src):
if src == None:
Expand Down Expand Up @@ -285,7 +317,7 @@ def hiMsg(id):
ver=LIB_VERSION, lang="EN"))

# {acc}
def accMsg(id, cmd):
def accMsg(id, cmd, ignored):
if cmd.uname:
if cmd.password == None:
cmd.password = ''
Expand All @@ -305,7 +337,7 @@ def accMsg(id, cmd):
cred=parse_cred(cmd.cred)), on_behalf_of=DefaultUser)

# {login}
def loginMsg(id, cmd):
def loginMsg(id, cmd, ignored):
if cmd.secret == None:
if cmd.uname == None:
cmd.uname = ''
Expand All @@ -327,7 +359,7 @@ def loginMsg(id, cmd):
return msg

# {sub}
def subMsg(id, cmd):
def subMsg(id, cmd, ignored):
if not cmd.topic:
cmd.topic = DefaultTopic
if cmd.get_query:
Expand All @@ -342,13 +374,13 @@ def subMsg(id, cmd):
tags=cmd.tags.split(",") if cmd.tags else None), get_query=cmd.get_query), on_behalf_of=DefaultUser)

# {leave}
def leaveMsg(id, cmd):
def leaveMsg(id, cmd, ignored):
if not cmd.topic:
cmd.topic = DefaultTopic
return pb.ClientMsg(leave=pb.ClientLeave(id=str(id), topic=cmd.topic, unsub=cmd.unsub), on_behalf_of=DefaultUser)

# {pub}
def pubMsg(id, cmd):
def pubMsg(id, cmd, ignored):
if not cmd.topic:
cmd.topic = DefaultTopic

Expand All @@ -362,15 +394,19 @@ def pubMsg(id, cmd):
key, val = h.split(":")
head[key] = encode_to_bytes(val)

content = json.loads(cmd.drafty) if cmd.drafty else inline_image(cmd.image) if cmd.image else cmd.content
content = json.loads(cmd.drafty) if cmd.drafty \
else inline_image(cmd.image) if cmd.image \
else attachment(cmd.attachment) if cmd.attachment \
else cmd.content

if not content:
return None

return pb.ClientMsg(pub=pb.ClientPub(id=str(id), topic=cmd.topic, no_echo=True,
head=head, content=encode_to_bytes(content)), on_behalf_of=DefaultUser)

# {get}
def getMsg(id, cmd):
def getMsg(id, cmd, ignored):
if not cmd.topic:
cmd.topic = DefaultTopic

Expand All @@ -387,7 +423,7 @@ def getMsg(id, cmd):
query=pb.GetQuery(what=" ".join(what))), on_behalf_of=DefaultUser)

# {set}
def setMsg(id, cmd):
def setMsg(id, cmd, ignored):
if not cmd.topic:
cmd.topic = DefaultTopic

Expand All @@ -404,7 +440,7 @@ def setMsg(id, cmd):
tags=cmd.tags.split(",") if cmd.tags else None)), on_behalf_of=DefaultUser)

# {del}
def delMsg(id, cmd):
def delMsg(id, cmd, ignored):
if not cmd.what:
stdoutln("Must specify what to delete")
return None
Expand Down Expand Up @@ -467,7 +503,7 @@ def delMsg(id, cmd):
return msg

# {note}
def noteMsg(id, cmd):
def noteMsg(id, cmd, ignored):
if not cmd.topic:
cmd.topic = DefaultTopic

Expand All @@ -483,18 +519,23 @@ def noteMsg(id, cmd):
cmd.seq = int(cmd.seq)
return pb.ClientMsg(note=pb.ClientNote(topic=cmd.topic, what=enum_what, seq_id=cmd.seq), on_behalf_of=DefaultUser)

# Upload file out of band (not gRPC)
def upload(id, cmd):
result = requests.post('/v' + PROTOCOL_VERSION + '/file/u/',
headers = {
'X-Tinode-APIKey': this._apiKey,
'X-Tinode-Auth': 'Token ' + this._authToken.token,
'User-Agent': APP_NAME + " " + APP_VERSION + "/" + LIB_VERSION
},
data = {'id': id},
files = {'file': (cmd.filename, open(cmd.filename, 'rb'))})
# Upload file out of band over HTTP(S) (not gRPC).
def upload(id, cmd, args):
try:
scheme = 'https' if args.ssl else 'http'
result = requests.post(
scheme + '://' + args.web_host + '/v' + PROTOCOL_VERSION + '/file/u/',
headers = {
'X-Tinode-APIKey': args.api_key,
'X-Tinode-Auth': 'Token ' + AuthToken,
'User-Agent': APP_NAME + " " + APP_VERSION + "/" + LIB_VERSION
},
data = {'id': id},
files = {'file': (cmd.filename, open(cmd.filename, 'rb'))})
handle_ctrl(dotdict(json.loads(result.text)['ctrl']))

print(result.status_code, result.reason)
except Exception as ex:
stdoutln("Failed to upload '{0}'".format(cmd.filename), ex)

return None

Expand Down Expand Up @@ -668,7 +709,7 @@ def parse_input(cmd):

# Process command-line input string: execute local commands, generate
# protobuf messages for remote commands.
def serialize_cmd(string, id):
def serialize_cmd(string, id, args):
"""Take string read from the command line, convert in into a protobuf message"""
cmd = {}
messages = {
Expand All @@ -680,8 +721,7 @@ def serialize_cmd(string, id):
"get": getMsg,
"set": setMsg,
"del": delMsg,
"note": noteMsg,
"upload": upload
"note": noteMsg
}
try:
# Convert string into a dictionary
Expand Down Expand Up @@ -710,8 +750,15 @@ def serialize_cmd(string, id):
time.sleep(cmd.millis/1000.)
return None, None

elif cmd.cmd == "upload":
# Start async upload
upload_thread = threading.Thread(target=upload, args=(id, derefVals(cmd), args), name="Uploader_"+cmd.filename)
upload_thread.start()
cmd.no_yield = True
return True, cmd

elif cmd.cmd in messages:
return messages[cmd.cmd](id, derefVals(cmd)), cmd
return messages[cmd.cmd](id, derefVals(cmd), args), cmd

else:
stdoutln("Error: unrecognized: '{0}'".format(cmd.cmd))
Expand All @@ -722,7 +769,7 @@ def serialize_cmd(string, id):
return None, None

# Generator of protobuf messages.
def gen_message(scheme, secret):
def gen_message(scheme, secret, args):
"""Client message generator: reads user input as string,
converts to pb.ClientMsg, and yields"""
global InputThread
Expand All @@ -744,7 +791,7 @@ def gen_message(scheme, secret):
setattr(login, 'scheme', scheme)
setattr(login, 'secret', secret)
setattr(login, 'cred', None)
yield loginMsg(id, login)
yield loginMsg(id, login, args)

print_prompt = True

Expand All @@ -756,7 +803,7 @@ def gen_message(scheme, secret):
if inp == 'exit' or inp == 'quit' or inp == '.exit' or inp == '.quit':
return

pbMsg, cmd = serialize_cmd(inp, id)
pbMsg, cmd = serialize_cmd(inp, id, args)
print_prompt = IsInteractive
if pbMsg != None:
if not IsInteractive:
Expand All @@ -768,7 +815,8 @@ def gen_message(scheme, secret):
cmd.await_id = str(id)
WaitingFor = cmd

yield pbMsg
if not cmd.no_yield:
yield pbMsg

elif not OutputQueue.empty():
sys.stdout.write("\r<= "+OutputQueue.get())
Expand All @@ -786,6 +834,29 @@ def gen_message(scheme, secret):
WaitingFor = None
time.sleep(0.1)


# Handle {ctrl} server response
def handle_ctrl(ctrl):
global WaitingFor

# Run code on command completion
func = OnCompletion.get(ctrl.id)
if func:
del OnCompletion[ctrl.id]
if ctrl.code >= 200 and ctrl.code < 400:
func(ctrl.params)

if WaitingFor and WaitingFor.await_id == ctrl.id:
if 'varname' in WaitingFor:
Variables[WaitingFor.varname] = ctrl
if WaitingFor.failOnError and ctrl.code >= 400:
raise Exception(str(ctrl.code) + " " + ctrl.text)
WaitingFor = None

topic = " (" + str(ctrl.topic) + ")" if ctrl.topic else ""
stdoutln("\r<= " + str(ctrl.code) + " " + ctrl.text + topic)


# The main processing loop: send messages to server, receive responses.
def run(args, schema, secret):
global WaitingFor
Expand All @@ -801,27 +872,12 @@ def run(args, schema, secret):
channel = grpc.insecure_channel(args.host)

# Call the server
stream = pbx.NodeStub(channel).MessageLoop(gen_message(schema, secret))
stream = pbx.NodeStub(channel).MessageLoop(gen_message(schema, secret, args))

# Read server responses
for msg in stream:
if msg.HasField("ctrl"):
# Run code on command completion
func = OnCompletion.get(msg.ctrl.id)
if func:
del OnCompletion[msg.ctrl.id]
if msg.ctrl.code >= 200 and msg.ctrl.code < 400:
func(msg.ctrl.params)

if WaitingFor and WaitingFor.await_id == msg.ctrl.id:
if 'varname' in WaitingFor:
Variables[WaitingFor.varname] = msg.ctrl
if WaitingFor.failOnError and msg.ctrl.code >= 400:
raise Exception(str(msg.ctrl.code) + " " + msg.ctrl.text)
WaitingFor = None

topic = " (" + str(msg.ctrl.topic) + ")" if msg.ctrl.topic else ""
stdoutln("\r<= " + str(msg.ctrl.code) + " " + msg.ctrl.text + topic)
handle_ctrl(msg.ctrl)

elif msg.HasField("meta"):
what = []
Expand Down Expand Up @@ -868,6 +924,7 @@ def run(args, schema, secret):
stdoutln("\rMessage type not handled" + str(msg))

except grpc.RpcError as err:
print(err)
printerr("gRPC failed with {0}: {1}".format(err.code(), err.details()))
except Exception as ex:
printerr("Request failed: {0}".format(ex))
Expand All @@ -892,6 +949,8 @@ def read_cookie():

# Save cookie to file after successful login.
def save_cookie(params):
global AuthToken

if params == None:
return

Expand All @@ -902,6 +961,8 @@ def save_cookie(params):

stdoutln("Authenticated as", nice.get('user'))

AuthToken = nice.get('token')

try:
cookie = open('.tn-cli-cookie', 'w')
json.dump(nice, cookie)
Expand Down

0 comments on commit 21cba3c

Please sign in to comment.