Skip to content

kills agents gracefully #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ __pycache__/

*.pyc
*.pyc
*.DS_Store
13 changes: 9 additions & 4 deletions src/main/nluas/Transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class TransportSecurityError(TransportError):
######################################################################
#
# The main class for Transport. On creation, sets up a thread to
# listen for incoming messages.
# listen for incoming messages.
#

class Transport():
Expand All @@ -107,6 +107,11 @@ def send(self, dest, ntuple):
self._pyre.shout(dest, json.dumps(ntuple).encode('utf-8'))
# send()

def broadcast(self, ntuple):
'''Send given ntuple to Transport all destinations. If the destination isn't listening then the message will (currently) be silently ignored.'''
self._pyre.shout(self._globalchannel, json.dumps(ntuple).encode('utf-8'))
# broadcast()

# Notes on subscribe
#
# The callback is called in the same thread that listens for pyre
Expand Down Expand Up @@ -150,7 +155,7 @@ def unsubscribe_all(self):
# unsubscribe_all()

# Notes on get()
#
#
# If you already subscribe to remote, temporarly overrides
# the subscribe. The subscribed callback will NOT be called.
# The subscription is replaced after get() returns.
Expand Down Expand Up @@ -187,7 +192,7 @@ def get_callback(tup, **kw):

# Set the subscription
self._subscribers[remote] = get_callback

# Wait for the callback to be called.
e.wait()

Expand Down Expand Up @@ -225,7 +230,7 @@ def __init__(self, myname, port=None, prefix=None):

# dict of remote name to callback. See subscribe method above.
self._subscribers = {}

# Callback for all message (or None if none registered)
self._subscribe_all = None

Expand Down
20 changes: 9 additions & 11 deletions src/main/nluas/app/core_solver.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""
Simple solver "core". Contains capabilities for unpacking
a JSON n-tuple, as well as routing this n-tuple based
on the predicate_type (command, query, assertion, etc.).
Other general capabilities can be added. The design
is general enough that the same "unpacking" and "routing"
Simple solver "core". Contains capabilities for unpacking
a JSON n-tuple, as well as routing this n-tuple based
on the predicate_type (command, query, assertion, etc.).
Other general capabilities can be added. The design
is general enough that the same "unpacking" and "routing"
method can be used, as long as a new method is written for a given
predicate_type.
predicate_type.

"Route_action" can be called by command/query/assertion methods,
to route each parameter to the task-specific method. E.g., "solve_move",
Expand Down Expand Up @@ -56,7 +56,6 @@ def __init__(self, args):
self.eventFeatures=None
self.parameter_templates = OrderedDict()
#self.initialize_templates()



def setup_solver_parser(self):
Expand All @@ -65,6 +64,8 @@ def setup_solver_parser(self):
return parser

def callback(self, ntuple):
if self.is_quit(ntuple):
return self.close()
self.solve(ntuple)

def initialize_templates(self):
Expand Down Expand Up @@ -109,7 +110,7 @@ def solve(self, ntuple):
def broadcast(self):
""" Here, does nothing. Later, an AgentSolver will broadcast information back to BossSolver. """
pass

def update_world(self, discovered=[]):
for item in discovered:
self.world.append(item)
Expand Down Expand Up @@ -189,9 +190,6 @@ def route_dispatch(self, dispatch_function, parameters):
""" Simply runs dispatch_function on PARAMETERS. """
return dispatch_function(parameters)

def close(self):
return

def check_for_clarification(self, ntuple):
""" Will need to be replaced by a process that checks whether ntuple needs clarification.
Requires some sort of context/world model. """
Expand Down
34 changes: 26 additions & 8 deletions src/main/nluas/core_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Author: seantrott <seantrott@icsi.berkeley.edu>

Defines a CoreAgent, which uses the Transport module. Can be initialized
by just feeding it a channel name. All "Agents" inherit from the CoreAgent.
by just feeding it a channel name. All "Agents" inherit from the CoreAgent.

------
See LICENSE.txt for licensing information.
Expand All @@ -14,6 +14,8 @@
import os
import sys
import logging
import json
import time

from collections import OrderedDict

Expand Down Expand Up @@ -52,7 +54,7 @@ def read_templates(self, filename):
return base

def unify_templates(self, child, parent):
""" Unifies a child and parent template. Adds all parent key-value pairs
""" Unifies a child and parent template. Adds all parent key-value pairs
unless the key already exists in the child. """
child.update({key:value for (key, value) in parent.items() if key not in child})
return child
Expand All @@ -69,6 +71,8 @@ def initialize(self, args):
self.logfile = args.logfile
self.loglevel = args.loglevel
self.logagent = args.logagent
self._keep_alive = True
self._broadcasted = False

def setup_parser(self):
parser = argparse.ArgumentParser()
Expand All @@ -78,17 +82,31 @@ def setup_parser(self):
parser.add_argument("-logagent", type=str, help="indicate agent responsible for logging output")
return parser

def close(self):
#self.transport.join()
print("Transport needs a QUIT procedure.")
sys.exit()
def close(self, quit_federation=False):
if not self._broadcasted:
self._broadcasted = True
self.transport.broadcast({"text": "QUIT", "type": "QUIT"}) # application-level quit

if quit_federation:
time.sleep(0.5)
self.transport.quit_federation() # transport-level quit

self._keep_alive = False

def keep_alive(self, func=None):
while self._keep_alive:
if func:
func()
else:
time.sleep(0.1)

def is_quit(self, ntuple):
""" Checks if an ntuple is the application quit message """
return "type" in ntuple and ntuple["type"] == 'QUIT'

def callback(self, ntuple):
print("{} received {}.".format(self.name, ntuple))

def subscribe_mass(self, ports):
for port in ports:
self.transport.subscribe(port, self.callback)


7 changes: 2 additions & 5 deletions src/main/nluas/language/text_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ def prompt(self):
#print("Clarification is: {}".format(self.clarification))
msg = input("> ")
if msg == "q":
self.transport.quit_federation()
quit()
self.close(True)
elif msg == None or msg =="":
pass
else:
Expand All @@ -67,6 +66,4 @@ def output_stream(self, tag, message):

if __name__ == "__main__":
text = TextAgent(sys.argv[1:])
while True:
text.prompt()

text.keep_alive(text.prompt())
26 changes: 7 additions & 19 deletions src/main/nluas/language/user_agent.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@

"""
The User-Agent (also called UI-Agent, Agent-UI) receives text/speech
as input, and produces an n-tuple, which it sends to a ProblemSolver.
as input, and produces an n-tuple, which it sends to a ProblemSolver.
It feeds the text through the ECG Analyzer (running on a local server)
to produce a SemSpec, which it then runs through the CoreSpecializer to produce
the n-tuple.
the n-tuple.

Interaction with the user is modulated through the output_stream method, which
allows designers to subclass the User-Agent and define a new mode of interaction.
Expand Down Expand Up @@ -78,7 +78,7 @@ def initialize_UI(self):

def initialize_analyzer(self):
self.analyzer = Analyzer(self.analyzer_port)

def initialize_specializer(self):
try:
self.specializer=CoreSpecializer(self.analyzer)
Expand Down Expand Up @@ -130,7 +130,7 @@ def speech_callback(self, ntuple):
#ntuple = json.loads(ntuple)
text = ntuple['text'].lower()
print("Got {}".format(text))
new_ntuple = self.process_input(text)
new_ntuple = self.process_input(text)
if new_ntuple and new_ntuple != "null" and "predicate_type" in new_ntuple:
self.transport.send(self.solve_destination, new_ntuple)

Expand All @@ -141,7 +141,9 @@ def text_callback(self, ntuple):
specialize = True
#ntuple = json.loads(ntuple)
msg = ntuple['text']
if ntuple['type'] == "standard":
if self.is_quit(ntuple):
self.close()
elif ntuple['type'] == "standard":
if msg == None or msg == "":
specialize = False
elif msg.lower() == "d":
Expand All @@ -159,7 +161,6 @@ def text_callback(self, ntuple):
self.clarification = False



def callback(self, ntuple):
print(ntuple)
#ntuple = self.decoder.convert_JSON_to_ntuple(ntuple)
Expand All @@ -182,7 +183,6 @@ def write_file(self, json_ntuple, msg):




def process_clarification(self, tag, msg, ntuple):
self.clarification = True
#self.output_stream(tag, msg)
Expand All @@ -203,15 +203,6 @@ def clarify_ntuple(self, ntuple, descriptor):
new[key] = value
return new


def prompt(self):
while True:
s = input("> ")
if s == "q":
self.transport.quit_federation()
quit()


def check_spelling(self, msg):
table = self.spell_checker.spell_check(msg)
if table:
Expand All @@ -229,6 +220,3 @@ def check_spelling(self, msg):

if __name__ == "__main__":
ui = UserAgent(sys.argv[1:])
ui.prompt()