Sendmsg support #53

Merged (16 commits) on Dec 5, 2017
16 changes: 8 additions & 8 deletions .travis.yml
@@ -29,16 +29,16 @@ before_install:
- export PATH="$PWD:$PATH"
- cd ..

# pull and start up freeswitch container bound to 'lo'
# - docker pull safarov/freeswitch
# - docker run -d --net=host -v $TRAVIS_BUILD_DIR/conf/ci-minimal/:/etc/freeswitch safarov/freeswitch
# - docker ps -a
# pull and start up freeswitch container bound to 'lo'
# - docker pull safarov/freeswitch
# - docker run -d --net=host -e SOUND_RATES=8000:16000 -e SOUND_TYPES=music:en-us-callie -v freeswitch-sounds:/usr/share/freeswitch/sounds -v $TRAVIS_BUILD_DIR/conf/ci-minimal/:/etc/freeswitch safarov/freeswitch
# - docker ps -a

install:
# - pip install -U tox
- cd $TRAVIS_BUILD_DIR
- pip install . -r requirements-test.txt
# - pip install -U tox
- cd $TRAVIS_BUILD_DIR
- pip install . -r requirements-test.txt

# NOTE: no support for pandas/pytables yet
script:
- pytest --use-docker tests/
- pytest --use-docker tests/ -p no:logging
32 changes: 10 additions & 22 deletions switchio/api.py
@@ -17,7 +17,7 @@
from .utils import ConfigurationError, APIError
from .commands import build_originate_cmd
from . import marks
from .connection import get_connection, ConnectionError
from .connection import get_connection


class Client(object):
@@ -321,7 +321,7 @@ def _assert_alive(self, listener=None):
return listener

def bgapi(self, cmd, listener=None, callback=None, client_id=None,
**jobkwargs):
sess_uuid=None, **jobkwargs):
'''Execute a non blocking api call and handle it to completion

Parameters
@@ -337,27 +337,15 @@ def bgapi(self, cmd, listener=None, callback=None, client_id=None,
kwargs passed here.
'''
listener = self._assert_alive(listener)
# block the event loop while we insert our job
listener.block_jobs()
con = listener.event_loop._con
try:
ev = con.bgapi(cmd)
if ev:
bj = listener.register_job(
ev, callback=callback,
client_id=client_id or self._id,
con=self._con,
**jobkwargs
)
else:
if not con.connected():
raise ConnectionError("local connection down on '{}'!?"
.format(con.host))
else:
raise APIError("bgapi cmd failed?!\n{}".format(cmd))
finally:
# wakeup the listener's event loop
listener.unblock_jobs()
future = con.bgapi(cmd)
bj = listener.register_job(
future=future, callback=callback,
client_id=client_id or self._id,
con=self._con,
sess_uuid=sess_uuid,
**jobkwargs
)
return bj

def originate(self, dest_url=None,
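
For orientation, a minimal usage sketch (not part of this diff) of the reworked ``bgapi``; the client/listener wiring, the ``uuid_answer`` command and the callback signature are assumptions for illustration only.

def answer_in_background(client, listener, chan_uuid):
    # Assumed callback shape: invoked with the job's response body once the
    # matching BACKGROUND_JOB event is consumed by the listener.
    def on_done(resp):
        print("uuid_answer returned:", resp)

    # ``bgapi`` now hands the future returned by ``con.bgapi(cmd)`` straight
    # to ``listener.register_job()`` and tags the job with ``sess_uuid`` so
    # the BACKGROUND_JOB handler can associate it with the tracked session.
    bj = client.bgapi(
        'uuid_answer {}'.format(chan_uuid),
        listener=listener,
        callback=on_done,
        sess_uuid=chan_uuid,
    )
    return bj
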
4 changes: 2 additions & 2 deletions switchio/apps/blockers.py
@@ -34,7 +34,7 @@ def response(self, value):
else: # str case
self.action = partial(Session.hangup, cause=value)

@event_callback("CHANNEL_CREATE")
@callback("CHANNEL_CREATE")
def on_invite(self, sess):
if sess.is_inbound():
self.action(sess)
@@ -72,7 +72,7 @@ def __setduration__(self, value):
if self.auto_duration:
self.callee_hup_after = value

@event_callback("CHANNEL_CALLSTATE")
@callback("CHANNEL_CALLSTATE")
def on_cs(self, sess):
self.log.debug("'{}' sess CS is '{}'".format(
sess['Call-Direction'], sess['Channel-Call-State']))
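
A hedged sketch of a small app written against the renamed decorator; the import path is an assumption, while the ``@callback('CHANNEL_CREATE')`` usage and the session helpers mirror the diff above.

from switchio.marks import callback  # assumed import path for the renamed decorator


class RejectInbound(object):
    """Hypothetical app: hang up every inbound INVITE with a fixed cause."""

    @callback('CHANNEL_CREATE')
    def on_invite(self, sess):
        if sess.is_inbound():
            sess.hangup(cause='CALL_REJECTED')
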
2 changes: 1 addition & 1 deletion switchio/apps/call_gen.py
@@ -331,7 +331,7 @@ def _handle_bj(self, sess, job):
self._report_on_none()

@marks.event_callback("CHANNEL_HANGUP")
def _handle_hangup(self, sess, job):
def _handle_hangup(self, sess):
self._report_on_none()
# if sess.call.sessions and sess.is_outbound():
# # we normally expect that the caller hangs up
2 changes: 1 addition & 1 deletion switchio/apps/measure/cdr.py
@@ -164,7 +164,7 @@ def on_originate(self, sess):
def on_answer(self, sess):
sess.times['answer'] = sess.time

@event_callback('CHANNEL_HANGUP')
@event_callback('CHANNEL_DESTROY')
def log_stats(self, sess, job):
"""Append measurement data only once per call
"""
3 changes: 2 additions & 1 deletion switchio/apps/players.py
@@ -29,7 +29,8 @@ def on_answer(self, sess):

# play infinite tones on calling leg
if sess.is_outbound():
sess.broadcast('playback::{loops=-1}tone_stream://%(251,0,1004)')
sess.playback('tone_stream://%(251,0,1004)',
params={'loops': '-1'})


RecInfo = namedtuple("RecInfo", "host caller callee")
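
The switch from a raw ``broadcast`` string to ``sess.playback(..., params={...})`` presumably rides on the sendmsg/execute support added below; a hedged sketch of another call through the same helper, with a placeholder sound file and loop count.

def play_welcome(sess):
    # Hypothetical follow-on usage of the same ``playback`` helper.
    sess.playback('ivr/ivr-welcome.wav', params={'loops': '3'})
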
6 changes: 6 additions & 0 deletions switchio/connection.py
@@ -217,6 +217,12 @@ async def recv_event(self):
queue.task_done()
return event

def execute(self, uuid, app, arg='', params='', loops=1):
"""Execute a dialplan ``app`` with argument ``arg``.
"""
return self.protocol.sendmsg(uuid, 'execute', app, arg, params,
loops=loops)

def api(self, cmd, errcheck=True, block=False, timeout=0.5):
'''Invoke api command (with error checking by default).
'''
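
Since the PR's subject is sendmsg support, a brief hedged sketch of the new ``Connection.execute()`` wrapper in use; the channel uuid is a placeholder and the return value is whatever ``protocol.sendmsg()`` produces, which is outside this diff.

def start_tone(con, chan_uuid):
    # Queue the dialplan ``playback`` app on the channel via sendmsg; the tone
    # string matches the one used in switchio/apps/players.py above.
    return con.execute(
        chan_uuid,
        'playback',
        'tone_stream://%(251,0,1004)',
        loops=1,
    )
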
150 changes: 61 additions & 89 deletions switchio/handlers.py
@@ -56,17 +56,14 @@ def __init__(
self.sessions_per_app = Counter()
self.max_limit = max_limit
self.call_tracking_header = call_tracking_header
# job synchronization
self._lookup_blocker = mp.Event() # used block event loop temporarily
self._lookup_blocker.set()
# state reset
self.reset()

# add default handlers
for evname, cbtype, cb in get_callbacks(self, only='handler'):
self.event_loop.add_handler(evname, cb)

def register_job(self, event, **kwargs):
def register_job(self, future, **kwargs):
'''Register for a job to be handled when the appropriate event arrives.
Once an event corresponding to the job is received, the bgjob event
handler will 'consume' it and invoke its callback.
@@ -82,29 +79,10 @@ def register_job(self, event, **kwargs):
-------
bj : an instance of Job (a background job)
'''
bj = Job(event, **kwargs)
bj = Job(future, **kwargs)
self.bg_jobs[bj.uuid] = bj
return bj

def block_jobs(self):
'''Block the event loop from processing
background job events (useful for registering for
job events - see `self.register_job`)

WARNING
-------
This will block the event loop thread permanently starting on the next
received background job event. Be sure to run 'unblock_jobs'
immediately after registering your job.
'''
self._lookup_blocker.clear()

def unblock_jobs(self):
'''Unblock the event loop from processing
background job events
'''
self._lookup_blocker.set()

def count_jobs(self):
return len(self.bg_jobs)

@@ -131,19 +109,27 @@ def reset(self):
self.failed_jobs = Counter()
self.total_answered_sessions = 0

@handler('CHANNEL_HANGUP')
@handler('CHANNEL_PARK')
@handler('CALL_UPDATE')
def lookup_sess(self, e):
"""The most basic handler template which looks up the locally tracked
session corresponding to event `e` and updates it with event data
"""
uuid = e.get('Unique-ID')
sess = self.sessions.get(uuid, False)
sess = self.sessions.get(e.get('Unique-ID'), False)
if sess:
sess.update(e)
return True, sess
return False, None

def lookup_sess_and_job(self, e):
"""Look up and return the session and any corresponding background job.
"""
consumed, sess = self.lookup_sess(e)
if consumed:
return True, sess, sess.bg_job
return False, None, None

@handler('LOG')
def _handle_log(self, e):
self.log.info(e.get('Body'))
@@ -173,78 +159,63 @@ def _handle_bj(self, e):
err = '-ERR'
job_uuid = e.get('Job-UUID')
body = e.get('Body')

# always report errors even for jobs which we aren't tracking
if err in body:
resp = body.strip(err).strip()
error = True
self.log.debug("job '{}' failed with:\n{}".format(
job_uuid, str(body)))
elif ok in body:
resp = body.strip(ok + '\n')

if job_uuid in self.bg_jobs:
job = self.bg_jobs.get(job_uuid, None)
job = self.bg_jobs.get(job_uuid, None)
if not job:
job = Job(event=e)
else:
# might be in the middle of inserting a job
self._lookup_blocker.wait()
job = self.bg_jobs.get(job_uuid, None)
job.events.update(e)

# if this job is registered, process it
if job:
job.update(e)
consumed = True
# attempt to lookup an associated session
sess = self.sessions.get(job.sess_uuid or resp, None)

if error:
# if the job returned an error, report it and remove the job
if error:
# if this job corresponds to a tracked session then
# remove it as well
self.log.error(
"Job '{}' corresponding to session '{}'"
" failed with:\n{}".format(
job_uuid,
job.sess_uuid, str(body))
)
job.fail(resp) # fail the job
# always pop failed jobs
self.bg_jobs.pop(job_uuid)
# append the id for later lookup and discard?
self.failed_jobs[resp] += 1
consumed = True

else: # OK case
if sess:
# special case: the bg job event returns a known originated
# session's (i.e. pre-registered) uuid in its body
if job.sess_uuid:
self.log.error(
"Job '{}' corresponding to session '{}'"
" failed with:\n{}".format(
job_uuid,
job.sess_uuid, str(body))
)
# session may already have been popped in hangup handler?
# TODO make a special method for popping sessions?
sess = self.sessions.pop(job.sess_uuid, None)
if sess:
# remove any call containing this session
call = sess.call
if call:
call = self.calls.pop(call.uuid, None)
else:
self.log.debug("No Call containing Session "
"'{}'".format(sess.uuid))
else:
self.log.warn("No session corresponding to bj '{}'"
.format(job_uuid))
job.fail(resp) # fail the job
# always pop failed jobs
self.bg_jobs.pop(job_uuid)
# append the id for later lookup and discard?
self.failed_jobs[resp] += 1

# success, associate with any related session
elif ok in body:
resp = body.strip(ok + '\n')

# special case: the bg job event returns an originated
# session's uuid in its body
sess = self.sessions.get(resp, None)
if sess:
if job.sess_uuid:
assert str(job.sess_uuid) == str(resp), \
("""Session uuid '{}' <-> BgJob uuid '{}' mismatch!?
""".format(job.sess_uuid, resp))

# reference this job in the corresponding session
# self.sessions[resp].bg_job = job
sess.bg_job = job
self.log.debug("Job '{}' was sucessful".format(
job_uuid))
# run the job's callback
job(resp)

assert str(job.sess_uuid) == str(resp), \
("""Session uuid '{}' <-> BgJob uuid '{}' mismatch!?
""".format(job.sess_uuid, resp))

# reference this job in the corresponding session
# self.sessions[resp].bg_job = job
sess.bg_job = job
self.log.debug("Job '{}' was sucessful".format(
job_uuid))
consumed = True
else:
self.log.warning("Received unexpected job message:\n{}"
.format(body))
self.log.warn("No session corresponding to bj '{}'"
.format(job_uuid))

# run the job's callback
job(resp)

return consumed, sess, job

@handler('CHANNEL_CREATE')
@@ -321,9 +292,10 @@ def _handle_answer(self, e):
self.log.warn('Skipping answer of {}'.format(uuid))
return False, None

@handler('CHANNEL_HANGUP')
def _handle_hangup(self, e):
'''Handle hangup events
@handler('CHANNEL_DESTROY')
# @handler('CHANNEL_HANGUP_COMPLETE') # XXX: a race between these two...
def _handle_destroy(self, e):
'''Handle channel destroy events.

Returns
-------
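
A hedged sketch of the bookkeeping the reworked ``_handle_bj`` performs, seen from a consumer's side; the ``listener`` name and this helper are illustrative, and only ``bg_jobs``, ``failed_jobs`` and ``sess.bg_job`` come from the diff.

def describe_job(listener, job_uuid, resp):
    # Failed jobs are popped from ``bg_jobs`` and tallied in ``failed_jobs``
    # keyed by the error response; successful jobs for a tracked session are
    # linked back to it via ``sess.bg_job``.
    bj = listener.bg_jobs.get(job_uuid)
    if bj is None:
        count = listener.failed_jobs.get(resp, 0)
        return "job '{}' not tracked; '{}' seen {} time(s) as a failure".format(
            job_uuid, resp, count)
    return "job '{}' still registered".format(bj.uuid)
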
4 changes: 2 additions & 2 deletions switchio/loop.py
@@ -153,7 +153,7 @@ def _launch_bg_loop(self, debug=False):
self._thread.daemon = True # die with parent
self._thread.start()

def connect(self, loop=None, timeout=3, **conn_kwargs):
def connect(self, loop=None, timeout=3, debug=False, **conn_kwargs):
'''Initialize underlying receive connection.
'''
# TODO: once we remove SWIG/py27 support this check can be removed
@@ -165,7 +165,7 @@ def connect(self, loop=None, timeout=3, **conn_kwargs):
return

if not self.is_alive():
self._launch_bg_loop()
self._launch_bg_loop(debug=debug)
while not self.loop:
time.sleep(0.1)
