Eventhubs preview3 documentation update #7139

Merged: 55 commits, Sep 10, 2019
Changes from 54 commits
Commits
64da8ed
Small changes from code review
Aug 23, 2019
d951dcf
change EventData.msg_properties to private attribute
Aug 26, 2019
8bbac25
remove abstract method
Aug 27, 2019
70a33d0
code clean 1
Aug 28, 2019
abbdd25
code clean 2
Aug 28, 2019
b45d6b3
Fix pylint
Aug 29, 2019
247004a
Fix pylint
Aug 29, 2019
6ace6ce
Use properties EventData.partition_key
Aug 29, 2019
008421d
Small changes from code review
Aug 23, 2019
b8c027d
change EventData.msg_properties to private attribute
Aug 26, 2019
2489dd3
remove abstract method
Aug 27, 2019
3a2d72f
code clean 1
Aug 28, 2019
9735756
code clean 2
Aug 28, 2019
288617e
Fix pylint
Aug 29, 2019
2bdbffe
Fix pylint
Aug 29, 2019
e8ea699
Use properties EventData.partition_key
Aug 29, 2019
889597c
Merge branch 'eventhubs_preview3' of github.com:Azure/azure-sdk-for-p…
Aug 29, 2019
cb08478
Use properties EventData.partition_key
Aug 29, 2019
b3dcd07
Temporarily disable pylint errors that need refactoring
Aug 29, 2019
b85e6cc
fix pylint errors
Aug 29, 2019
92feb09
Merge branch 'master' into eventhubs_preview3
Aug 29, 2019
5e51ce2
fix pylint errors
Aug 30, 2019
726bf6f
ignore eventprocessor pylint temporarily
Aug 30, 2019
ffd8cb0
small pylint adjustment
Aug 30, 2019
2f69d65
Merge branch 'master' into eventhubs_preview3
Aug 30, 2019
e5c8d1c
Add typing for Python2.7
Aug 30, 2019
e85ac17
[EventHub] IoTHub management operations improvement and bug fixing (#…
yunhaoling Sep 2, 2019
1fb341b
[EventHub] Retry refactor (#7026)
yunhaoling Sep 3, 2019
7762130
add system_properties to EventData
Sep 3, 2019
1b10d00
Fix a small bug
Sep 4, 2019
13237b5
Refine example code
Sep 4, 2019
998eeed
Update receive method (#7064)
yunhaoling Sep 4, 2019
e13ddee
Update accessibility of class (#7091)
yunhaoling Sep 6, 2019
f616f37
Update samples and codes according to the review (#7098)
yunhaoling Sep 6, 2019
dad5baa
Python EventHubs load balancing (#6901)
YijunXieMS Sep 7, 2019
8e7e1c1
Fix a pylint error
Sep 7, 2019
13a8fe7
Eventhubs blobstorage checkpointstore merge to preview3 (#7109)
YijunXieMS Sep 7, 2019
b5c933f
exclude eventprocessor test for python27
Sep 7, 2019
7b0f5fe
exclude eventprocessor test
Sep 7, 2019
167361e
Revert "Eventhubs blobstorage checkpointstore merge to preview3 (#7109)"
Sep 7, 2019
1253983
Fix small problem in consumer iterator (#7110)
yunhaoling Sep 7, 2019
548a989
Fixed an issue that initializes partition processor multiple times
Sep 8, 2019
725b333
Update release history for 5.0.0b3
Sep 9, 2019
c359042
Update README for 5.0.0b3
Sep 9, 2019
b0ae73a
Merge branch 'master' into eventhubs_preview3
Sep 9, 2019
1e98a2b
Fix an issue
Sep 9, 2019
c408d7c
fix a small issue
Sep 9, 2019
48de36c
Fix a small issue
Sep 9, 2019
8563cb8
Updates for docs, tests (#7111)
yunhaoling Sep 9, 2019
6aed419
Update history (#7158)
yunhaoling Sep 10, 2019
18aa083
add pkgutil
Sep 10, 2019
f577a1a
Update docstring
Sep 10, 2019
6aca2cd
Small README update
Sep 10, 2019
40bb7c8
Update docstring (#7166)
yunhaoling Sep 10, 2019
d5c839b
Fix a pylint error
Sep 10, 2019
22 changes: 14 additions & 8 deletions sdk/eventhub/azure-eventhubs/HISTORY.md
@@ -1,18 +1,24 @@
# Release History

## 5.0.0b3 (2019-09-10)

**New features**
- `EventProcessor` has a load balancer that balances load among multiple EventProcessors automatically
- In addition to `SamplePartitionManager`, A new `PartitionManager` implementation that uses Azure Blob Storage is added
to centrally store the checkpoint data for event processors. It's not packaged separately as a plug-in to this package.
Refer to [Azure Blob Storage Partition Manager](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) for details.

- Added support for automatic load balancing among multiple `EventProcessor` instances.
- Added `BlobPartitionManager` which implements `PartitionManager`.
    - Azure Blob Storage is used for storing data used by `EventProcessor`.
    - Packaged separately as a plug-in to `EventProcessor`.
    - For details, please refer to [Azure Blob Storage Partition Manager](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio).
- Added property `system_properties` on `EventData`.

**Breaking changes**

- `PartitionProcessor` constructor removed argument "checkpoint_manager". Its methods (initialize, process_events,
process_error, close) added argument "partition_context", which has method update_checkpoint.
- `CheckpointManager` was replaced by `PartitionContext`
- Renamed `Sqlite3PartitionManager` to `SamplePartitionManager`
- Removed constructor method of `PartitionProcessor`. For initialization please implement the method `initialize`.
- Replaced `CheckpointManager` with `PartitionContext`.
    - `PartitionContext` has partition context information and method `update_checkpoint`.
- Updated all methods of `PartitionProcessor` to include `PartitionContext` as part of the arguments.
- Updated accessibility of class members in `EventHub/EventHubConsumer/EventHubProducer` to be private.
- Moved `azure.eventhub.eventprocessor` under `aio` package, which now becomes `azure.eventhub.aio.eventprocessor`.
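
The `PartitionProcessor` changes listed above can be sketched without the library installed. In this minimal illustration, `StubPartitionContext` and `CountingProcessor` are hypothetical stand-ins, and the `update_checkpoint(offset, sequence_number)` signature is an assumption rather than the package's confirmed API. The point is only that setup moves out of the constructor into `initialize`, and that every callback receives the context object.

```python
import asyncio


class StubPartitionContext:
    """Hypothetical stand-in for PartitionContext; records the last checkpoint."""

    def __init__(self, partition_id):
        self.partition_id = partition_id
        self.checkpoint = None

    async def update_checkpoint(self, offset, sequence_number=None):
        # Assumed signature; the real class would persist this via a PartitionManager.
        self.checkpoint = (offset, sequence_number)


class CountingProcessor:
    """Mirrors the new shape: no constructor arguments, setup in initialize()."""

    async def initialize(self, partition_context):
        self.seen = 0

    async def process_events(self, events, partition_context):
        self.seen += len(events)
        if events:
            last = events[-1]
            await partition_context.update_checkpoint(
                last["offset"], last["sequence_number"]
            )


async def run_once():
    context = StubPartitionContext("0")
    processor = CountingProcessor()
    await processor.initialize(context)
    # Plain dicts stand in for EventData objects in this sketch.
    events = [{"offset": str(i * 10), "sequence_number": i} for i in range(3)]
    await processor.process_events(events, context)
    return processor.seen, context.checkpoint


def run_demo():
    return asyncio.run(run_once())
```

Calling `run_demo()` drives one `initialize` and one `process_events` call and returns the event count together with the recorded checkpoint.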

## 5.0.0b2 (2019-08-06)

Expand Down
75 changes: 67 additions & 8 deletions sdk/eventhub/azure-eventhubs/README.md
Expand Up @@ -9,7 +9,7 @@ The Azure Event Hubs client library allows for publishing and consuming of Azure
- Observe interesting operations and interactions happening within your business or other ecosystem, allowing loosely coupled systems to interact without the need to bind them together.
- Receive events from one or more publishers, transform them to better meet the needs of your ecosystem, then publish the transformed events to a new stream for consumers to observe.

[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs) | [Package (PyPi)](https://pypi.org/project/azure-eventhub/) | [API reference documentation](https://azure.github.io/azure-sdk-for-python/ref/azure.eventhub) | [Product documentation](https://docs.microsoft.com/en-ca/azure/event-hubs/)
[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs) | [Package (PyPi)](https://pypi.org/project/azure-eventhub/) | [API reference documentation](https://azure.github.io/azure-sdk-for-python/ref/azure.eventhub) | [Product documentation](https://docs.microsoft.com/en-us/azure/event-hubs/)

## Getting started

Expand All @@ -20,10 +20,6 @@ Install the Azure Event Hubs client library for Python with pip:
```
$ pip install --pre azure-eventhub
```
For Python2.7, please install package "typing". This is a workaround for [issue 6767](https://github.com/Azure/azure-sdk-for-python/issues/6767).
```
$ pip install typing
```

**Prerequisites**

Expand Down Expand Up @@ -113,6 +109,8 @@ partition_ids = client.get_partition_ids()

Publish events to an Event Hub.

#### Send a single event or an array of events

```python
from azure.eventhub import EventHubClient, EventData

Expand All @@ -134,6 +132,34 @@ finally:
pass
```

#### Send a batch of events

Use the `create_batch` method on `EventHubProducer` to create an `EventDataBatch` object which can then be sent using the `send` method. Events may be added to the `EventDataBatch` using the `try_add` method until the maximum batch size limit in bytes has been reached.
```python
from azure.eventhub import EventHubClient, EventData

try:
    connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
    event_hub_path = '<< NAME OF THE EVENT HUB >>'
    client = EventHubClient.from_connection_string(connection_str, event_hub_path)
    producer = client.create_producer(partition_id="0")

    event_data_batch = producer.create_batch(max_size=10000)
    can_add = True
    while can_add:
        try:
            event_data_batch.try_add(EventData('Message inside EventBatchData'))
        except ValueError:
            can_add = False  # EventDataBatch object reaches max_size.

    with producer:
        producer.send(event_data_batch)
except:
    raise
finally:
    pass
```
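
The README loop fills a single batch and stops. When there are more events than fit in one batch, the same `try_add` / `ValueError` pattern can be used to roll over to a new batch. The sketch below is an illustration only: `StubBatch` is a hypothetical stand-in that counts body bytes, whereas the real `EventDataBatch` presumably measures the encoded message size, and `split_into_batches` is a helper name invented here.

```python
class StubBatch:
    """Hypothetical stand-in for EventDataBatch with a simple byte budget."""

    def __init__(self, max_size):
        self.max_size = max_size
        self.size = 0
        self.events = []

    def try_add(self, body):
        # Same contract as the README loop relies on: raise ValueError when full.
        if self.size + len(body) > self.max_size:
            raise ValueError("batch is full")
        self.size += len(body)
        self.events.append(body)


def split_into_batches(bodies, max_size):
    """Pack bodies into as many batches as needed, none above max_size bytes."""
    batches = []
    current = StubBatch(max_size)
    for body in bodies:
        try:
            current.try_add(body)
        except ValueError:
            if not current.events:
                raise  # a single body larger than max_size can never fit
            batches.append(current)
            current = StubBatch(max_size)
            current.try_add(body)
    if current.events:
        batches.append(current)
    return batches
```

With the real client, each resulting batch would be passed to `producer.send` in turn.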

### Consume events from an Event Hub

Consume events from an Event Hub.
Expand Down Expand Up @@ -163,6 +189,7 @@ finally:

Publish events to an Event Hub asynchronously.

#### Send a single event or an array of events
```python
from azure.eventhub.aio import EventHubClient
from azure.eventhub import EventData
Expand All @@ -178,7 +205,37 @@ try:
event_list.append(EventData(b"A single event"))

async with producer:
await producer.send(event_list)
await producer.send(event_list) # Send a list of events
await producer.send(EventData(b"A single event")) # Send a single event
except:
raise
finally:
pass
```

#### Send a batch of events

Use the `create_batch` method on `EventHubProducer` to create an `EventDataBatch` object which can then be sent using the `send` method. Events may be added to the `EventDataBatch` using the `try_add` method until the maximum batch size limit in bytes has been reached.
```python
from azure.eventhub.aio import EventHubClient
from azure.eventhub import EventData

try:
    connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
    event_hub_path = '<< NAME OF THE EVENT HUB >>'
    client = EventHubClient.from_connection_string(connection_str, event_hub_path)
    producer = client.create_producer(partition_id="0")

    event_data_batch = await producer.create_batch(max_size=10000)
    can_add = True
    while can_add:
        try:
            event_data_batch.try_add(EventData('Message inside EventBatchData'))
        except ValueError:
            can_add = False  # EventDataBatch object reaches max_size.

    async with producer:
        await producer.send(event_data_batch)
except:
    raise
finally:
Expand Down Expand Up @@ -217,9 +274,11 @@ Using an `EventHubConsumer` to consume events like in the previous examples puts

The `EventProcessor` will delegate the processing of events to a `PartitionProcessor` that you provide, allowing you to focus on business logic while the processor holds responsibility for managing the underlying consumer operations including checkpointing and load balancing.

Load balancing is typically useful when running multiple instances of `EventProcessor` across multiple processes or even machines. It is recommended to store checkpoints to a persistent store when running in production. Search pypi with the prefix `azure-eventhubs-checkpoint` to find packages that support persistent storage of checkpoints.

You can see how to use the `EventProcessor` in the below example, where we use an in memory `PartitionManager` that does checkpointing in memory.

[Azure Blob Storage Partition Manager](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) is another `PartitionManager` implementation that allows multiple EventProcessors to share the load balancing and checkpoint data in a central storage.
[Azure Blob Storage Partition Manager](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) is one of the `PartitionManager` implementations we provide; it uses Azure Blob Storage as the persistent store.


```python
Expand All @@ -242,7 +301,7 @@ class MyPartitionProcessor(PartitionProcessor):

async def main():
client = EventHubClient.from_connection_string(connection_str, receive_timeout=5, retry_total=3)
partition_manager = SamplePartitionManager() # in-memory PartitionManager.
partition_manager = SamplePartitionManager() # in-memory or file based PartitionManager
try:
event_processor = EventProcessor(client, "$default", MyPartitionProcessor, partition_manager)
asyncio.ensure_future(event_processor.start())
Expand Down
1 change: 1 addition & 0 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py
Expand Up @@ -3,6 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore
__version__ = "5.0.0b3"
from uamqp import constants # type: ignore
from azure.eventhub.common import EventData, EventDataBatch, EventPosition
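
The PR does not state why the `pkgutil.extend_path` line was added. One common reason for this idiom is to let separately installed distributions contribute modules under the same package name. The sketch below shows that general mechanism with a throwaway package; `demo_split_pkg`, `part_a`, and `part_b` are hypothetical names, and nothing here reflects how the Azure packages are actually laid out.

```python
import importlib
import os
import sys
import tempfile

# Same one-liner as in the diff, written into each copy of the package.
INIT_SOURCE = "__path__ = __import__('pkgutil').extend_path(__path__, __name__)\n"


def build_split_package(pkg_name):
    """Create two sys.path roots that each hold one half of the same package."""
    for module_name, value in (("part_a", 1), ("part_b", 2)):
        root = tempfile.mkdtemp()
        pkg_dir = os.path.join(root, pkg_name)
        os.makedirs(pkg_dir)
        with open(os.path.join(pkg_dir, "__init__.py"), "w") as handle:
            handle.write(INIT_SOURCE)
        with open(os.path.join(pkg_dir, module_name + ".py"), "w") as handle:
            handle.write("VALUE = %d\n" % value)
        sys.path.append(root)


def load_values(pkg_name="demo_split_pkg"):
    """Import one submodule from each root through the single package name."""
    build_split_package(pkg_name)
    importlib.invalidate_caches()
    part_a = importlib.import_module(pkg_name + ".part_a")
    part_b = importlib.import_module(pkg_name + ".part_b")
    return part_a.VALUE, part_b.VALUE
```

Without the `extend_path` line, only the directory found first on `sys.path` would be searched, and the second import would fail.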
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ async def get_properties(self):
-'partition_ids'

:rtype: dict
:raises: ~azure.eventhub.ConnectError
:raises: ~azure.eventhub.EventHubError
"""
if self._is_iothub and not self._iothub_redirect_info:
await self._iothub_redirect()
Expand Down Expand Up @@ -207,7 +207,7 @@ async def get_partition_properties(self, partition):
:param partition: The target partition id.
:type partition: str
:rtype: dict
:raises: ~azure.eventhub.ConnectError
:raises: ~azure.eventhub.EventHubError
"""
if self._is_iothub and not self._iothub_redirect_info:
await self._iothub_redirect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class EventHubConsumer(ConsumerProducerMixin): # pylint:disable=too-many-instan
group to be actively reading events from the partition. These non-exclusive consumers are
sometimes referred to as "Non-Epoch Consumers."

Please use the method `create_consumer` on `EventHubClient` for creating `EventHubConsumer`.
"""
_timeout = 0
_epoch_symbol = b'com.microsoft:epoch'
Expand All @@ -51,8 +52,8 @@ def __init__( # pylint: disable=super-init-not-called
:param prefetch: The number of events to prefetch from the service
for processing. Default is 300.
:type prefetch: int
:param owner_level: The priority of the exclusive consumer. It will an exclusive
consumer if owner_level is set.
:param owner_level: The priority of the exclusive consumer. An exclusive
consumer will be created if owner_level is set.
:type owner_level: int
:param loop: An event loop.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

class EventProcessor(object): # pylint:disable=too-many-instance-attributes
"""
An EventProcessor constantly receives events from all partitions of the Event Hub in the context of a given
An EventProcessor constantly receives events from multiple partitions of the Event Hub in the context of a given
consumer group. The received data will be sent to PartitionProcessor to be processed.

It provides the user a convenient way to receive events from multiple partitions and save checkpoints.
Expand Down Expand Up @@ -90,11 +90,13 @@ def __init__(
:type consumer_group_name: str
:param partition_processor_type: A subclass type of ~azure.eventhub.eventprocessor.PartitionProcessor.
:type partition_processor_type: type
:param partition_manager: Interacts with the storage system, dealing with ownership and checkpoints.
For an easy start, SamplePartitionManager comes with the package.
:type partition_manager: Class implementing the ~azure.eventhub.eventprocessor.PartitionManager.
:param partition_manager: Interacts with the data storage that stores ownership and checkpoints data.
~azure.eventhub.aio.eventprocessor.SamplePartitionManager demonstrates the basic usage of `PartitionManager`
which stores data in memory or a file.
Users can either use the provided `PartitionManager` plug-ins or develop their own `PartitionManager`.
:type partition_manager: Subclass of ~azure.eventhub.eventprocessor.PartitionManager.
:param initial_event_position: The event position to start a partition consumer.
if the partition has no checkpoint yet. This will be replaced by "reset" checkpoint in the near future.
if the partition has no checkpoint yet. This could be replaced by "reset" checkpoint in the near future.
:type initial_event_position: EventPosition
:param polling_interval: The interval between any two pollings of balancing and claiming
:type polling_interval: float
Expand All @@ -119,13 +121,8 @@ def __repr__(self):
async def start(self):
"""Start the EventProcessor.

1. Calls the OwnershipManager to keep claiming and balancing ownership of partitions in an
infinitely loop until self.stop() is called.
2. Cancels tasks for partitions that are no longer owned by this EventProcessor
3. Creates tasks for partitions that are newly claimed by this EventProcessor
4. Keeps tasks running for partitions that haven't changed ownership
5. Each task repeatedly calls EvenHubConsumer.receive() to retrieve events and
call user defined partition processor
The EventProcessor will try to claim and balance partition ownership with other `EventProcessor` instances
and asynchronously start receiving EventData from EventHub and processing events.

:return: None

Expand All @@ -145,23 +142,26 @@ async def start(self):
await asyncio.sleep(self._polling_interval)
continue

to_cancel_list = self._tasks.keys()
if claimed_ownership_list:
claimed_partition_ids = [x["partition_id"] for x in claimed_ownership_list]
to_cancel_list = self._tasks.keys() - claimed_partition_ids
self._create_tasks_for_claimed_ownership(claimed_ownership_list)
else:
to_cancel_list = set(self._tasks.keys())
log.info("EventProcessor %r hasn't claimed an ownership. It keeps claiming.", self._id)
if to_cancel_list:
self._cancel_tasks_for_partitions(to_cancel_list)
log.info("EventProcessor %r has cancelled partitions %r", self._id, to_cancel_list)
await asyncio.sleep(self._polling_interval)

async def stop(self):
"""Stop claiming ownership and all the partition consumers owned by this EventProcessor
"""Stop the EventProcessor.

This method stops claiming ownership of owned partitions and cancels tasks that are running
EventHubConsumer.receive() for the partitions owned by this EventProcessor.
The EventProcessor will stop receiving events from EventHubs and release the ownership of the partitions
it is working on.
Other running EventProcessor instances will take over these released partitions.

A stopped EventProcessor can be restarted by calling method `start` again.

:return: None
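
The cancellation logic in the `start` loop of this file can be isolated as a small pure function. This is a sketch for illustration: `partitions_to_cancel` is a name invented here, and the plain dict stands in for `self._tasks`. It relies on two facts about Python dicts: a keys view supports set subtraction against any iterable, and `set(...)` takes a snapshot, whereas the view itself is live and changes as the dict is mutated.

```python
def partitions_to_cancel(tasks, claimed_ownership_list):
    """Return the ids of partitions whose tasks should be cancelled."""
    if claimed_ownership_list:
        claimed_partition_ids = [x["partition_id"] for x in claimed_ownership_list]
        # dict.keys() is set-like, so subtraction yields a new plain set.
        return tasks.keys() - claimed_partition_ids
    # Nothing was claimed this round: every running task is stale.
    return set(tasks.keys())
```

For example, with tasks for partitions "0", "1", and "2" and a claim on "1" only, the function returns the other two ids.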

Expand All @@ -182,7 +182,7 @@ def _cancel_tasks_for_partitions(self, to_cancel_partitions):
def _create_tasks_for_claimed_ownership(self, to_claim_ownership_list):
for ownership in to_claim_ownership_list:
partition_id = ownership["partition_id"]
if partition_id not in self._tasks:
if partition_id not in self._tasks or self._tasks[partition_id].done():
self._tasks[partition_id] = get_running_loop().create_task(self._receive(ownership))

async def _receive(self, ownership):
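
The added `.done()` check in `_create_tasks_for_claimed_ownership` matters because a finished task stays in the dict until something removes it. The sketch below reproduces that condition with plain `asyncio`; `ensure_task` and `_short_lived` are hypothetical names, and the coroutine is a stand-in for a receive loop that has exited early.

```python
import asyncio


async def _short_lived(partition_id):
    # Stand-in for a receive loop that exits early (for example after an error).
    return partition_id


def ensure_task(tasks, partition_id, loop):
    """Create a task unless a live one already exists; return True if created."""
    if partition_id not in tasks or tasks[partition_id].done():
        tasks[partition_id] = loop.create_task(_short_lived(partition_id))
        return True
    return False


async def demo():
    loop = asyncio.get_running_loop()
    tasks = {}
    created_first = ensure_task(tasks, "0", loop)
    created_while_pending = ensure_task(tasks, "0", loop)
    await tasks["0"]  # the task finishes, but its entry stays in the dict
    created_after_done = ensure_task(tasks, "0", loop)
    await tasks["0"]
    return created_first, created_while_pending, created_after_done


def run_demo():
    return asyncio.run(demo())
```

With only the `partition_id not in tasks` test, the third call would return `False` and the partition would be left without a running task.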
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
class PartitionManager(ABC):
"""
PartitionManager deals with the interaction with the chosen storage service.
It's able to list/claim ownership and create checkpoint.
It's able to list/claim ownership and save checkpoint.
"""

@abstractmethod
Expand Down Expand Up @@ -76,11 +76,11 @@ async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_
will be associated with.
:type sequence_number: int
:return: None
:raise: `OwnershipLostError`, `CheckpointError`
:raise: `OwnershipLostError`
"""


class OwnershipLostError(Exception):
"""Raises when update_checkpoint detects the ownership has been lost
"""Raised when update_checkpoint detects that ownership of a partition has been lost

"""
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,19 @@ class PartitionProcessor(ABC):
"""
PartitionProcessor processes events received from the Azure Event Hubs service. A single instance of a class
implementing this abstract class will be created for every partition the associated
~azure.eventhub.eventprocessor.EventProcessor owns.
~azure.eventhub.aio.eventprocessor.EventProcessor owns.

"""

async def initialize(self, partition_context: PartitionContext):
"""
"""This method will be called when `EventProcessor` creates a `PartitionProcessor`.

:param partition_context: The context information of this partition.
:type partition_context: ~azure.eventhub.aio.eventprocessor.PartitionContext
"""

# Please put the code for initialization of PartitionProcessor here.

async def close(self, reason, partition_context: PartitionContext):
"""Called when EventProcessor stops processing this PartitionProcessor.

Expand All @@ -46,6 +48,8 @@ async def close(self, reason, partition_context: PartitionContext):

"""

# Please put the code for closing PartitionProcessor here.

@abstractmethod
async def process_events(self, events: List[EventData], partition_context: PartitionContext):
"""Called when a batch of events have been received.
Expand All @@ -58,8 +62,10 @@ async def process_events(self, events: List[EventData], partition_context: Parti

"""

# Please put the code for processing events here.

async def process_error(self, error, partition_context: PartitionContext):
"""Called when an error happens
"""Called when an error happens when receiving or processing events

:param error: The error that happens.
:type error: Exception
Expand All @@ -68,3 +74,5 @@ async def process_error(self, error, partition_context: PartitionContext):
:type partition_context: ~azure.eventhub.aio.eventprocessor.PartitionContext

"""

# Please put the code for processing error here.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class EventHubProducer(ConsumerProducerMixin): # pylint: disable=too-many-insta
be created to allow event data to be automatically routed to an available partition or specific
to a partition.

Please use the method `create_producer` on `EventHubClient` for creating `EventHubProducer`.
"""
_timeout_symbol = b'com.microsoft:timeout'

Expand Down
4 changes: 2 additions & 2 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/client.py
Expand Up @@ -168,7 +168,7 @@ def get_properties(self):
-'partition_ids'

:rtype: dict
:raises: ~azure.eventhub.ConnectError
:raises: ~azure.eventhub.EventHubError
"""
if self._is_iothub and not self._iothub_redirect_info:
self._iothub_redirect()
Expand All @@ -188,7 +188,7 @@ def get_partition_ids(self):
Get partition ids of the specified EventHub.

:rtype: list[str]
:raises: ~azure.eventhub.ConnectError
:raises: ~azure.eventhub.EventHubError
"""
return self.get_properties()['partition_ids']

Expand Down