-
Notifications
You must be signed in to change notification settings - Fork 2.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[EventHubs] Track2 Preview3 #7059
Changes from 32 commits
64da8ed
d951dcf
8bbac25
70a33d0
abbdd25
b45d6b3
247004a
6ace6ce
008421d
b8c027d
2489dd3
3a2d72f
9735756
288617e
2bdbffe
e8ea699
889597c
cb08478
b3dcd07
b85e6cc
92feb09
5e51ce2
726bf6f
ffd8cb0
2f69d65
e5c8d1c
e85ac17
1fb341b
7762130
1b10d00
13237b5
998eeed
e13ddee
f616f37
dad5baa
8e7e1c1
13a8fe7
b5c933f
7b0f5fe
167361e
1253983
548a989
725b333
c359042
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,26 +13,6 @@ | |
log = logging.getLogger(__name__) | ||
|
||
|
||
def _retry_decorator(to_be_wrapped_func): | ||
async def wrapped_func(self, *args, **kwargs): # pylint:disable=unused-argument # TODO: to refactor | ||
timeout = kwargs.pop("timeout", 100000) | ||
if not timeout: | ||
timeout = 100000 # timeout equals to 0 means no timeout, set the value to be a large number. | ||
timeout_time = time.time() + timeout | ||
max_retries = self.client.config.max_retries | ||
retry_count = 0 | ||
last_exception = None | ||
while True: | ||
try: | ||
return await to_be_wrapped_func( | ||
self, timeout_time=timeout_time, last_exception=last_exception, **kwargs | ||
) | ||
except Exception as exception: # pylint:disable=broad-except | ||
last_exception = await self._handle_exception(exception, retry_count, max_retries, timeout_time) # pylint:disable=protected-access | ||
retry_count += 1 | ||
return wrapped_func | ||
|
||
|
||
class ConsumerProducerMixin(object): | ||
|
||
def __init__(self): | ||
|
@@ -58,7 +38,7 @@ async def _redirect(self, redirect): | |
self.running = False | ||
await self._close_connection() | ||
|
||
async def _open(self, timeout_time=None): # pylint:disable=unused-argument # TODO: to refactor | ||
async def _open(self): | ||
""" | ||
Open the EventHubConsumer using the supplied connection. | ||
If the handler has previously been redirected, the redirect | ||
|
@@ -94,12 +74,36 @@ async def _close_connection(self): | |
await self._close_handler() | ||
await self.client._conn_manager.reset_connection_if_broken() # pylint:disable=protected-access | ||
|
||
async def _handle_exception(self, exception, retry_count, max_retries, timeout_time): | ||
async def _handle_exception(self, exception): | ||
if not self.running and isinstance(exception, compat.TimeoutException): | ||
exception = errors.AuthenticationException("Authorization timeout.") | ||
return await _handle_exception(exception, retry_count, max_retries, self, timeout_time) | ||
return await _handle_exception(exception, self) | ||
|
||
return await _handle_exception(exception, self) | ||
|
||
async def _do_retryable_operation(self, operation, timeout=None, **kwargs): | ||
# pylint:disable=protected-access | ||
if not timeout: | ||
timeout = 100000 # timeout equals to 0 means no timeout, set the value to be a large number. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not just set There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good idea. just updated the code to set the default value 100000. |
||
timeout_time = time.time() + timeout | ||
retried_times = 0 | ||
last_exception = kwargs.pop('last_exception', None) | ||
operation_need_param = kwargs.pop('operation_need_param', True) | ||
|
||
while retried_times <= self.client.config.max_retries: | ||
try: | ||
if operation_need_param: | ||
return await operation(timeout_time=timeout_time, last_exception=last_exception, **kwargs) | ||
else: | ||
return await operation() | ||
except Exception as exception: # pylint:disable=broad-except | ||
last_exception = await self._handle_exception(exception) | ||
await self.client._try_delay(retried_times=retried_times, last_exception=last_exception, | ||
timeout_time=timeout_time, entity_name=self.name) | ||
retried_times += 1 | ||
|
||
return await _handle_exception(exception, retry_count, max_retries, self, timeout_time) | ||
log.info("%r has exhausted retry. Exception still occurs (%r)", self.name, last_exception) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suggest being explicit that that exception is in fact the last exception:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thanks for the suggestion, just updated |
||
raise last_exception | ||
|
||
async def close(self, exception=None): | ||
# type: (Exception) -> None | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to have two return _handle_exception(exception, self)?