Skip to content
This repository was archived by the owner on Oct 12, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,19 @@
Release History
===============

0.2.0rc2 (2018-07-29)
+++++++++++++++++++++

- **Breaking change** `EventData.offset` will now return an object of type `~uamqp.common.Offset` rather than str.
The original string value can be retrieved from `~uamqp.common.Offset.value`.
- Each sender/receiver will now run in its own independent connection.
- Updated uAMQP dependency to 0.2.0
- Fixed issue with IoTHub clients not being able to retrieve partition information.
- Added support for HTTP proxy settings to both EventHubClient and EPH.
- Added error handling policy to automatically reconnect on retryable error.
- Added keep-alive thread for maintaining an unused connection.


0.2.0rc1 (2018-07-06)
+++++++++++++++++++++

Expand Down
2 changes: 1 addition & 1 deletion azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

__version__ = "0.2.0rc1"
__version__ = "0.2.0rc2"

from azure.eventhub.common import EventData, EventHubError, Offset
from azure.eventhub.client import EventHubClient
Expand Down
72 changes: 26 additions & 46 deletions azure/eventhub/_async/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@
import asyncio
import time
import datetime
try:
from urllib import urlparse, unquote_plus, urlencode, quote_plus
except ImportError:
from urllib.parse import urlparse, unquote_plus, urlencode, quote_plus

from uamqp import authentication, constants, types, errors
from uamqp import (
Message,
Source,
ConnectionAsync,
AMQPClientAsync,
SendClientAsync,
Expand All @@ -37,7 +40,7 @@ class EventHubClientAsync(EventHubClient):
sending events to and receiving events from the Azure Event Hubs service.
"""

def _create_auth(self, auth_uri, username, password): # pylint: disable=no-self-use
def _create_auth(self, username=None, password=None): # pylint: disable=no-self-use
"""
Create an ~uamqp.authentication.cbs_auth_async.SASTokenAuthAsync instance to authenticate
the session.
Expand All @@ -49,32 +52,13 @@ def _create_auth(self, auth_uri, username, password): # pylint: disable=no-self
:param password: The shared access key.
:type password: str
"""
username = username or self._auth_config['username']
password = password or self._auth_config['password']
if "@sas.root" in username:
return authentication.SASLPlain(self.address.hostname, username, password)
return authentication.SASTokenAsync.from_shared_access_key(auth_uri, username, password)

def _create_connection_async(self):
"""
Create a new ~uamqp._async.connection_async.ConnectionAsync instance that will be shared between all
AsyncSender/AsyncReceiver clients.
"""
if not self.connection:
log.info("{}: Creating connection with address={}".format(
self.container_id, self.address.geturl()))
self.connection = ConnectionAsync(
self.address.hostname,
self.auth,
container_id=self.container_id,
properties=self._create_properties(),
debug=self.debug)

async def _close_connection_async(self):
"""
Close and destroy the connection async.
"""
if self.connection:
await self.connection.destroy_async()
self.connection = None
return authentication.SASLPlain(
self.address.hostname, username, password, http_proxy=self.http_proxy)
return authentication.SASTokenAsync.from_shared_access_key(
self.auth_uri, username, password, timeout=60, http_proxy=self.http_proxy)

async def _close_clients_async(self):
"""
Expand All @@ -85,17 +69,13 @@ async def _close_clients_async(self):
async def _wait_for_client(self, client):
try:
while client.get_handler_state().value == 2:
await self.connection.work_async()
await client._handler._connection.work_async() # pylint: disable=protected-access
except Exception as exp: # pylint: disable=broad-except
await client.close_async(exception=exp)

async def _start_client_async(self, client):
try:
await client.open_async(self.connection)
started = await client.has_started()
while not started:
await self.connection.work_async()
started = await client.has_started()
await client.open_async()
except Exception as exp: # pylint: disable=broad-except
await client.close_async(exception=exp)

Expand All @@ -108,9 +88,8 @@ async def _handle_redirect(self, redirects):
redirects = [c.redirected for c in self.clients if c.redirected]
if not all(r.hostname == redirects[0].hostname for r in redirects):
raise EventHubError("Multiple clients attempting to redirect to different hosts.")
self.auth = self._create_auth(redirects[0].address.decode('utf-8'), **self._auth_config)
await self.connection.redirect_async(redirects[0], self.auth)
await asyncio.gather(*[c.open_async(self.connection) for c in self.clients])
self._process_redirect_uri(redirects[0])
await asyncio.gather(*[c.open_async() for c in self.clients])

async def run_async(self):
"""
Expand All @@ -125,7 +104,6 @@ async def run_async(self):
:rtype: list[~azure.eventhub.common.EventHubError]
"""
log.info("{}: Starting {} clients".format(self.container_id, len(self.clients)))
self._create_connection_async()
tasks = [self._start_client_async(c) for c in self.clients]
try:
await asyncio.gather(*tasks)
Expand Down Expand Up @@ -153,18 +131,21 @@ async def stop_async(self):
log.info("{}: Stopping {} clients".format(self.container_id, len(self.clients)))
self.stopped = True
await self._close_clients_async()
await self._close_connection_async()

async def get_eventhub_info_async(self):
"""
Get details on the specified EventHub async.

:rtype: dict
"""
eh_name = self.address.path.lstrip('/')
target = "amqps://{}/{}".format(self.address.hostname, eh_name)
async with AMQPClientAsync(target, auth=self.auth, debug=self.debug) as mgmt_client:
mgmt_msg = Message(application_properties={'name': eh_name})
alt_creds = {
"username": self._auth_config.get("iot_username"),
"password":self._auth_config.get("iot_password")}
try:
mgmt_auth = self._create_auth(**alt_creds)
mgmt_client = AMQPClientAsync(self.mgmt_target, auth=mgmt_auth, debug=self.debug)
await mgmt_client.open_async()
mgmt_msg = Message(application_properties={'name': self.eh_name})
response = await mgmt_client.mgmt_request_async(
mgmt_msg,
constants.READ_OPERATION,
Expand All @@ -180,6 +161,8 @@ async def get_eventhub_info_async(self):
output['partition_count'] = eh_info[b'partition_count']
output['partition_ids'] = [p.decode('utf-8') for p in eh_info[b'partition_ids']]
return output
finally:
await mgmt_client.close_async()

def add_async_receiver(self, consumer_group, partition, offset=None, prefetch=300, operation=None, loop=None):
"""
Expand All @@ -201,10 +184,7 @@ def add_async_receiver(self, consumer_group, partition, offset=None, prefetch=30
path = self.address.path + operation if operation else self.address.path
source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format(
self.address.hostname, path, consumer_group, partition)
source = Source(source_url)
if offset is not None:
source.set_filter(offset.selector())
handler = AsyncReceiver(self, source, prefetch=prefetch, loop=loop)
handler = AsyncReceiver(self, source_url, offset=offset, prefetch=prefetch, loop=loop)
self.clients.append(handler)
return handler

Expand Down
93 changes: 74 additions & 19 deletions azure/eventhub/_async/receiver_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@
import asyncio

from uamqp import errors, types
from uamqp import ReceiveClientAsync
from uamqp import ReceiveClientAsync, Source

from azure.eventhub import EventHubError, EventData
from azure.eventhub.receiver import Receiver
from azure.eventhub.common import _error_handler


class AsyncReceiver(Receiver):
"""
Implements the async API of a Receiver.
"""

def __init__(self, client, source, prefetch=300, epoch=None, loop=None): # pylint: disable=super-init-not-called
def __init__(self, client, source, offset=None, prefetch=300, epoch=None, loop=None): # pylint: disable=super-init-not-called
"""
Instantiate an async receiver.

Expand All @@ -33,25 +34,32 @@ def __init__(self, client, source, prefetch=300, epoch=None, loop=None): # pyli
:param loop: An event loop.
"""
self.loop = loop or asyncio.get_event_loop()
self.client = client
self.source = source
self.offset = offset
self.prefetch = prefetch
self.epoch = epoch
self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler)
self.redirected = None
self.error = None
self.debug = client.debug
self.offset = None
self.prefetch = prefetch
self.properties = None
self.epoch = epoch
source = Source(self.source)
if self.offset is not None:
source.set_filter(self.offset.selector())
if epoch:
self.properties = {types.AMQPSymbol(self._epoch): types.AMQPLong(int(epoch))}
self._handler = ReceiveClientAsync(
source,
auth=client.auth,
debug=self.debug,
auth=self.client.get_auth(),
debug=self.client.debug,
prefetch=self.prefetch,
link_properties=self.properties,
timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=30,
loop=self.loop)

async def open_async(self, connection):
async def open_async(self):
"""
Open the Receiver using the supplied conneciton.
If the handler has previously been redirected, the redirect
Expand All @@ -60,16 +68,54 @@ async def open_async(self, connection):
:param connection: The underlying client shared connection.
:type: connection: ~uamqp._async.connection_async.ConnectionAsync
"""
# pylint: disable=protected-access
if self.redirected:
self.source = self.redirected.address
source = Source(self.source)
if self.offset is not None:
source.set_filter(self.offset.selector())
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password":self.client._auth_config.get("iot_password")}
self._handler = ReceiveClientAsync(
self.redirected.address,
auth=None,
debug=self.debug,
source,
auth=self.client.get_auth(**alt_creds),
debug=self.client.debug,
prefetch=self.prefetch,
link_properties=self.properties,
timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=30,
loop=self.loop)
await self._handler.open_async(connection=connection)
await self._handler.open_async()
while not await self.has_started():
await self._handler._connection.work_async()

async def reconnect_async(self):
"""If the Receiver was disconnected from the service with
a retryable error - attempt to reconnect."""
# pylint: disable=protected-access
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password":self.client._auth_config.get("iot_password")}
await self._handler.close_async()
source = Source(self.source)
if self.offset is not None:
source.set_filter(self.offset.selector())
self._handler = ReceiveClientAsync(
source,
auth=self.client.get_auth(**alt_creds),
debug=self.client.debug,
prefetch=self.prefetch,
link_properties=self.properties,
timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=30,
properties=self.client.create_properties(),
loop=self.loop)
await self._handler.open_async()
while not await self.has_started():
await self._handler._connection.work_async()

async def has_started(self):
"""
Expand All @@ -88,7 +134,7 @@ async def has_started(self):
raise EventHubError("Authorization timeout.")
elif auth_in_progress:
return False
elif not await self._handler._client_ready():
elif not await self._handler._client_ready_async():
return False
else:
return True
Expand All @@ -109,6 +155,8 @@ async def close_async(self, exception=None):
self.redirected = exception
elif isinstance(exception, EventHubError):
self.error = exception
elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)):
self.error = EventHubError(str(exception), exception)
elif exception:
self.error = EventHubError(str(exception))
else:
Expand All @@ -129,21 +177,28 @@ async def receive(self, max_batch_size=None, timeout=None):
"""
if self.error:
raise self.error
data_batch = []
try:
timeout_ms = 1000 * timeout if timeout else 0
message_batch = await self._handler.receive_message_batch_async(
max_batch_size=max_batch_size,
timeout=timeout_ms)
data_batch = []
for message in message_batch:
event_data = EventData(message=message)
self.offset = event_data.offset
data_batch.append(event_data)
return data_batch
except errors.LinkDetach as detach:
error = EventHubError(str(detach))
await self.close_async(exception=error)
raise error
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry:
await self.reconnect_async()
return data_batch
else:
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.MessageHandlerError:
await self.reconnect_async()
return data_batch
except Exception as e:
error = EventHubError("Receive failed: {}".format(e))
await self.close_async(exception=error)
Expand Down
Loading