Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EventHubs] Track2 Preview3 #7059

Merged
merged 44 commits into from
Sep 9, 2019
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
64da8ed
Small changes from code review
Aug 23, 2019
d951dcf
change EventData.msg_properties to private attribute
Aug 26, 2019
8bbac25
remove abstract method
Aug 27, 2019
70a33d0
code clean 1
Aug 28, 2019
abbdd25
code clean 2
Aug 28, 2019
b45d6b3
Fix pylint
Aug 29, 2019
247004a
Fix pylint
Aug 29, 2019
6ace6ce
Use properties EventData.partition_key
Aug 29, 2019
008421d
Small changes from code review
Aug 23, 2019
b8c027d
change EventData.msg_properties to private attribute
Aug 26, 2019
2489dd3
remove abstract method
Aug 27, 2019
3a2d72f
code clean 1
Aug 28, 2019
9735756
code clean 2
Aug 28, 2019
288617e
Fix pylint
Aug 29, 2019
2bdbffe
Fix pylint
Aug 29, 2019
e8ea699
Use properties EventData.partition_key
Aug 29, 2019
889597c
Merge branch 'eventhubs_preview3' of github.com:Azure/azure-sdk-for-p…
Aug 29, 2019
cb08478
Use properties EventData.partition_key
Aug 29, 2019
b3dcd07
Temporarily disable pylint errors that need refactoring
Aug 29, 2019
b85e6cc
fix pylint errors
Aug 29, 2019
92feb09
Merge branch 'master' into eventhubs_preview3
Aug 29, 2019
5e51ce2
fix pylint errors
Aug 30, 2019
726bf6f
ignore eventprocessor pylint temporarily
Aug 30, 2019
ffd8cb0
small pylint adjustment
Aug 30, 2019
2f69d65
Merge branch 'master' into eventhubs_preview3
Aug 30, 2019
e5c8d1c
Add typing for Python2.7
Aug 30, 2019
e85ac17
[EventHub] IoTHub management operations improvement and bug fixing (#…
yunhaoling Sep 2, 2019
1fb341b
[EventHub] Retry refactor (#7026)
yunhaoling Sep 3, 2019
7762130
add system_properties to EventData
Sep 3, 2019
1b10d00
Fix a small bug
Sep 4, 2019
13237b5
Refine example code
Sep 4, 2019
998eeed
Update receive method (#7064)
yunhaoling Sep 4, 2019
e13ddee
Update accessibility of class (#7091)
yunhaoling Sep 6, 2019
f616f37
Update samples and codes according to the review (#7098)
yunhaoling Sep 6, 2019
dad5baa
Python EventHubs load balancing (#6901)
YijunXieMS Sep 7, 2019
8e7e1c1
Fix a pylint error
Sep 7, 2019
13a8fe7
Eventhubs blobstorage checkpointstore merge to preview3 (#7109)
YijunXieMS Sep 7, 2019
b5c933f
exclude eventprocessor test for python27
Sep 7, 2019
7b0f5fe
exclude eventprocessor test
Sep 7, 2019
167361e
Revert "Eventhubs blobstorage checkpointstore merge to preview3 (#7109)"
Sep 7, 2019
1253983
Fix small problem in consumer iterator (#7110)
yunhaoling Sep 7, 2019
548a989
Fixed an issue that initializes partition processor multiple times
Sep 8, 2019
725b333
Update release history for 5.0.0b3
Sep 9, 2019
c359042
Update README for 5.0.0b3
Sep 9, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
ignore-patterns=test_*,conftest,setup
reports=no

# PYLINT DIRECTORY BLACKLIST. Ignore eventprocessor temporarily until new eventprocessor code is merged to master
ignore=_generated,samples,examples,test,tests,doc,.tox,eventprocessor
# PYLINT DIRECTORY BLACKLIST.
ignore=_generated,samples,examples,test,tests,doc,.tox

init-hook='import sys; sys.path.insert(0, os.path.abspath(os.getcwd().rsplit("azure-sdk-for-python", 1)[0] + "azure-sdk-for-python/scripts/pylint_custom_plugin"))'
load-plugins=pylint_guidelines_checker
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

__version__ = "5.0.0b2"
__version__ = "5.0.0b3"
from uamqp import constants # type: ignore
from azure.eventhub.common import EventData, EventDataBatch, EventPosition
from azure.eventhub.error import EventHubError, EventDataError, ConnectError, \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,11 @@
log = logging.getLogger(__name__)


def _retry_decorator(to_be_wrapped_func):
def wrapped_func(self, *args, **kwargs): # pylint:disable=unused-argument # TODO: to refactor
timeout = kwargs.pop("timeout", 100000)
if not timeout:
timeout = 100000 # timeout equals to 0 means no timeout, set the value to be a large number.
timeout_time = time.time() + timeout
max_retries = self.client.config.max_retries
retry_count = 0
last_exception = None
while True:
try:
return to_be_wrapped_func(self, timeout_time=timeout_time, last_exception=last_exception, **kwargs)
except Exception as exception: # pylint:disable=broad-except
last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time) # pylint:disable=protected-access
retry_count += 1
return wrapped_func


class ConsumerProducerMixin(object):
def __init__(self):
self.client = None
self._client = None
self._handler = None
self.name = None
self._name = None

def __enter__(self):
return self
Expand All @@ -44,59 +26,81 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.close(exc_val)

def _check_closed(self):
if self.error:
raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self.name))
if self._error:
raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self._name))

def _create_handler(self):
pass

def _redirect(self, redirect):
self.redirected = redirect
self.running = False
self._redirected = redirect
self._running = False
self._close_connection()

def _open(self, timeout_time=None): # pylint:disable=unused-argument # TODO: to refactor
def _open(self):
"""
Open the EventHubConsumer using the supplied connection.
Open the EventHubConsumer/EventHubProducer using the supplied connection.
If the handler has previously been redirected, the redirect
context will be used to create a new handler before opening it.

"""
# pylint: disable=protected-access
if not self.running:
if not self._running:
if self._handler:
self._handler.close()
if self.redirected:
if self._redirected:
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password": self.client._auth_config.get("iot_password")}
"username": self._client._auth_config.get("iot_username"),
"password": self._client._auth_config.get("iot_password")}
else:
alt_creds = {}
self._create_handler()
self._handler.open(connection=self.client._conn_manager.get_connection(
self.client.address.hostname,
self.client.get_auth(**alt_creds)
self._handler.open(connection=self._client._conn_manager.get_connection( # pylint: disable=protected-access
self._client._address.hostname,
self._client._get_auth(**alt_creds)
))
while not self._handler.client_ready():
time.sleep(0.05)
self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \
or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access
self.running = True
self._running = True

def _close_handler(self):
self._handler.close() # close the link (sharing connection) or connection (not sharing)
self.running = False
self._running = False

def _close_connection(self):
self._close_handler()
self.client._conn_manager.reset_connection_if_broken() # pylint: disable=protected-access
self._client._conn_manager.reset_connection_if_broken() # pylint: disable=protected-access

def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
if not self.running and isinstance(exception, compat.TimeoutException):
def _handle_exception(self, exception):
if not self._running and isinstance(exception, compat.TimeoutException):
exception = errors.AuthenticationException("Authorization timeout.")
return _handle_exception(exception, retry_count, max_retries, self, timeout_time)
return _handle_exception(exception, self)

return _handle_exception(exception, self)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to have two return _handle_exception(exception, self)?


def _do_retryable_operation(self, operation, timeout=100000, **kwargs):
# pylint:disable=protected-access
timeout_time = time.time() + (
timeout if timeout else 100000) # timeout equals to 0 means no timeout, set the value to be a large number.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timeout or 100000

retried_times = 0
last_exception = kwargs.pop('last_exception', None)
operation_need_param = kwargs.pop('operation_need_param', True)

while retried_times <= self._client._config.max_retries: # pylint: disable=protected-access
try:
if operation_need_param:
return operation(timeout_time=timeout_time, last_exception=last_exception, **kwargs)
return operation()
except Exception as exception: # pylint:disable=broad-except
last_exception = self._handle_exception(exception)
self._client._try_delay(retried_times=retried_times, last_exception=last_exception,
timeout_time=timeout_time, entity_name=self._name)
retried_times += 1

return _handle_exception(exception, retry_count, max_retries, self, timeout_time)
log.info("%r operation has exhausted retry. Last exception: %r.", self._name, last_exception)
raise last_exception

def close(self, exception=None):
# type:(Exception) -> None
Expand All @@ -118,16 +122,16 @@ def close(self, exception=None):
:caption: Close down the handler.

"""
self.running = False
if self.error: # type: ignore
self._running = False
if self._error: # type: ignore
return
if isinstance(exception, errors.LinkRedirect):
self.redirected = exception
self._redirected = exception
elif isinstance(exception, EventHubError):
self.error = exception
self._error = exception
elif exception:
self.error = EventHubError(str(exception))
self._error = EventHubError(str(exception))
else:
self.error = EventHubError("{} handler is closed.".format(self.name))
self._error = EventHubError("{} handler is closed.".format(self._name))
if self._handler:
self._handler.close() # this will close link if sharing connection. Otherwise close connection
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,12 @@
log = logging.getLogger(__name__)


def _retry_decorator(to_be_wrapped_func):
async def wrapped_func(self, *args, **kwargs): # pylint:disable=unused-argument # TODO: to refactor
timeout = kwargs.pop("timeout", 100000)
if not timeout:
timeout = 100000 # timeout equals to 0 means no timeout, set the value to be a large number.
timeout_time = time.time() + timeout
max_retries = self.client.config.max_retries
retry_count = 0
last_exception = None
while True:
try:
return await to_be_wrapped_func(
self, timeout_time=timeout_time, last_exception=last_exception, **kwargs
)
except Exception as exception: # pylint:disable=broad-except
last_exception = await self._handle_exception(exception, retry_count, max_retries, timeout_time) # pylint:disable=protected-access
retry_count += 1
return wrapped_func


class ConsumerProducerMixin(object):

def __init__(self):
self.client = None
self._client = None
self._handler = None
self.name = None
self._name = None

async def __aenter__(self):
return self
Expand All @@ -47,59 +27,81 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close(exc_val)

def _check_closed(self):
if self.error:
raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self.name))
if self._error:
raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self._name))

def _create_handler(self):
pass

async def _redirect(self, redirect):
self.redirected = redirect
self.running = False
self._redirected = redirect
self._running = False
await self._close_connection()

async def _open(self, timeout_time=None): # pylint:disable=unused-argument # TODO: to refactor
async def _open(self):
"""
Open the EventHubConsumer using the supplied connection.
If the handler has previously been redirected, the redirect
context will be used to create a new handler before opening it.

"""
# pylint: disable=protected-access
if not self.running:
if not self._running:
if self._handler:
await self._handler.close_async()
if self.redirected:
if self._redirected:
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password": self.client._auth_config.get("iot_password")}
"username": self._client._auth_config.get("iot_username"),
"password": self._client._auth_config.get("iot_password")}
else:
alt_creds = {}
self._create_handler()
await self._handler.open_async(connection=await self.client._conn_manager.get_connection(
self.client.address.hostname,
self.client.get_auth(**alt_creds)
await self._handler.open_async(connection=await self._client._conn_manager.get_connection(
self._client._address.hostname,
self._client._get_auth(**alt_creds)
))
while not await self._handler.client_ready_async():
await asyncio.sleep(0.05)
self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \
or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access
self.running = True
self._running = True

async def _close_handler(self):
await self._handler.close_async() # close the link (sharing connection) or connection (not sharing)
self.running = False
self._running = False

async def _close_connection(self):
await self._close_handler()
await self.client._conn_manager.reset_connection_if_broken() # pylint:disable=protected-access
await self._client._conn_manager.reset_connection_if_broken() # pylint:disable=protected-access

async def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
if not self.running and isinstance(exception, compat.TimeoutException):
async def _handle_exception(self, exception):
if not self._running and isinstance(exception, compat.TimeoutException):
exception = errors.AuthenticationException("Authorization timeout.")
return await _handle_exception(exception, retry_count, max_retries, self, timeout_time)
return await _handle_exception(exception, self)

return await _handle_exception(exception, self)

async def _do_retryable_operation(self, operation, timeout=100000, **kwargs):
# pylint:disable=protected-access
timeout_time = time.time() + (
timeout if timeout else 100000) # timeout equals to 0 means no timeout, set the value to be a large number.
retried_times = 0
last_exception = kwargs.pop('last_exception', None)
operation_need_param = kwargs.pop('operation_need_param', True)

while retried_times <= self._client._config.max_retries:
try:
if operation_need_param:
return await operation(timeout_time=timeout_time, last_exception=last_exception, **kwargs)
return await operation()
except Exception as exception: # pylint:disable=broad-except
last_exception = await self._handle_exception(exception)
await self._client._try_delay(retried_times=retried_times, last_exception=last_exception,
timeout_time=timeout_time, entity_name=self._name)
retried_times += 1

return await _handle_exception(exception, retry_count, max_retries, self, timeout_time)
log.info("%r operation has exhausted retry. Last exception: %r.", self._name, last_exception)
raise last_exception

async def close(self, exception=None):
# type: (Exception) -> None
Expand All @@ -121,18 +123,18 @@ async def close(self, exception=None):
:caption: Close down the handler.

"""
self.running = False
if self.error: #type: ignore
self._running = False
if self._error: #type: ignore
return
if isinstance(exception, errors.LinkRedirect):
self.redirected = exception
self._redirected = exception
elif isinstance(exception, EventHubError):
self.error = exception
self._error = exception
elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)):
self.error = ConnectError(str(exception), exception)
self._error = ConnectError(str(exception), exception)
elif exception:
self.error = EventHubError(str(exception))
self._error = EventHubError(str(exception))
else:
self.error = EventHubError("This receive handler is now closed.")
self._error = EventHubError("This receive handler is now closed.")
if self._handler:
await self._handler.close_async()
Loading