Skip to content

Add enhanced message and its corresponding error-handling, resent mechanism Fix #61. #71

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 23, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions README.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,68 @@ of the Payload constructor.
payload = Payload(alert="Hello World!", custom={'sekrit_number':123})
```

### Enhanced Message with immediate error-response
```python
apns_enhanced = APNs(use_sandbox=True, cert_file='apns.pem', enhanced=True)
```

Send a notification. note that `identifer` is the information to indicate which message has error in error-response payload.
```python
token_hex = 'b5bb9d8014a0f9b1d61e21e796d78dccdf1352f23cd32812f4850b87'
payload = Payload(alert="Hello World!", sound="default", badge=1)
identifier = random.getrandbits(32)
apns_enhanced.gateway_server.send_notification(token_hex, payload, identifier=identifier)
```

Callback when error-response occur, with parameter {'status': 8, 'identifier': 1}.
[Status code reference](https://developer.apple.com/library/ios/documentation/NetworkingInternet/Conceptual/RemoteNotificationsPG/Chapters/CommunicatingWIthAPS.html#//apple_ref/doc/uid/TP40008194-CH101-SW4)
```python
def response_listener(error_response):
_logger.debug("client get error-response: " + str(error_response))

apns_enhanced.gateway_server.register_response_listener(response_listener)
```

Manually close thread of reading error-response if you want discard future error-responses.
```python
apns_enhanced.gateway_server.close_read_thread()
```

Extra log messages when error-response occur, auto-resent afterwards.

got error-response from APNS:(8, 1)
rebuilding connection to APNS
resending 9 notifications to APNS
resending notification with id:2 to APNS
resending notification with id:3 to APNS
resending notification with id:4 to APNS

Caveats:

* Currently support single notification only

Problem Addressed ([Reference to Redth](http://redth.codes/the-problem-with-apples-push-notification-ser/)):

* Async response of error response and response time varies from 0.1 ~ 0.8 secs by observation
* Sent success do not response, which means client cannot always expect for response.
* Async close write stream connection after error-response.
* All notification sent after failed notification are discarded, the responding error-response and closing client's write connection will be delayed
* Sometimes APNS close socket connection arbitrary

Solution:

* Non-blocking ssl socket connection to send notification without waiting for response.
* A separate thread for constantly checking error-response from read connection.
* A sent notification buffer used for re-sending notification that were sent after failed notification, or arbitrary connection close by apns.
* Reference to [non-blocking apns pull request by minorblend](https://github.com/djacobs/PyAPNs/pull/25), [enhanced message by hagino3000](https://github.com/voyagegroup/apns-proxy-server/blob/065775f87dbf25f6b06f24edc73dc5de4481ad36/apns_proxy_server/worker.py#l164-209)

Result:

* Send notification at throughput of 1000/secs
* In worse case of when 1st notification sent failed, error-response respond after 1 secs and 999 notification sent are discarded by APNS at the mean time, all discarded 999 notifications will be resent without loosing any of them. With the same logic, if notification resent failed, it will resent rest of resent notification after the failed one.

[Test Script](https://gist.github.com/jimhorng/594401f68ce48282ced5)

## Travis Build Status

[![Build Status](https://secure.travis-ci.org/djacobs/PyAPNs.png?branch=master)](http://travis-ci.org/djacobs/PyAPNs)
Expand Down
241 changes: 220 additions & 21 deletions apns.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,14 @@
from binascii import a2b_hex, b2a_hex
from datetime import datetime
from socket import socket, timeout, AF_INET, SOCK_STREAM
from socket import error as socket_error
from struct import pack, unpack
import sys

import ssl
import select
import time
import collections, itertools
import logging
try:
from ssl import wrap_socket, SSLError
except ImportError:
Expand All @@ -41,12 +46,52 @@
except ImportError:
import simplejson as json

_logger = logging.getLogger(__name__)

MAX_PAYLOAD_LENGTH = 256

NOTIFICATION_COMMAND = 0
ENHANCED_NOTIFICATION_COMMAND = 1

NOTIFICATION_FORMAT = (
'!' # network big-endian
'B' # command
'H' # token length
'32s' # token
'H' # payload length
'%ds' # payload
)

ENHANCED_NOTIFICATION_FORMAT = (
'!' # network big-endian
'B' # command
'I' # identifier
'I' # expiry
'H' # token length
'32s' # token
'H' # payload length
'%ds' # payload
)

ERROR_RESPONSE_FORMAT = (
'!' # network big-endian
'B' # command
'B' # status
'I' # identifier
)

TOKEN_LENGTH = 32
ERROR_RESPONSE_LENGTH = 6
DELAY_RESEND_SECS = 0.0
SENT_BUFFER_QTY = 3000

ER_STATUS = 'status'
ER_IDENTIFER = 'identifier'

class APNs(object):
"""A class representing an Apple Push Notification service connection"""

def __init__(self, use_sandbox=False, cert_file=None, key_file=None):
def __init__(self, use_sandbox=False, cert_file=None, key_file=None, enhanced=False):
"""
Set use_sandbox to True to use the sandbox (test) APNs servers.
Default is False.
Expand All @@ -57,6 +102,7 @@ def __init__(self, use_sandbox=False, cert_file=None, key_file=None):
self.key_file = key_file
self._feedback_connection = None
self._gateway_connection = None
self.enhanced = enhanced

@staticmethod
def packed_uchar(num):
Expand Down Expand Up @@ -93,6 +139,13 @@ def unpacked_uint_big_endian(bytes):
Returns an unsigned int from a packed big-endian (network) byte array
"""
return unpack('>I', bytes)[0]

@staticmethod
def unpacked_char_big_endian(bytes):
"""
Returns an unsigned char from a packed big-endian (network) byte array
"""
return unpack('c', bytes)[0]

@property
def feedback_server(self):
Expand All @@ -110,7 +163,8 @@ def gateway_server(self):
self._gateway_connection = GatewayConnection(
use_sandbox = self.use_sandbox,
cert_file = self.cert_file,
key_file = self.key_file
key_file = self.key_file,
enhanced = self.enhanced
)
return self._gateway_connection

Expand All @@ -119,13 +173,15 @@ class APNsConnection(object):
"""
A generic connection class for communicating with the APNs
"""
def __init__(self, cert_file=None, key_file=None, timeout=None):
def __init__(self, cert_file=None, key_file=None, timeout=None, enhanced=False):
super(APNsConnection, self).__init__()
self.cert_file = cert_file
self.key_file = key_file
self.timeout = timeout
self._socket = None
self._ssl = None
self.enhanced = enhanced
self.connection_alive = False

def __del__(self):
self._disconnect();
Expand All @@ -145,33 +201,63 @@ def _connect(self):
except:
raise

# Fallback for 'SSLError: _ssl.c:489: The handshake operation timed out'
for i in xrange(3):
try:
self._ssl = wrap_socket(self._socket, self.key_file, self.cert_file)
break
except SSLError, ex:
if ex.args[0] == SSL_ERROR_WANT_READ:
sys.exc_clear()
elif ex.args[0] == SSL_ERROR_WANT_WRITE:
sys.exc_clear()
else:
raise
if self.enhanced:
self._socket.setblocking(False)
self._ssl = wrap_socket(self._socket, self.key_file, self.cert_file,
do_handshake_on_connect=False)
while True:
try:
self._ssl.do_handshake()
self.connection_alive = True
break
except ssl.SSLError, err:
if ssl.SSL_ERROR_WANT_READ == err.args[0]:
select.select([self._ssl], [], [])
elif ssl.SSL_ERROR_WANT_WRITE == err.args[0]:
select.select([], [self._ssl], [])
else:
raise
else:
# Fallback for 'SSLError: _ssl.c:489: The handshake operation timed out'
for i in xrange(3):
try:
self._ssl = wrap_socket(self._socket, self.key_file, self.cert_file)
break
except SSLError, ex:
if ex.args[0] == SSL_ERROR_WANT_READ:
sys.exc_clear()
elif ex.args[0] == SSL_ERROR_WANT_WRITE:
sys.exc_clear()
else:
raise

def _disconnect(self):
if self._socket:
self._socket.close()
self._connection().close()
self.connection_alive = False

def _connection(self):
if not self._ssl:
self._connect()
return self._ssl

def _reconnect(self):
_logger.info("rebuilding connection to APNS")
self._disconnect()
self._connect()

def read(self, n=None):
return self._connection().read(n)

def write(self, string):
return self._connection().write(string)
# return self._connection().write(string)
if self.enhanced: # nonblocking socket
_, wlist, _ = select.select([], [self._connection()], [])
if len(wlist) > 0:
self._connection().sendall(string)
else: # blocking socket
return self._connection().write(string)


class PayloadAlert(object):
Expand Down Expand Up @@ -358,6 +444,16 @@ def __init__(self, use_sandbox=False, **kwargs):
'gateway.push.apple.com',
'gateway.sandbox.push.apple.com')[use_sandbox]
self.port = 2195
if self.enhanced == True: #start error-response monitoring thread
import threading
self._sent_notifications = collections.deque(maxlen=SENT_BUFFER_QTY)
self._send_lock = threading.RLock()
self._close_read_thread = False
self._read_error_response_worker = threading.Thread(target=self._read_error_response)
self._read_error_response_worker.start()
self._is_resending = False
self._last_resent_qty = 10
self._response_listener = None

def _get_notification(self, token_hex, payload):
"""
Expand All @@ -377,9 +473,112 @@ def _get_notification(self, token_hex, payload):

return notification

def send_notification(self, token_hex, payload):
self.write(self._get_notification(token_hex, payload))
def _get_enhanced_notification(self, token_hex, payload, identifier, expiry):
"""
form notification data in an enhanced format
"""
token = a2b_hex(token_hex)
payload = payload.json()
fmt = ENHANCED_NOTIFICATION_FORMAT % len(payload)
notification = pack(fmt, ENHANCED_NOTIFICATION_COMMAND, identifier, expiry,
TOKEN_LENGTH, token, len(payload), payload)
return notification

def send_notification(self, token_hex, payload, identifier=0, expiry=0):
"""
in enhanced mode, send_notification may return error response from APNs if any
"""
if self.enhanced:
self._wait_resending(30)
with self._send_lock:
message = self._get_enhanced_notification(token_hex, payload,
identifier, expiry)
self._sent_notifications.append(dict({'id': identifier, 'message': message}))
try:
self.write(message)
except socket_error as e:
_logger.info("sending notification with id:" + str(identifier) + " to APNS failed: " + str(type(e)) + ": " + str(e))

else:
self.write(self._get_notification(token_hex, payload))

def _wait_resending(self, timeout):
"""
timeout: in seconds
"""
elapsed = 0.0
interval = 0.01
while elapsed < timeout:
if not self._is_resending:
break
time.sleep(interval)
elapsed += interval

def send_notification_multiple(self, frame):
self.write(str(frame))

return self.write(frame.get_frame())

def register_response_listener(self, response_listener):
self._response_listener = response_listener

def close_read_thread(self):
self._close_read_thread = True

def _read_error_response(self):
while not self._close_read_thread:
while not self.connection_alive:
time.sleep(0.1)

rlist, _, _ = select.select([self._connection()], [], [], 1)

if len(rlist) > 0: # there's error response from APNs
try:
buff = self.read(ERROR_RESPONSE_LENGTH)
except socket_error as e: # APNS close connection arbitrarily
_logger.warning("exception occur when reading APNS error-response: " + str(type(e)) + ": " + str(e)) #DEBUG
self._is_resending = True
with self._send_lock:
self._reconnect()
current_sent_qty = len(self._sent_notifications)
resent_first_idx = max(current_sent_qty - self._last_resent_qty, 0)
self._resend_notification_by_range(resent_first_idx, current_sent_qty)
continue
if len(buff) == ERROR_RESPONSE_LENGTH:
command, status, identifier = unpack(ERROR_RESPONSE_FORMAT, buff)
if 8 == command: # is error response
error_response = (status, identifier)
if self._response_listener:
self._response_listener(Util.convert_error_response_to_dict(error_response))
_logger.info("got error-response from APNS:" + str(error_response))
self._is_resending = True
with self._send_lock:
self._reconnect()
self._resend_notifications_by_id(identifier)

def _resend_notifications_by_id(self, failed_identifier):
fail_idx = Util.getListIndexFromID(self._sent_notifications, failed_identifier)
#pop-out success notifications till failed one
self._resend_notification_by_range(fail_idx+1, len(self._sent_notifications))
return

def _resend_notification_by_range(self, start_idx, end_idx):
self._sent_notifications = collections.deque(itertools.islice(self._sent_notifications, start_idx, end_idx))
self._last_resent_qty = len(self._sent_notifications)
_logger.info("resending " + str(self._last_resent_qty) + " notifications to APNS") #DEBUG
for sent_notification in self._sent_notifications:
_logger.debug("resending notification with id:" + str(sent_notification['id']) + " to APNS") #DEBUG
try:
self.write(sent_notification['message'])
except socket_error as e:
_logger.debug("resending notification with id:" + str(sent_notification['id']) + " failed: " + str(type(e)) + ": " + str(e)) #DEBUG
return
time.sleep(DELAY_RESEND_SECS) #DEBUG
self._is_resending = False

class Util(object):
@classmethod
def getListIndexFromID(this_class, the_list, identifier):
return next(index for (index, d) in enumerate(the_list)
if d['id'] == identifier)
@classmethod
def convert_error_response_to_dict(this_class, error_response_tuple):
return {ER_STATUS: error_response_tuple[0], ER_IDENTIFER: error_response_tuple[1]}