Skip to content
This repository was archived by the owner on Oct 12, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 29 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,41 @@ dist: xenial
sudo: required
matrix:
include:
- os: linux
python: "2.7"
dist: trusty
script:
- pytest
- python ./setup.py check -r -s
- pylint --ignore=async_ops azure.eventhub
- os: linux
python: "3.4"
dist: trusty
script:
- pytest
- python ./setup.py check -r -s
- pylint --ignore=async_ops azure.eventhub
- os: linux
python: "3.5"
script:
- pytest
- python ./setup.py check -r -s
- pylint azure.eventhub
- pylint azure.eventprocessorhost
- os: linux
python: "3.6"
script:
- pytest
- python ./setup.py check -r -s
- pylint azure.eventhub
- pylint azure.eventprocessorhost
- os: linux
python: "3.7"
script:
- pytest
- python ./setup.py check -r -s
- pylint azure.eventhub
- pylint azure.eventprocessorhost
install:
- pip install -r dev_requirements.txt
- pip install -e .
script:
- pytest
- python ./setup.py check -r -s
- pylint azure.eventhub
- pylint azure.eventprocessorhost
7 changes: 7 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@
Release History
===============

1.2.0
+++++

- Support for Python 2.7 in azure.eventhub module (azure.eventprocessorhost will not support Python 2.7).
- Parse EventData.enqueued_time as a UTC timestamp (issue #72, thanks @vjrantal)


1.1.1 (2019-10-03)
++++++++++++++++++

Expand Down
11 changes: 0 additions & 11 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,6 @@ Wheels are provided for all major operating systems, so you can install directly
$ pip install azure-eventhub
Python 2.7 support
++++++++++++++++++

Python 2.7 will be supported for the synchronous operations in azure.eventhub from v1.2.0.
This is available as a pre-release.

.. code:: shell
$ pip install azure-eventhub --pre
Python 2.7 support is not planned for azure.eventprocessorhost.
Documentation
+++++++++++++
Expand Down
2 changes: 1 addition & 1 deletion azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

__version__ = "1.1.1"
__version__ = "1.2.0"

from azure.eventhub.common import EventData, EventHubError, Offset
from azure.eventhub.client import EventHubClient
Expand Down
5 changes: 1 addition & 4 deletions azure/eventhub/async_ops/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
import asyncio
import time
import datetime
try:
from urllib import urlparse, unquote_plus, urlencode, quote_plus
except ImportError:
from urllib.parse import urlparse, unquote_plus, urlencode, quote_plus
from urllib.parse import urlparse, unquote_plus, urlencode, quote_plus

from uamqp import authentication, constants, types, errors
from uamqp import (
Expand Down
9 changes: 5 additions & 4 deletions azure/eventhub/async_ops/receiver_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ async def open_async(self):
properties=self.client.create_properties(),
loop=self.loop)
await self._handler.open_async()
while not await self.has_started():
await self._handler._connection.work_async()
while not await self._handler.client_ready_async():
await asyncio.sleep(0.05)

async def reconnect_async(self):
"""If the Receiver was disconnected from the service with
Expand All @@ -132,8 +132,8 @@ async def reconnect_async(self):
loop=self.loop)
try:
await self._handler.open_async()
while not await self.has_started():
await self._handler._connection.work_async()
while not await self._handler.client_ready_async():
await asyncio.sleep(0.05)
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry and self.auto_reconnect:
log.info("AsyncReceiver detached. Attempting reconnect.")
Expand Down Expand Up @@ -163,6 +163,7 @@ async def has_started(self):
Whether the handler has completed all start up processes such as
establishing the connection, session, link and authentication, and
is not ready to process messages.
**This function is now deprecated and will be removed in v2.0+.**
:rtype: bool
"""
Expand Down
5 changes: 3 additions & 2 deletions azure/eventhub/async_ops/sender_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ async def open_async(self):
properties=self.client.create_properties(),
loop=self.loop)
await self._handler.open_async()
while not await self.has_started():
await self._handler._connection.work_async() # pylint: disable=protected-access
while not await self._handler.client_ready_async():
await asyncio.sleep(0.05)

async def reconnect_async(self):
"""If the Receiver was disconnected from the service with
Expand Down Expand Up @@ -148,6 +148,7 @@ async def has_started(self):
Whether the handler has completed all start up processes such as
establishing the connection, session, link and authentication, and
is not ready to process messages.
**This function is now deprecated and will be removed in v2.0+.**

:rtype: bool
"""
Expand Down
4 changes: 3 additions & 1 deletion azure/eventhub/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from __future__ import unicode_literals

import logging
import datetime
Expand All @@ -10,7 +11,8 @@
import time
import functools
try:
from urllib import urlparse, unquote_plus, urlencode, quote_plus
from urlparse import urlparse
from urllib import unquote_plus, urlencode, quote_plus
except ImportError:
from urllib.parse import urlparse, unquote_plus, urlencode, quote_plus

Expand Down
17 changes: 11 additions & 6 deletions azure/eventhub/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from __future__ import unicode_literals

import datetime
import time
import json

import six

from uamqp import Message, BatchMessage
from uamqp import types, constants, errors
from uamqp.message import MessageHeader, MessageProperties
Expand Down Expand Up @@ -63,6 +66,8 @@ def __init__(self, body=None, batch=None, to_device=None, message=None):
:type body: str, bytes or list
:param batch: A data generator to send batched messages.
:type batch: Generator
:param to_device: An IoT device to route to.
:type to_device: str
:param message: The received message.
:type message: ~uamqp.message.Message
"""
Expand Down Expand Up @@ -94,7 +99,7 @@ def sequence_number(self):
"""
The sequence number of the event data object.
:rtype: int
:rtype: int or long
"""
return self._annotations.get(EventData.PROP_SEQ_NUMBER, None)

Expand All @@ -103,7 +108,7 @@ def offset(self):
"""
The offset of the event data object.
:rtype: int
:rtype: ~azure.eventhub.common.Offset
"""
try:
return Offset(self._annotations[EventData.PROP_OFFSET].decode('UTF-8'))
Expand Down Expand Up @@ -200,13 +205,13 @@ def body_as_str(self, encoding='UTF-8'):
:param encoding: The encoding to use for decoding message data.
Default is 'UTF-8'
:rtype: str
:rtype: str or unicode
"""
data = self.body
try:
return "".join(b.decode(encoding) for b in data)
except TypeError:
return str(data)
return six.text_type(data)
except: # pylint: disable=bare-except
pass
try:
Expand Down Expand Up @@ -269,7 +274,7 @@ def selector(self):
if isinstance(self.value, datetime.datetime):
timestamp = (time.mktime(self.value.timetuple()) * 1000) + (self.value.microsecond/1000)
return ("amqp.annotation.x-opt-enqueued-time {} '{}'".format(operator, int(timestamp))).encode('utf-8')
if isinstance(self.value, int):
if isinstance(self.value, six.integer_types):
return ("amqp.annotation.x-opt-sequence-number {} '{}'".format(operator, self.value)).encode('utf-8')
return ("amqp.annotation.x-opt-offset {} '{}'".format(operator, self.value)).encode('utf-8')

Expand Down Expand Up @@ -310,7 +315,7 @@ def __init__(self, message, details=None):

def _parse_error(self, error_list):
details = []
self.message = error_list if isinstance(error_list, str) else error_list.decode('UTF-8')
self.message = error_list if isinstance(error_list, six.text_type) else error_list.decode('UTF-8')
details_index = self.message.find(" Reference:")
if details_index >= 0:
details_msg = self.message[details_index + 1:]
Expand Down
15 changes: 9 additions & 6 deletions azure/eventhub/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from __future__ import unicode_literals

import uuid
import logging
import time

from uamqp import types, errors
from uamqp import ReceiveClient, Source
Expand All @@ -14,7 +16,7 @@
log = logging.getLogger(__name__)


class Receiver:
class Receiver(object):
"""
Implements a Receiver.
"""
Expand Down Expand Up @@ -97,16 +99,16 @@ def open(self):
client_name=self.name,
properties=self.client.create_properties())
self._handler.open()
while not self.has_started():
self._handler._connection.work()
while not self._handler.client_ready():
time.sleep(0.05)

def reconnect(self):
"""If the Receiver was disconnected from the service with
a retryable error - attempt to reconnect."""
# pylint: disable=protected-access
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password":self.client._auth_config.get("iot_password")}
"password": self.client._auth_config.get("iot_password")}
self._handler.close()
source = Source(self.source)
if self.offset is not None:
Expand All @@ -124,8 +126,8 @@ def reconnect(self):
properties=self.client.create_properties())
try:
self._handler.open()
while not self.has_started():
self._handler._connection.work()
while not self._handler.client_ready():
time.sleep(0.05)
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry and self.auto_reconnect:
self.reconnect()
Expand Down Expand Up @@ -160,6 +162,7 @@ def has_started(self):
Whether the handler has completed all start up processes such as
establishing the connection, session, link and authentication, and
is not ready to process messages.
**This function is now deprecated and will be removed in v2.0+.**

:rtype: bool
"""
Expand Down
9 changes: 6 additions & 3 deletions azure/eventhub/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from __future__ import unicode_literals

import uuid
import logging
import time

from uamqp import constants, errors
from uamqp import SendClient
Expand All @@ -14,7 +16,7 @@
log = logging.getLogger(__name__)


class Sender:
class Sender(object):
"""
Implements a Sender.
"""
Expand Down Expand Up @@ -88,8 +90,8 @@ def open(self):
client_name=self.name,
properties=self.client.create_properties())
self._handler.open()
while not self.has_started():
self._handler._connection.work() # pylint: disable=protected-access
while not self._handler.client_ready():
time.sleep(0.05)

def reconnect(self):
"""If the Sender was disconnected from the service with
Expand Down Expand Up @@ -144,6 +146,7 @@ def has_started(self):
Whether the handler has completed all start up processes such as
establishing the connection, session, link and authentication, and
is not ready to process messages.
**This function is now deprecated and will be removed in v2.0+.**

:rtype: bool
"""
Expand Down
Loading