Skip to content

Commit

Permalink
better version checking, race in grpc, redundant "on" pres update
Browse files Browse the repository at this point in the history
  • Loading branch information
or-else committed Aug 31, 2018
1 parent 8779312 commit f900add
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 26 deletions.
34 changes: 25 additions & 9 deletions chatbot/chatbot.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
"""Python implementation of a Tinode chatbot."""

# For compatibility between python 2 and 3
from __future__ import print_function

import argparse
import base64
from concurrent import futures
Expand All @@ -23,23 +26,26 @@
from tinode_grpc import pbx

APP_NAME = "Tino-chatbot"
APP_VERSION = "1.0"
APP_VERSION = "1.1"
LIB_VERSION = pkg_resources.get_distribution("tinode_grpc").version

# Dictionary wich contains lambdas to be executed when server response is received
onCompletion = {}

# Add bundle for future execution
def add_future(tid, bundle):
onCompletion[tid] = bundle

# Resolve or reject the future
def exec_future(tid, code, params):
def exec_future(tid, code, text, params):
bundle = onCompletion.get(tid)
if bundle != None:
del onCompletion[tid]
if code >= 200 and code < 400:
arg = bundle.get('arg')
bundle.get('action')(arg, params)
else:
print("Error:", code, text)

# List of active subscriptions
subscriptions = {}
Expand All @@ -49,6 +55,11 @@ def add_subscription(topic):
def del_subscription(topic):
subscriptions.pop(topic, None)

def server_version(params):
if params == None:
return
print("Server:", params['build'], params['ver'])

# Quotes from the fortune cookie file
quotes = []

Expand Down Expand Up @@ -108,32 +119,35 @@ def client_reset():

def hello():
tid = next_id()
add_future(tid, {
'action': lambda unused, params: server_version(params),
})
return pb.ClientMsg(hi=pb.ClientHi(id=tid, user_agent=APP_NAME + "/" + APP_VERSION + " (" +
platform.system() + "/" + platform.release() + "); gRPC-python/" + LIB_VERSION,
ver=LIB_VERSION, lang="EN"))

def login(cookie_file_name, scheme, secret):
tid = next_id()
onCompletion[tid] = {
add_future(tid, {
'arg': cookie_file_name,
'action': lambda fname, params: save_auth_cookie(fname, params),
}
})
return pb.ClientMsg(login=pb.ClientLogin(id=tid, scheme=scheme, secret=secret))

def subscribe(topic):
tid = next_id()
onCompletion[tid] = {
add_future(tid, {
'arg': topic,
'action': lambda topicName, unused: add_subscription(topicName),
}
})
return pb.ClientMsg(sub=pb.ClientSub(id=tid, topic=topic))

def leave(topic):
tid = next_id()
onCompletion[tid] = {
add_future(tid, {
'arg': topic,
'action': lambda topicName, unused: del_subscription(topicName)
}
})
return pb.ClientMsg(leave=pb.ClientLeave(id=tid, topic=topic))

def publish(topic, text):
Expand All @@ -154,6 +168,8 @@ def init_server(listen):
return server

def init_client(addr, schema, secret, cookie_file_name):
print("Connecting to server at", addr)

channel = grpc.insecure_channel(addr)
stub = pbx.NodeStub(channel)
# Call the server
Expand All @@ -172,7 +188,7 @@ def client_message_loop(stream):
# print("in:", msg)
if msg.HasField("ctrl"):
# Run code on command completion
exec_future(msg.ctrl.id, msg.ctrl.code, msg.ctrl.params)
exec_future(msg.ctrl.id, msg.ctrl.code, msg.ctrl.text, msg.ctrl.params)
# print(str(msg.ctrl.code) + " " + msg.ctrl.text)

elif msg.HasField("data"):
Expand Down
11 changes: 10 additions & 1 deletion server/hdl_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ type grpcNodeServer struct {

func (sess *Session) closeGrpc() {
if sess.proto == GRPC {
sess.lock.Lock()
sess.grpcnode = nil
sess.lock.Unlock()
}
}

Expand All @@ -39,7 +41,7 @@ func (*grpcNodeServer) MessageLoop(stream pbx.Node_MessageLoopServer) error {

go sess.writeGrpcLoop()

for sess.grpcnode != nil {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
Expand All @@ -49,6 +51,13 @@ func (*grpcNodeServer) MessageLoop(stream pbx.Node_MessageLoopServer) error {
}
log.Println("grpc in:", truncateStringIfTooLong(in.String()))
sess.dispatch(pbCliDeserialize(in))

sess.lock.Lock()
if sess.grpcnode == nil {
sess.lock.Unlock()
break
}
sess.lock.Unlock()
}

return nil
Expand Down
7 changes: 3 additions & 4 deletions server/pres.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func (t *Topic) loadContacts(uid types.Uid) error {
// from the user2; "+rem" means the subscription is removed. "+dis" is the opposite of "en".
// The "+en/rem/dis" command itself is stripped from the notification.
func (t *Topic) presProcReq(fromUserID, what string, wantReply bool) string {

if t.isSuspended() {
return ""
}
Expand Down Expand Up @@ -119,12 +118,12 @@ func (t *Topic) presProcReq(fromUserID, what string, wantReply bool) string {
// offline
cmd = "rem"
case "?unkn":
// offline
// no change in online status
online = nil
reqReply = true
what = ""
default:
// All other notifications are not processed here
// log.Println("done processing what=", what)
return what
}

Expand All @@ -136,7 +135,7 @@ func (t *Topic) presProcReq(fromUserID, what string, wantReply bool) string {
if cmd == "rem" {
replyAs = "off+rem"
if !psd.enabled {
// If it was disabled before, don't send a redundunt update.
// If it was disabled before, don't send a redundant update.
what = ""
}
delete(t.perSubs, fromUserID)
Expand Down
8 changes: 4 additions & 4 deletions server/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ type Session struct {
// Session ID
sid string

// Needed for long polling.
lpLock sync.Mutex
// Needed for long polling and grpc.
lock sync.Mutex
}

// Subscription is a mapper of sessions to topics.
Expand Down Expand Up @@ -224,8 +224,8 @@ func (s *Session) dispatch(msg *ClientComMessage) {
// Locking-unlocking is needed for long polling: the client may issue multiple requests in parallel.
// Should not affect performance
if s.proto == LPOLL {
s.lpLock.Lock()
defer s.lpLock.Unlock()
s.lock.Lock()
defer s.lock.Unlock()
}

var resp *ServerComMessage
Expand Down
22 changes: 14 additions & 8 deletions server/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,30 +345,36 @@ func parseVersion(vers string) int {
var err error

dot := strings.Index(vers, ".")
if dot >= 0 {
major, err = strconv.Atoi(vers[:dot])
} else {
if dot < 0 {
major, err = strconv.Atoi(vers)
if err != nil || major > 0x1fff || major < 0 {
return 0
}
return major << 16
}

major, err = strconv.Atoi(vers[:dot])
if err != nil {
return 0
}

dot2 := strings.IndexFunc(vers[dot+1:], func(r rune) bool {
vers = vers[dot+1:]
dot2 := strings.IndexFunc(vers, func(r rune) bool {
return !unicode.IsDigit(r)
})

if dot2 > 0 {
minor, err = strconv.Atoi(vers[dot+1 : dot2])
minor, err = strconv.Atoi(vers[:dot2])
// Ignoring the error here
trailer, _ = strconv.Atoi(vers[dot2+1:])
} else {
minor, err = strconv.Atoi(vers[dot+1:])
} else if len(vers) > 0 {
minor, err = strconv.Atoi(vers)
}
if err != nil {
return 0
}

if major < 0 || minor < 0 || trailer < 0 || minor >= 0xff || trailer >= 0xff {
if major < 0 || minor < 0 || trailer < 0 || major > 0x1fff || minor >= 0xff || trailer >= 0xff {
return 0
}

Expand Down

0 comments on commit f900add

Please sign in to comment.