[EventHubs] Track2 Preview3 (#7059)
* Small changes from code review

* change EventData.msg_properties to private attribute

* remove abstract method

* code clean 1

* code clean 2

* Fix pylint

* Fix pylint

* Use properties EventData.partition_key

* Temporarily disable pylint errors that need refactoring

* fix pylint errors

* fix pylint errors

* ignore eventprocessor pylint temporarily

* small pylint adjustment

* Add typing for Python2.7

* [EventHub] IoTHub management operations improvement and bug fixing (#6894)

* Fix bug that iothub hub can't receive

* Support direct mgmt ops of iothub

* Improve mgmt ops and update livetest

* Small fix

* Improvement of iothub mgmt

* [EventHub] Retry refactor (#7026)

* Retry refactor

* Refactor retry, delay and handle exception

* Remove unused module

* Small fix

* Small fix

* add system_properties to EventData

* Fix a small bug

* Refine example code

* Update receive method (#7064)

* Update accessibility of class (#7091)

* Fix pylint

* Update accessibility of of class

* Small fix in livetest

* Wait longer in iothub livetest

* Small updates in livetest

* Update samples and codes according to the review (#7098)

* Update samples and codes according to the review

* Small update

* Python EventHubs load balancing (#6901)

* Draft EventProcessor Loadbalancing

* EventProcessor Load balancing

* small changes from bryan's review

* remove checkpoint manager from initialize

* small changes

* Fix code review feedback

* Packaging update of azure-mgmt-datalake-analytics

* Packaging update of azure-loganalytics

* Packaging update of azure-mgmt-storage

* code review fixes and pylint error

* reduce dictionary access

* Revert "Packaging update of azure-mgmt-storage"

This reverts commit cf22c7c.

* Revert "Packaging update of azure-loganalytics"

This reverts commit 40c7f03.

* Revert "Packaging update of azure-mgmt-datalake-analytics"

This reverts commit c126bea.

* Trivial code change

* Refine exception handling for eventprocessor

* Enable pylint for eventprocessor

* Expose OwnershipLostError

* Move eventprocessor to aio
rename Sqlite3PartitionManager to SamplePartitionManager

* change checkpoint_manager to partition context

* fix pylint error

* fix a small issue

* Catch list_ownership/claim_ownership exceptions and retry

* Fix code review issues

* fix event processor long running test

* Remove utils.py

* Remove close() method

* Updated docstrings

* add pytest

* small fixes

* Revert "Remove utils.py"

This reverts commit a9446de.

* change asyncio.create_task to 3.5 friendly code

* Remove Callable

* raise CancelledError instead of break

* Fix a pylint error

* Eventhubs blobstorage checkpointstore merge to preview3 (#7109)

* exclude eventprocessor test for python27

* exclude eventprocessor test

* Revert "Eventhubs blobstorage checkpointstore merge to preview3 (#7109)"

This reverts commit 13a8fe7.

* Fix small problem in consumer iterator (#7110)

* Fixed an issue that initializes partition processor multiple times

* Update release history for 5.0.0b3

* Update README for 5.0.0b3
yunhaoling authored and YijunXieMS committed Sep 9, 2019
1 parent 7e0ddbe commit 6aba868
Showing 53 changed files with 1,794 additions and 1,341 deletions.
4 changes: 2 additions & 2 deletions pylintrc
@@ -2,8 +2,8 @@
ignore-patterns=test_*,conftest,setup
reports=no

-# PYLINT DIRECTORY BLACKLIST. Ignore eventprocessor temporarily until new eventprocessor code is merged to master
-ignore=_generated,samples,examples,test,tests,doc,.tox,eventprocessor
+# PYLINT DIRECTORY BLACKLIST.
+ignore=_generated,samples,examples,test,tests,doc,.tox

init-hook='import sys; sys.path.insert(0, os.path.abspath(os.getcwd().rsplit("azure-sdk-for-python", 1)[0] + "azure-sdk-for-python/scripts/pylint_custom_plugin"))'
load-plugins=pylint_guidelines_checker
14 changes: 14 additions & 0 deletions sdk/eventhub/azure-eventhubs/HISTORY.md
@@ -1,4 +1,18 @@
# Release History
+## 5.0.0b3 (2019-09-10)
+
+**New features**
+- `EventProcessor` has a load balancer that balances the load among multiple EventProcessors automatically
+- In addition to `SamplePartitionManager`, a new `PartitionManager` implementation that uses Azure Blob Storage is added
+to centrally store the checkpoint data for event processors. It is packaged separately as a plug-in rather than included in this package.
+Refer to [Azure Blob Storage Partition Manager](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) for details.
+
+**Breaking changes**
+
+- The `PartitionProcessor` constructor no longer takes the argument `checkpoint_manager`. Its methods (`initialize`, `process_events`,
+`process_error`, `close`) now take the argument `partition_context`, which has the method `update_checkpoint`.
+- `CheckpointManager` was replaced by `PartitionContext`
+- Renamed `Sqlite3PartitionManager` to `SamplePartitionManager`

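The breaking change above is easiest to see as code. The classes below are hypothetical stand-ins, not the SDK's (the real ones live in `azure.eventhub.aio.eventprocessor`); they only sketch the new shape, where `partition_context` is passed to each callback instead of a `checkpoint_manager` being injected through the constructor:

```python
import asyncio

# Hypothetical stand-ins for the SDK types, for illustration only.
class PartitionContext:
    def __init__(self):
        self.checkpoints = []

    async def update_checkpoint(self, offset, sequence_number):
        # The real implementation persists through the configured PartitionManager.
        self.checkpoints.append((offset, sequence_number))

class MyPartitionProcessor:
    # 5.0.0b3 shape: partition_context is an argument of each method,
    # replacing the checkpoint_manager constructor argument of 5.0.0b2.
    async def process_events(self, events, partition_context):
        if events:
            last = events[-1]
            await partition_context.update_checkpoint(last["offset"], last["sequence_number"])

async def main():
    context = PartitionContext()
    await MyPartitionProcessor().process_events(
        [{"offset": "42", "sequence_number": 7}], context)
    return context.checkpoints

checkpoints = asyncio.run(main())
print(checkpoints)  # [('42', 7)]
```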
## 5.0.0b2 (2019-08-06)

22 changes: 9 additions & 13 deletions sdk/eventhub/azure-eventhubs/README.md
@@ -217,13 +217,16 @@ Using an `EventHubConsumer` to consume events like in the previous examples puts

The `EventProcessor` will delegate the processing of events to a `PartitionProcessor` that you provide, allowing you to focus on business logic while the processor holds responsibility for managing the underlying consumer operations including checkpointing and load balancing.

-While load balancing is a feature we will be adding in the next update, you can see how to use the `EventProcessor` in the below example, where we use an in memory `PartitionManager` that does checkpointing in memory.
+You can see how to use the `EventProcessor` in the below example, where we use an in-memory `PartitionManager` that does checkpointing in memory.
+
+[Azure Blob Storage Partition Manager](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) is another `PartitionManager` implementation that allows multiple EventProcessors to share the load balancing and checkpoint data in a central storage.


```python
import asyncio

from azure.eventhub.aio import EventHubClient
-from azure.eventhub.eventprocessor import EventProcessor, PartitionProcessor, Sqlite3PartitionManager
+from azure.eventhub.aio.eventprocessor import EventProcessor, PartitionProcessor, SamplePartitionManager

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'

@@ -232,24 +235,16 @@ async def do_operation(event):
    print(event)

class MyPartitionProcessor(PartitionProcessor):
-    def __init__(self, checkpoint_manager):
-        super(MyPartitionProcessor, self).__init__(checkpoint_manager)
-
-    async def process_events(self, events):
+    async def process_events(self, events, partition_context):
        if events:
            await asyncio.gather(*[do_operation(event) for event in events])
-            await self._checkpoint_manager.update_checkpoint(events[-1].offset, events[-1].sequence_number)
-
-def partition_processor_factory(checkpoint_manager):
-    return MyPartitionProcessor(checkpoint_manager)
+            await partition_context.update_checkpoint(events[-1].offset, events[-1].sequence_number)

async def main():
    client = EventHubClient.from_connection_string(connection_str, receive_timeout=5, retry_total=3)
-    partition_manager = Sqlite3PartitionManager()  # in-memory PartitionManager
+    partition_manager = SamplePartitionManager()  # in-memory PartitionManager.
    try:
        event_processor = EventProcessor(client, "$default", MyPartitionProcessor, partition_manager)
-        # You can also define a callable object for creating PartitionProcessor like below:
-        # event_processor = EventProcessor(client, "$default", partition_processor_factory, partition_manager)
        asyncio.ensure_future(event_processor.start())
        await asyncio.sleep(60)
        await event_processor.stop()
@@ -273,6 +268,7 @@ The Event Hubs APIs generate the following exceptions.
- **EventDataError:** The EventData to be sent fails data validation.
For instance, this error is raised if you try to send an EventData that has already been sent.
- **EventDataSendError:** The Event Hubs service responds with an error when an EventData is sent.
+- **OperationTimeoutError:** EventHubProducer.send() times out.
- **EventHubError:** All other Event Hubs related errors. It is also the root error class of all the above mentioned errors.
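Because `EventHubError` is the root of this hierarchy, a broad `except EventHubError` clause must come after the more specific ones or it will shadow them. A minimal sketch with stand-in classes (the real exception types are importable from `azure.eventhub`; these stubs only mirror the hierarchy described above):

```python
# Stand-in hierarchy mirroring the one described above, for illustration only.
class EventHubError(Exception): pass
class ConnectError(EventHubError): pass
class EventDataError(EventHubError): pass
class EventDataSendError(EventHubError): pass

def classify(exc):
    # Catch specific subclasses before the EventHubError root clause;
    # ordered the other way, the root clause would match everything.
    try:
        raise exc
    except EventDataSendError:
        return "service rejected the send"
    except EventDataError:
        return "event failed validation"
    except EventHubError:
        return "other Event Hubs error"

print(classify(EventDataSendError()))  # specific subclass wins
print(classify(ConnectError()))        # falls through to the root clause
```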

## Next steps
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py
@@ -3,7 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

-__version__ = "5.0.0b2"
+__version__ = "5.0.0b3"
from uamqp import constants # type: ignore
from azure.eventhub.common import EventData, EventDataBatch, EventPosition
from azure.eventhub.error import EventHubError, EventDataError, ConnectError, \
@@ -13,29 +13,11 @@
log = logging.getLogger(__name__)


-def _retry_decorator(to_be_wrapped_func):
-    def wrapped_func(self, *args, **kwargs):  # pylint:disable=unused-argument  # TODO: to refactor
-        timeout = kwargs.pop("timeout", 100000)
-        if not timeout:
-            timeout = 100000  # timeout equals to 0 means no timeout, set the value to be a large number.
-        timeout_time = time.time() + timeout
-        max_retries = self.client.config.max_retries
-        retry_count = 0
-        last_exception = None
-        while True:
-            try:
-                return to_be_wrapped_func(self, timeout_time=timeout_time, last_exception=last_exception, **kwargs)
-            except Exception as exception:  # pylint:disable=broad-except
-                last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time)  # pylint:disable=protected-access
-                retry_count += 1
-    return wrapped_func


class ConsumerProducerMixin(object):
    def __init__(self):
-        self.client = None
+        self._client = None
        self._handler = None
-        self.name = None
+        self._name = None

def __enter__(self):
return self
@@ -44,59 +26,81 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.close(exc_val)

    def _check_closed(self):
-        if self.error:
-            raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self.name))
+        if self._error:
+            raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self._name))

def _create_handler(self):
pass

    def _redirect(self, redirect):
-        self.redirected = redirect
-        self.running = False
+        self._redirected = redirect
+        self._running = False
        self._close_connection()

-    def _open(self, timeout_time=None):  # pylint:disable=unused-argument  # TODO: to refactor
+    def _open(self):
        """
-        Open the EventHubConsumer using the supplied connection.
+        Open the EventHubConsumer/EventHubProducer using the supplied connection.
        If the handler has previously been redirected, the redirect
        context will be used to create a new handler before opening it.
        """
-        # pylint: disable=protected-access
-        if not self.running:
+        if not self._running:
            if self._handler:
                self._handler.close()
-            if self.redirected:
+            if self._redirected:
                alt_creds = {
-                    "username": self.client._auth_config.get("iot_username"),
-                    "password": self.client._auth_config.get("iot_password")}
+                    "username": self._client._auth_config.get("iot_username"),
+                    "password": self._client._auth_config.get("iot_password")}
            else:
                alt_creds = {}
            self._create_handler()
-            self._handler.open(connection=self.client._conn_manager.get_connection(
-                self.client.address.hostname,
-                self.client.get_auth(**alt_creds)
+            self._handler.open(connection=self._client._conn_manager.get_connection(  # pylint: disable=protected-access
+                self._client._address.hostname,
+                self._client._get_auth(**alt_creds)
            ))
            while not self._handler.client_ready():
                time.sleep(0.05)
            self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \
                or constants.MAX_MESSAGE_LENGTH_BYTES  # pylint: disable=protected-access
-            self.running = True
+            self._running = True

    def _close_handler(self):
        self._handler.close()  # close the link (sharing connection) or connection (not sharing)
-        self.running = False
+        self._running = False

    def _close_connection(self):
        self._close_handler()
-        self.client._conn_manager.reset_connection_if_broken()  # pylint: disable=protected-access
+        self._client._conn_manager.reset_connection_if_broken()  # pylint: disable=protected-access

-    def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
-        if not self.running and isinstance(exception, compat.TimeoutException):
+    def _handle_exception(self, exception):
+        if not self._running and isinstance(exception, compat.TimeoutException):
            exception = errors.AuthenticationException("Authorization timeout.")
-        return _handle_exception(exception, retry_count, max_retries, self, timeout_time)
+        return _handle_exception(exception, self)
+
+    def _do_retryable_operation(self, operation, timeout=100000, **kwargs):
+        # pylint:disable=protected-access
+        timeout_time = time.time() + (
+            timeout if timeout else 100000)  # timeout equals to 0 means no timeout, set the value to be a large number.
+        retried_times = 0
+        last_exception = kwargs.pop('last_exception', None)
+        operation_need_param = kwargs.pop('operation_need_param', True)
+
+        while retried_times <= self._client._config.max_retries:  # pylint: disable=protected-access
+            try:
+                if operation_need_param:
+                    return operation(timeout_time=timeout_time, last_exception=last_exception, **kwargs)
+                return operation()
+            except Exception as exception:  # pylint:disable=broad-except
+                last_exception = self._handle_exception(exception)
+                self._client._try_delay(retried_times=retried_times, last_exception=last_exception,
+                                        timeout_time=timeout_time, entity_name=self._name)
+                retried_times += 1
+
+        log.info("%r operation has exhausted retry. Last exception: %r.", self._name, last_exception)
+        raise last_exception
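The retry loop added in this diff follows a common pattern: compute the deadline once, translate each failure, delay, and try again until the retries or the deadline run out, then re-raise the last exception. A simplified, self-contained sketch of the same shape (all names here are illustrative, not the SDK's API):

```python
import time

def do_retryable_operation(operation, timeout=100000, max_retries=3, delay=0.0):
    # timeout == 0 means "no timeout"; substitute a large deadline instead.
    deadline = time.time() + (timeout if timeout else 100000)
    last_exception = None
    retried = 0
    while retried <= max_retries:
        try:
            return operation()
        except Exception as exc:  # broad by design: every failure is retried
            last_exception = exc
            if time.time() > deadline:
                break
            time.sleep(delay)   # the SDK version applies a backoff here
            retried += 1
    # Retries (or the deadline) exhausted: surface the last failure.
    raise last_exception

# Demo: an operation that succeeds on the third attempt.
attempts = []
def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise ValueError("transient")
    return "ok"

result = do_retryable_operation(flaky)
print(result)  # ok
```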

def close(self, exception=None):
# type:(Exception) -> None
@@ -118,16 +122,16 @@ def close(self, exception=None):
:caption: Close down the handler.
"""
-        self.running = False
-        if self.error:  # type: ignore
+        self._running = False
+        if self._error:  # type: ignore
            return
        if isinstance(exception, errors.LinkRedirect):
-            self.redirected = exception
+            self._redirected = exception
        elif isinstance(exception, EventHubError):
-            self.error = exception
+            self._error = exception
        elif exception:
-            self.error = EventHubError(str(exception))
+            self._error = EventHubError(str(exception))
        else:
-            self.error = EventHubError("{} handler is closed.".format(self.name))
+            self._error = EventHubError("{} handler is closed.".format(self._name))
if self._handler:
self._handler.close() # this will close link if sharing connection. Otherwise close connection