[EventHubs] Track2 Preview3 #7059

Status: Merged (44 commits, Sep 9, 2019)
Changes shown from 32 commits.

Commits:
64da8ed  Small changes from code review (Aug 23, 2019)
d951dcf  change EventData.msg_properties to private attribute (Aug 26, 2019)
8bbac25  remove abstract method (Aug 27, 2019)
70a33d0  code clean 1 (Aug 28, 2019)
abbdd25  code clean 2 (Aug 28, 2019)
b45d6b3  Fix pylint (Aug 29, 2019)
247004a  Fix pylint (Aug 29, 2019)
6ace6ce  Use properties EventData.partition_key (Aug 29, 2019)
008421d  Small changes from code review (Aug 23, 2019)
b8c027d  change EventData.msg_properties to private attribute (Aug 26, 2019)
2489dd3  remove abstract method (Aug 27, 2019)
3a2d72f  code clean 1 (Aug 28, 2019)
9735756  code clean 2 (Aug 28, 2019)
288617e  Fix pylint (Aug 29, 2019)
2bdbffe  Fix pylint (Aug 29, 2019)
e8ea699  Use properties EventData.partition_key (Aug 29, 2019)
889597c  Merge branch 'eventhubs_preview3' of github.com:Azure/azure-sdk-for-p… (Aug 29, 2019)
cb08478  Use properties EventData.partition_key (Aug 29, 2019)
b3dcd07  Temporarily disable pylint errors that need refactoring (Aug 29, 2019)
b85e6cc  fix pylint errors (Aug 29, 2019)
92feb09  Merge branch 'master' into eventhubs_preview3 (Aug 29, 2019)
5e51ce2  fix pylint errors (Aug 30, 2019)
726bf6f  ignore eventprocessor pylint temporarily (Aug 30, 2019)
ffd8cb0  small pylint adjustment (Aug 30, 2019)
2f69d65  Merge branch 'master' into eventhubs_preview3 (Aug 30, 2019)
e5c8d1c  Add typing for Python2.7 (Aug 30, 2019)
e85ac17  [EventHub] IoTHub management operations improvement and bug fixing (#… (yunhaoling, Sep 2, 2019)
1fb341b  [EventHub] Retry refactor (#7026) (yunhaoling, Sep 3, 2019)
7762130  add system_properties to EventData (Sep 3, 2019)
1b10d00  Fix a small bug (Sep 4, 2019)
13237b5  Refine example code (Sep 4, 2019)
998eeed  Update receive method (#7064) (yunhaoling, Sep 4, 2019)
e13ddee  Update accessibility of class (#7091) (yunhaoling, Sep 6, 2019)
f616f37  Update samples and codes according to the review (#7098) (yunhaoling, Sep 6, 2019)
dad5baa  Python EventHubs load balancing (#6901) (YijunXieMS, Sep 7, 2019)
8e7e1c1  Fix a pylint error (Sep 7, 2019)
13a8fe7  Eventhubs blobstorage checkpointstore merge to preview3 (#7109) (YijunXieMS, Sep 7, 2019)
b5c933f  exclude eventprocessor test for python27 (Sep 7, 2019)
7b0f5fe  exclude eventprocessor test (Sep 7, 2019)
167361e  Revert "Eventhubs blobstorage checkpointstore merge to preview3 (#7109)" (Sep 7, 2019)
1253983  Fix small problem in consumer iterator (#7110) (yunhaoling, Sep 7, 2019)
548a989  Fixed an issue that initializes partition processor multiple times (Sep 8, 2019)
725b333  Update release history for 5.0.0b3 (Sep 9, 2019)
c359042  Update README for 5.0.0b3 (Sep 9, 2019)
Diff view
@@ -13,24 +13,6 @@
log = logging.getLogger(__name__)


def _retry_decorator(to_be_wrapped_func):
def wrapped_func(self, *args, **kwargs): # pylint:disable=unused-argument # TODO: to refactor
timeout = kwargs.pop("timeout", 100000)
if not timeout:
timeout = 100000 # timeout equals to 0 means no timeout, set the value to be a large number.
timeout_time = time.time() + timeout
max_retries = self.client.config.max_retries
retry_count = 0
last_exception = None
while True:
try:
return to_be_wrapped_func(self, timeout_time=timeout_time, last_exception=last_exception, **kwargs)
except Exception as exception: # pylint:disable=broad-except
last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time) # pylint:disable=protected-access
retry_count += 1
return wrapped_func


class ConsumerProducerMixin(object):
def __init__(self):
self.client = None
@@ -55,9 +37,9 @@ def _redirect(self, redirect):
self.running = False
self._close_connection()

def _open(self, timeout_time=None): # pylint:disable=unused-argument # TODO: to refactor
def _open(self):
"""
Open the EventHubConsumer using the supplied connection.
Open the EventHubConsumer/EventHubProducer using the supplied connection.
If the handler has previously been redirected, the redirect
context will be used to create a new handler before opening it.

@@ -73,7 +55,7 @@ def _open(self, timeout_time=None): # pylint:disable=unused-argument # TODO: to
else:
alt_creds = {}
self._create_handler()
self._handler.open(connection=self.client._conn_manager.get_connection(
self._handler.open(connection=self.client._conn_manager.get_connection( # pylint: disable=protected-access
self.client.address.hostname,
self.client.get_auth(**alt_creds)
))
@@ -91,12 +73,36 @@ def _close_connection(self):
self._close_handler()
self.client._conn_manager.reset_connection_if_broken() # pylint: disable=protected-access

def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
def _handle_exception(self, exception):
if not self.running and isinstance(exception, compat.TimeoutException):
exception = errors.AuthenticationException("Authorization timeout.")
return _handle_exception(exception, retry_count, max_retries, self, timeout_time)
return _handle_exception(exception, self)

return _handle_exception(exception, self)
Contributor: No need to have two return _handle_exception(exception, self)?
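
For reference, the refactored handler in this hunk appears to reduce to the sketch below (reassembled from the new lines above; the second identical return is most likely just the comment's anchor line repeated by the diff view):

def _handle_exception(self, exception):
    # A transport timeout raised while the handler is not running is treated as
    # an authorization timeout rather than a retryable error.
    if not self.running and isinstance(exception, compat.TimeoutException):
        exception = errors.AuthenticationException("Authorization timeout.")
    return _handle_exception(exception, self)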


def _do_retryable_operation(self, operation, timeout=None, **kwargs):
# pylint:disable=protected-access
if not timeout:
timeout = 100000 # timeout equals to 0 means no timeout, set the value to be a large number.
timeout_time = time.time() + timeout
retried_times = 0
last_exception = kwargs.pop('last_exception', None)
operation_need_param = kwargs.pop('operation_need_param', True)

while retried_times <= self.client.config.max_retries:
try:
if operation_need_param:
return operation(timeout_time=timeout_time, last_exception=last_exception, **kwargs)
else:
return operation()
except Exception as exception: # pylint:disable=broad-except
last_exception = self._handle_exception(exception)
self.client._try_delay(retried_times=retried_times, last_exception=last_exception,
timeout_time=timeout_time, entity_name=self.name)
retried_times += 1

return _handle_exception(exception, retry_count, max_retries, self, timeout_time)
log.info("%r has exhausted retry. Exception still occurs (%r)", self.name, last_exception)
raise last_exception
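
A minimal usage sketch of this retry helper, mirroring the _open_with_retry and _receive_with_retry wrappers added to the async consumer later in this diff (the sync wrapper names below are assumptions, not part of this hunk):

def _open_with_retry(self):
    # _open takes no timeout_time/last_exception arguments, so skip injecting them.
    return self._do_retryable_operation(self._open, operation_need_param=False)

def _receive_with_retry(self, timeout=None, max_batch_size=None, **kwargs):
    # _receive does accept them, so the helper injects timeout_time and last_exception.
    return self._do_retryable_operation(self._receive, timeout=timeout,
                                        max_batch_size=max_batch_size, **kwargs)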

def close(self, exception=None):
# type:(Exception) -> None
@@ -13,26 +13,6 @@
log = logging.getLogger(__name__)


def _retry_decorator(to_be_wrapped_func):
async def wrapped_func(self, *args, **kwargs): # pylint:disable=unused-argument # TODO: to refactor
timeout = kwargs.pop("timeout", 100000)
if not timeout:
timeout = 100000 # timeout equals to 0 means no timeout, set the value to be a large number.
timeout_time = time.time() + timeout
max_retries = self.client.config.max_retries
retry_count = 0
last_exception = None
while True:
try:
return await to_be_wrapped_func(
self, timeout_time=timeout_time, last_exception=last_exception, **kwargs
)
except Exception as exception: # pylint:disable=broad-except
last_exception = await self._handle_exception(exception, retry_count, max_retries, timeout_time) # pylint:disable=protected-access
retry_count += 1
return wrapped_func


class ConsumerProducerMixin(object):

def __init__(self):
@@ -58,7 +38,7 @@ async def _redirect(self, redirect):
self.running = False
await self._close_connection()

async def _open(self, timeout_time=None): # pylint:disable=unused-argument # TODO: to refactor
async def _open(self):
"""
Open the EventHubConsumer using the supplied connection.
If the handler has previously been redirected, the redirect
@@ -94,12 +74,36 @@ async def _close_connection(self):
await self._close_handler()
await self.client._conn_manager.reset_connection_if_broken() # pylint:disable=protected-access

async def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
async def _handle_exception(self, exception):
if not self.running and isinstance(exception, compat.TimeoutException):
exception = errors.AuthenticationException("Authorization timeout.")
return await _handle_exception(exception, retry_count, max_retries, self, timeout_time)
return await _handle_exception(exception, self)

return await _handle_exception(exception, self)

async def _do_retryable_operation(self, operation, timeout=None, **kwargs):
# pylint:disable=protected-access
if not timeout:
timeout = 100000 # timeout equals to 0 means no timeout, set the value to be a large number.
Contributor: why not just set timeout=100000 in the argument list?

Contributor (author): good idea. just updated the code to set the default value 100000.
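
A sketch of the signature after that change, as described in the reply (the follow-up commit itself is not shown in this diff):

async def _do_retryable_operation(self, operation, timeout=100000, **kwargs):
    # pylint:disable=protected-access
    timeout_time = time.time() + timeout
    ...  # body otherwise unchanged from the version shown in this hunk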

timeout_time = time.time() + timeout
retried_times = 0
last_exception = kwargs.pop('last_exception', None)
operation_need_param = kwargs.pop('operation_need_param', True)

while retried_times <= self.client.config.max_retries:
try:
if operation_need_param:
return await operation(timeout_time=timeout_time, last_exception=last_exception, **kwargs)
else:
return await operation()
except Exception as exception: # pylint:disable=broad-except
last_exception = await self._handle_exception(exception)
await self.client._try_delay(retried_times=retried_times, last_exception=last_exception,
timeout_time=timeout_time, entity_name=self.name)
retried_times += 1

return await _handle_exception(exception, retry_count, max_retries, self, timeout_time)
log.info("%r has exhausted retry. Exception still occurs (%r)", self.name, last_exception)
Contributor: Suggest being explicit that that exception is in fact the last exception:

log.info("Operation %r has exhausted retries. Last exception: %r", self.name, last_exception)

Contributor (author): thanks for the suggestion, just updated.

raise last_exception

async def close(self, exception=None):
# type: (Exception) -> None
51 changes: 42 additions & 9 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py
@@ -4,8 +4,10 @@
# --------------------------------------------------------------------------------------------
import logging
import datetime
import time
import functools
import asyncio

from typing import Any, List, Dict, Union, TYPE_CHECKING

from uamqp import authentication, constants # type: ignore
@@ -47,6 +49,7 @@ class EventHubClient(EventHubClientAbstract):
def __init__(self, host, event_hub_path, credential, **kwargs):
# type:(str, str, Union[EventHubSharedKeyCredential, EventHubSASTokenCredential, TokenCredential], Any) -> None
super(EventHubClient, self).__init__(host=host, event_hub_path=event_hub_path, credential=credential, **kwargs)
self._lock = asyncio.Lock()
self._conn_manager = get_connection_manager(**kwargs)

async def __aenter__(self):
Expand Down Expand Up @@ -98,17 +101,30 @@ def _create_auth(self, username=None, password=None):
get_jwt_token, http_proxy=http_proxy,
transport_type=transport_type)

async def _handle_exception(self, exception, retry_count, max_retries):
await _handle_exception(exception, retry_count, max_retries, self)

async def _close_connection(self):
await self._conn_manager.reset_connection_if_broken()

async def _try_delay(self, retried_times, last_exception, timeout_time=None, entity_name=None):
entity_name = entity_name or self.container_id
backoff = self.config.backoff_factor * 2 ** retried_times
if backoff <= self.config.backoff_max and (
timeout_time is None or time.time() + backoff <= timeout_time): # pylint:disable=no-else-return
await asyncio.sleep(backoff)
log.info("%r has an exception (%r). Retrying...", format(entity_name), last_exception)
else:
log.info("%r operation has timed out. Last exception before timeout is (%r)",
entity_name, last_exception)
raise last_exception
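
The new _try_delay sleeps with exponential backoff (backoff_factor * 2 ** retried_times) and re-raises the last exception once the backoff would exceed backoff_max or overshoot timeout_time. An illustrative schedule, assuming backoff_factor=0.8 (the real values come from the client Configuration and are not shown in this diff):

backoff_factor = 0.8  # assumed example value, not taken from this PR
for retried_times in range(4):
    print(backoff_factor * 2 ** retried_times)  # 0.8, 1.6, 3.2, 6.4 seconds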

async def _management_request(self, mgmt_msg, op_type):
max_retries = self.config.max_retries
retry_count = 0
while True:
mgmt_auth = self._create_auth()
alt_creds = {
"username": self._auth_config.get("iot_username"),
"password": self._auth_config.get("iot_password")
}

retried_times = 0
while retried_times <= self.config.max_retries:
mgmt_auth = self._create_auth(**alt_creds)
mgmt_client = AMQPClientAsync(self.mgmt_target, auth=mgmt_auth, debug=self.config.network_tracing)
try:
conn = await self._conn_manager.get_connection(self.host, mgmt_auth)
@@ -121,11 +137,24 @@ async def _management_request(self, mgmt_msg, op_type):
description_fields=b'status-description')
return response
except Exception as exception: # pylint:disable=broad-except
await self._handle_exception(exception, retry_count, max_retries)
retry_count += 1
last_exception = await _handle_exception(exception, self)
await self._try_delay(retried_times=retried_times, last_exception=last_exception)
retried_times += 1
finally:
await mgmt_client.close_async()

async def _iothub_redirect(self):
async with self._lock:
if self._is_iothub and not self._iothub_redirect_info:
if not self._redirect_consumer:
self._redirect_consumer = self.create_consumer(consumer_group='$default',
partition_id='0',
event_position=EventPosition('-1'),
operation='/messages/events')
async with self._redirect_consumer:
await self._redirect_consumer._open_with_retry() # pylint: disable=protected-access
self._redirect_consumer = None
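
The redirect appears to be resolved lazily: under the lock, a throwaway consumer is opened once against the '/messages/events' path so the AMQP redirect information can be captured and reused. Management operations later in this file trigger it along these lines (copied from get_properties below):

if self._is_iothub and not self._iothub_redirect_info:
    await self._iothub_redirect()
response = await self._management_request(mgmt_msg, op_type=b'com.microsoft:eventhub')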

async def get_properties(self):
# type:() -> Dict[str, Any]
"""
@@ -139,6 +168,8 @@ async def get_properties(self):
:rtype: dict
:raises: ~azure.eventhub.ConnectError
"""
if self._is_iothub and not self._iothub_redirect_info:
await self._iothub_redirect()
mgmt_msg = Message(application_properties={'name': self.eh_name})
response = await self._management_request(mgmt_msg, op_type=b'com.microsoft:eventhub')
output = {}
@@ -178,6 +209,8 @@ async def get_partition_properties(self, partition):
:rtype: dict
:raises: ~azure.eventhub.ConnectError
"""
if self._is_iothub and not self._iothub_redirect_info:
await self._iothub_redirect()
mgmt_msg = Message(application_properties={'name': self.eh_name,
'partition': partition})
response = await self._management_request(mgmt_msg, op_type=b'com.microsoft:partition')
46 changes: 27 additions & 19 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py
@@ -13,7 +13,7 @@

from azure.eventhub import EventData, EventPosition
from azure.eventhub.error import EventHubError, ConnectError, _error_handler
from ._consumer_producer_mixin_async import ConsumerProducerMixin, _retry_decorator
from ._consumer_producer_mixin_async import ConsumerProducerMixin

log = logging.getLogger(__name__)

@@ -92,26 +92,29 @@ def __aiter__(self):
return self

async def __anext__(self):
max_retries = self.client.config.max_retries
retry_count = 0
while True:
retried_times = 0
while retried_times < self.client.config.max_retries:
try:
await self._open()
if not self.messages_iter:
self.messages_iter = self._handler.receive_messages_iter_async()
message = await self.messages_iter.__anext__()
event_data = EventData._from_message(message) # pylint:disable=protected-access
self.offset = EventPosition(event_data.offset, inclusive=False)
retry_count = 0
retried_times = 0
return event_data
except Exception as exception: # pylint:disable=broad-except
await self._handle_exception(exception, retry_count, max_retries, timeout_time=None)
retry_count += 1
last_exception = await self._handle_exception(exception)
await self.client._try_delay(retried_times=retried_times, last_exception=last_exception,
entity_name=self.name)
retried_times += 1
yunhaoling marked this conversation as resolved.

def _create_handler(self):
alt_creds = {
"username": self.client._auth_config.get("iot_username"), # pylint:disable=protected-access
"password": self.client._auth_config.get("iot_password")} # pylint:disable=protected-access
"username": self.client._auth_config.get("iot_username") if self.redirected else None, # pylint:disable=protected-access
"password": self.client._auth_config.get("iot_password") if self.redirected else None # pylint:disable=protected-access
}

source = Source(self.source)
if self.offset is not None:
source.set_filter(self.offset._selector()) # pylint:disable=protected-access
@@ -134,24 +137,29 @@ async def _redirect(self, redirect):
self.messages_iter = None
await super(EventHubConsumer, self)._redirect(redirect)

async def _open(self, timeout_time=None):
async def _open(self):
"""
Open the EventHubConsumer using the supplied connection.
If the handler has previously been redirected, the redirect
context will be used to create a new handler before opening it.

"""
# pylint: disable=protected-access
self.redirected = self.redirected or self.client._iothub_redirect_info

if not self.running and self.redirected:
self.client._process_redirect_uri(self.redirected)
self.source = self.redirected.address
await super(EventHubConsumer, self)._open(timeout_time)
await super(EventHubConsumer, self)._open()

async def _open_with_retry(self):
return await self._do_retryable_operation(self._open, operation_need_param=False)

async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
last_exception = kwargs.get("last_exception")
data_batch = kwargs.get("data_batch")
data_batch = []

await self._open(timeout_time)
await self._open()
remaining_time = timeout_time - time.time()
if remaining_time <= 0.0:
if last_exception:
@@ -169,9 +177,9 @@ async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
data_batch.append(event_data)
return data_batch

@_retry_decorator
async def _receive_with_try(self, timeout_time=None, max_batch_size=None, **kwargs):
return await self._receive(timeout_time=timeout_time, max_batch_size=max_batch_size, **kwargs)
async def _receive_with_retry(self, timeout=None, max_batch_size=None, **kwargs):
return await self._do_retryable_operation(self._receive, timeout=timeout,
max_batch_size=max_batch_size, **kwargs)

@property
def queue_size(self):
@@ -217,9 +225,8 @@ async def receive(self, *, max_batch_size=None, timeout=None):

timeout = timeout or self.client.config.receive_timeout
max_batch_size = max_batch_size or min(self.client.config.max_batch_size, self.prefetch)
data_batch = [] # type: List[EventData]

return await self._receive_with_try(timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch)
return await self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size)
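
For context, a minimal end-to-end usage sketch of the async receive API touched here (import paths and argument values are assumptions based on this preview; credential construction is omitted):

from azure.eventhub import EventPosition
from azure.eventhub.aio import EventHubClient

async def read_once(host, event_hub_path, credential):
    client = EventHubClient(host, event_hub_path, credential)
    consumer = client.create_consumer(consumer_group='$default', partition_id='0',
                                      event_position=EventPosition('-1'))
    async with consumer:
        # Returns up to max_batch_size events, or whatever arrived before the timeout.
        events = await consumer.receive(max_batch_size=100, timeout=5)
    return events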

async def close(self, exception=None):
# type: (Exception) -> None
@@ -254,4 +261,5 @@ async def close(self, exception=None):
self.error = EventHubError(str(exception))
else:
self.error = EventHubError("This receive handler is now closed.")
await self._handler.close_async()
if self._handler:
await self._handler.close_async()