From f900add92479900d4fe08ed0c9191d8257936783 Mon Sep 17 00:00:00 2001 From: or-else Date: Fri, 31 Aug 2018 20:03:38 +0300 Subject: [PATCH] better version checking, race in grpc, redundant "on" pres update --- chatbot/chatbot.py | 34 +++++++++++++++++++++++++--------- server/hdl_grpc.go | 11 ++++++++++- server/pres.go | 7 +++---- server/session.go | 8 ++++---- server/utils.go | 22 ++++++++++++++-------- 5 files changed, 56 insertions(+), 26 deletions(-) diff --git a/chatbot/chatbot.py b/chatbot/chatbot.py index 6e7f64bfc..6327aeb0f 100644 --- a/chatbot/chatbot.py +++ b/chatbot/chatbot.py @@ -1,5 +1,8 @@ """Python implementation of a Tinode chatbot.""" +# For compatibility between python 2 and 3 +from __future__ import print_function + import argparse import base64 from concurrent import futures @@ -23,23 +26,26 @@ from tinode_grpc import pbx APP_NAME = "Tino-chatbot" -APP_VERSION = "1.0" +APP_VERSION = "1.1" LIB_VERSION = pkg_resources.get_distribution("tinode_grpc").version # Dictionary wich contains lambdas to be executed when server response is received onCompletion = {} + # Add bundle for future execution def add_future(tid, bundle): onCompletion[tid] = bundle # Resolve or reject the future -def exec_future(tid, code, params): +def exec_future(tid, code, text, params): bundle = onCompletion.get(tid) if bundle != None: del onCompletion[tid] if code >= 200 and code < 400: arg = bundle.get('arg') bundle.get('action')(arg, params) + else: + print("Error:", code, text) # List of active subscriptions subscriptions = {} @@ -49,6 +55,11 @@ def add_subscription(topic): def del_subscription(topic): subscriptions.pop(topic, None) +def server_version(params): + if params == None: + return + print("Server:", params['build'], params['ver']) + # Quotes from the fortune cookie file quotes = [] @@ -108,32 +119,35 @@ def client_reset(): def hello(): tid = next_id() + add_future(tid, { + 'action': lambda unused, params: server_version(params), + }) return pb.ClientMsg(hi=pb.ClientHi(id=tid, user_agent=APP_NAME + "/" + APP_VERSION + " (" + platform.system() + "/" + platform.release() + "); gRPC-python/" + LIB_VERSION, ver=LIB_VERSION, lang="EN")) def login(cookie_file_name, scheme, secret): tid = next_id() - onCompletion[tid] = { + add_future(tid, { 'arg': cookie_file_name, 'action': lambda fname, params: save_auth_cookie(fname, params), - } + }) return pb.ClientMsg(login=pb.ClientLogin(id=tid, scheme=scheme, secret=secret)) def subscribe(topic): tid = next_id() - onCompletion[tid] = { + add_future(tid, { 'arg': topic, 'action': lambda topicName, unused: add_subscription(topicName), - } + }) return pb.ClientMsg(sub=pb.ClientSub(id=tid, topic=topic)) def leave(topic): tid = next_id() - onCompletion[tid] = { + add_future(tid, { 'arg': topic, 'action': lambda topicName, unused: del_subscription(topicName) - } + }) return pb.ClientMsg(leave=pb.ClientLeave(id=tid, topic=topic)) def publish(topic, text): @@ -154,6 +168,8 @@ def init_server(listen): return server def init_client(addr, schema, secret, cookie_file_name): + print("Connecting to server at", addr) + channel = grpc.insecure_channel(addr) stub = pbx.NodeStub(channel) # Call the server @@ -172,7 +188,7 @@ def client_message_loop(stream): # print("in:", msg) if msg.HasField("ctrl"): # Run code on command completion - exec_future(msg.ctrl.id, msg.ctrl.code, msg.ctrl.params) + exec_future(msg.ctrl.id, msg.ctrl.code, msg.ctrl.text, msg.ctrl.params) # print(str(msg.ctrl.code) + " " + msg.ctrl.text) elif msg.HasField("data"): diff --git a/server/hdl_grpc.go b/server/hdl_grpc.go index 1ca4a1d85..cf2d27839 100644 --- a/server/hdl_grpc.go +++ b/server/hdl_grpc.go @@ -23,7 +23,9 @@ type grpcNodeServer struct { func (sess *Session) closeGrpc() { if sess.proto == GRPC { + sess.lock.Lock() sess.grpcnode = nil + sess.lock.Unlock() } } @@ -39,7 +41,7 @@ func (*grpcNodeServer) MessageLoop(stream pbx.Node_MessageLoopServer) error { go sess.writeGrpcLoop() - for sess.grpcnode != nil { + for { in, err := stream.Recv() if err == io.EOF { return nil @@ -49,6 +51,13 @@ func (*grpcNodeServer) MessageLoop(stream pbx.Node_MessageLoopServer) error { } log.Println("grpc in:", truncateStringIfTooLong(in.String())) sess.dispatch(pbCliDeserialize(in)) + + sess.lock.Lock() + if sess.grpcnode == nil { + sess.lock.Unlock() + break + } + sess.lock.Unlock() } return nil diff --git a/server/pres.go b/server/pres.go index c64d82684..a2abca341 100644 --- a/server/pres.go +++ b/server/pres.go @@ -88,7 +88,6 @@ func (t *Topic) loadContacts(uid types.Uid) error { // from the user2; "+rem" means the subscription is removed. "+dis" is the opposite of "en". // The "+en/rem/dis" command itself is stripped from the notification. func (t *Topic) presProcReq(fromUserID, what string, wantReply bool) string { - if t.isSuspended() { return "" } @@ -119,12 +118,12 @@ func (t *Topic) presProcReq(fromUserID, what string, wantReply bool) string { // offline cmd = "rem" case "?unkn": - // offline + // no change in online status + online = nil reqReply = true what = "" default: // All other notifications are not processed here - // log.Println("done processing what=", what) return what } @@ -136,7 +135,7 @@ func (t *Topic) presProcReq(fromUserID, what string, wantReply bool) string { if cmd == "rem" { replyAs = "off+rem" if !psd.enabled { - // If it was disabled before, don't send a redundunt update. + // If it was disabled before, don't send a redundant update. what = "" } delete(t.perSubs, fromUserID) diff --git a/server/session.go b/server/session.go index 425c95aa1..b95b40d5e 100644 --- a/server/session.go +++ b/server/session.go @@ -104,8 +104,8 @@ type Session struct { // Session ID sid string - // Needed for long polling. - lpLock sync.Mutex + // Needed for long polling and grpc. + lock sync.Mutex } // Subscription is a mapper of sessions to topics. @@ -224,8 +224,8 @@ func (s *Session) dispatch(msg *ClientComMessage) { // Locking-unlocking is needed for long polling: the client may issue multiple requests in parallel. // Should not affect performance if s.proto == LPOLL { - s.lpLock.Lock() - defer s.lpLock.Unlock() + s.lock.Lock() + defer s.lock.Unlock() } var resp *ServerComMessage diff --git a/server/utils.go b/server/utils.go index ab5302842..c1f05614f 100644 --- a/server/utils.go +++ b/server/utils.go @@ -345,30 +345,36 @@ func parseVersion(vers string) int { var err error dot := strings.Index(vers, ".") - if dot >= 0 { - major, err = strconv.Atoi(vers[:dot]) - } else { + if dot < 0 { major, err = strconv.Atoi(vers) + if err != nil || major > 0x1fff || major < 0 { + return 0 + } + return major << 16 } + + major, err = strconv.Atoi(vers[:dot]) if err != nil { return 0 } - dot2 := strings.IndexFunc(vers[dot+1:], func(r rune) bool { + vers = vers[dot+1:] + dot2 := strings.IndexFunc(vers, func(r rune) bool { return !unicode.IsDigit(r) }) + if dot2 > 0 { - minor, err = strconv.Atoi(vers[dot+1 : dot2]) + minor, err = strconv.Atoi(vers[:dot2]) // Ignoring the error here trailer, _ = strconv.Atoi(vers[dot2+1:]) - } else { - minor, err = strconv.Atoi(vers[dot+1:]) + } else if len(vers) > 0 { + minor, err = strconv.Atoi(vers) } if err != nil { return 0 } - if major < 0 || minor < 0 || trailer < 0 || minor >= 0xff || trailer >= 0xff { + if major < 0 || minor < 0 || trailer < 0 || major > 0x1fff || minor >= 0xff || trailer >= 0xff { return 0 }