Skip to content

Commit

Permalink
[EventHubs] Comment, Test Updates (#9366)
Browse files Browse the repository at this point in the history
* update docstring

* await task to prevent potential seg fault

* review update

* PM review update

* review update

* review update

* minor improvement

* minor improvement
  • Loading branch information
yunhaoling authored Jan 9, 2020
1 parent b658626 commit a621b9f
Show file tree
Hide file tree
Showing 11 changed files with 45 additions and 58 deletions.
15 changes: 5 additions & 10 deletions sdk/eventhub/azure-eventhub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,21 @@ There, you can also find detailed instructions for using the Azure CLI, Azure Po

### Authenticate the client

Interaction with Event Hubs starts with an instance of the EventHubClient class. You need the host name, SAS/AAD credential and event hub name to instantiate the client object.
Interaction with Event Hubs starts with an instance of EventHubConsumerClient or EventHubProducerClient class. You need either the host name, SAS/AAD credential and event hub name or a connection string to instantiate the client object.

#### Obtain a connection string
**Create client from connection string:**

For the Event Hubs client library to interact with an Event Hub, it will need to understand how to connect and authorize with it.
The easiest means for doing so is to use a connection string, which is created automatically when creating an Event Hubs namespace.
For the Event Hubs client library to interact with an Event Hub, the easiest means is to use a connection string, which is created automatically when creating an Event Hubs namespace.
If you aren't familiar with shared access policies in Azure, you may wish to follow the step-by-step guide to [get an Event Hubs connection string](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string).

#### Create client

There are several ways to instantiate the EventHubClient object and the following code snippets demonstrate two ways:

**Create client from connection string:**

```python
from azure.eventhub import EventHubConsumerClient
from azure.eventhub import EventHubConsumerClient, EventHubProducerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
producer_client = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)
consumer_client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

```
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhub/azure/eventhub/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ class EventDataBatch(object):
at which point a `ValueError` will be raised.
Use the `send_batch` method of :class:`EventHubProducerClient<azure.eventhub.EventHubProducerClient>`
or the async :class:`EventHubProducerClient<azure.eventhub.aio.EventHubProducerClient>`
for sending. The `create_batch` method accepts partition_key as a parameter for sending a particular partition.
for sending.
**Please use the create_batch method of EventHubProducerClient
to create an EventDataBatch object instead of instantiating an EventDataBatch object directly.**
Expand Down
13 changes: 6 additions & 7 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,23 @@


class EventHubConsumerClient(ClientBase):
"""The EventHubProducerClient class defines a high level interface for
"""The EventHubConsumerClient class defines a high level interface for
receiving events from the Azure Event Hubs service.
The main goal of `EventHubConsumerClient` is to receive events from all partitions of an EventHub with
load-balancing and checkpointing.
When multiple `EventHubConsumerClient` operate within one or more processes or machines targeting the same
checkpointing location, they will balance automatically.
To enable the load-balancing and / or checkpointing, checkpoint_store must be set when creating the
To enable load-balancing and persisted checkpoints, checkpoint_store must be set when creating the
`EventHubConsumerClient`.
If a checkpoint store is not provided, the checkpoint will be maintained internally in memory.
An `EventHubConsumerClient` can also receive from a specific partition when you call its method `receive()`
and specify the partition_id.
Load-balancing won't work in single-partition mode. But users can still save checkpoints if the checkpoint_store
is set.
and specify the partition_id. Load-balancing won't work in single-partition receiving mode.
:param str fully_qualified_namespace: The fully qualified host name for the Event Hubs namespace.
This is likely to be similar to <yournamespace>.servicebus.windows.net
The namespace format is: `<yournamespace>.servicebus.windows.net`.
:param str eventhub_name: The path of the specific Event Hub to connect the client to.
:param str consumer_group: Receive events from the event hub for this consumer group.
:param ~azure.core.credentials.TokenCredential credential: The credential object used for authentication which
Expand All @@ -66,7 +65,7 @@ class EventHubConsumerClient(ClientBase):
The failed internal partition consumer will be closed (`on_partition_close` will be called if provided) and
new internal partition consumer will be created (`on_partition_initialize` will be called if provided) to resume
receiving.
:keyword float idle_timeout: Timeout, in seconds, after which the underlying connection will close
:keyword float idle_timeout: Timeout in seconds, after which the underlying connection will close
if there is no further activity. By default the value is None, meaning that the service determines when to
close an idle connection.
:keyword transport_type: The type of transport protocol that will be used for communicating with
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
class CheckpointStore(object):
"""CheckpointStore deals with the interaction with the chosen storage service.
It can list and claim partition ownerships; and list and save checkpoints.
It can list and claim partition ownerships as well as list and save checkpoints.
"""

@abstractmethod
Expand All @@ -19,14 +19,14 @@ def list_ownership(self, fully_qualified_namespace, eventhub_name, consumer_grou
"""Retrieves a complete ownership list from the chosen storage service.
:param str fully_qualified_namespace: The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net"
The format is like "<namespace>.servicebus.windows.net".
:param str eventhub_name: The name of the specific Event Hub the partition ownerships are associated with,
relative to the Event Hubs namespace that contains it.
:param str consumer_group: The name of the consumer group the ownerships are associated with.
:rtype: Iterable[Dict[str, Any]], Iterable of dictionaries containing partition ownership information:
- `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net"
The format is like "<namespace>.servicebus.windows.net".
- `eventhub_name` (str): The name of the specific Event Hub the checkpoint is associated with,
relative to the Event Hubs namespace that contains it.
- `consumer_group` (str): The name of the consumer group the ownership are associated with.
Expand All @@ -46,7 +46,7 @@ def claim_ownership(self, ownership_list):
:rtype: Iterable[Dict[str,Any]], Iterable of dictionaries containing partition ownership information:
- `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net"
The format is like "<namespace>.servicebus.windows.net".
- `eventhub_name` (str): The name of the specific Event Hub the checkpoint is associated with,
relative to the Event Hubs namespace that contains it.
- `consumer_group` (str): The name of the consumer group the ownership are associated with.
Expand All @@ -69,7 +69,7 @@ def update_checkpoint(self, checkpoint):
:param Dict[str,Any] checkpoint: A dict containing checkpoint information:
- `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net"
The format is like "<namespace>.servicebus.windows.net".
- `eventhub_name` (str): The name of the specific Event Hub the checkpoint is associated with,
relative to the Event Hubs namespace that contains it.
- `consumer_group` (str): The name of the consumer group the checkpoint is associated with.
Expand All @@ -90,14 +90,14 @@ def list_checkpoints(
"""List the updated checkpoints from the store.
:param str fully_qualified_namespace: The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net"
The format is like "<namespace>.servicebus.windows.net".
:param str eventhub_name: The name of the specific Event Hub the checkpoints are associated with, relative to
the Event Hubs namespace that contains it.
:param str consumer_group: The name of the consumer group the checkpoints are associated with.
:rtype: Iterable[Dict[str,Any]], Iterable of dictionaries containing partition checkpoint information:
- `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net"
The format is like "<namespace>.servicebus.windows.net".
- `eventhub_name` (str): The name of the specific Event Hub the checkpoints are associated with,
relative to the Event Hubs namespace that contains it.
- `consumer_group` (str): The name of the consumer group the checkpoints are associated with.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@ def update_checkpoint(self, event):
# type: (EventData) -> None
"""Updates the receive checkpoint to the given events offset.
This operation will only update a checkpoint if a `checkpoint_store` was provided during
creation of the `EventHubConsumerClient`. Otherwise a warning will be logged.
:param ~azure.eventhub.EventData event: The EventData instance which contains the offset and
sequence number information used for checkpoint.
:rtype: None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,24 +36,23 @@


class EventHubConsumerClient(ClientBaseAsync):
"""The EventHubProducerClient class defines a high level interface for
"""The EventHubConsumerClient class defines a high level interface for
receiving events from the Azure Event Hubs service.
The main goal of `EventHubConsumerClient` is to receive events from all partitions of an EventHub with
load-balancing and checkpointing.
When multiple `EventHubConsumerClient` operate within one or more processes or machines targeting the same
checkpointing location, they will balance automatically.
To enable the load-balancing and / or checkpointing, checkpoint_store must be set when creating the
To enable load-balancing and persisted checkpoints, checkpoint_store must be set when creating the
`EventHubConsumerClient`.
If a checkpoint store is not provided, the checkpoint will be maintained internally in memory.
An `EventHubConsumerClient` can also receive from a specific partition when you call its method `receive()`
and specify the partition_id.
Load-balancing won't work in single-partition mode. But users can still save checkpoints if the checkpoint_store
is set.
and specify the partition_id. Load-balancing won't work in single-partition receiving mode.
:param str fully_qualified_namespace: The fully qualified host name for the Event Hubs namespace.
This is likely to be similar to <yournamespace>.servicebus.windows.net
The namespace format is: `<yournamespace>.servicebus.windows.net`.
:param str eventhub_name: The path of the specific Event Hub to connect the client to.
:param str consumer_group: Receive events from the event hub for this consumer group.
:param ~azure.core.credentials.TokenCredential credential: The credential object used for authentication which
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
class CheckpointStore(ABC):
"""CheckpointStore deals with the interaction with the chosen storage service.
It can list and claim partition ownerships; and list and save checkpoints.
It can list and claim partition ownerships as well as list and save checkpoints.
"""

@abstractmethod
Expand All @@ -20,14 +20,14 @@ async def list_ownership(
"""Retrieves a complete ownership list from the chosen storage service.
:param str fully_qualified_namespace: The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net"
The format is like "<namespace>.servicebus.windows.net".
:param str eventhub_name: The name of the specific Event Hub the partition ownerships are associated with,
relative to the Event Hubs namespace that contains it.
:param str consumer_group: The name of the consumer group the ownerships are associated with.
:rtype: Iterable[Dict[str, Any]], Iterable of dictionaries containing partition ownership information:
- `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net"
The format is like "<namespace>.servicebus.windows.net".
- `eventhub_name` (str): The name of the specific Event Hub the checkpoint is associated with,
relative to the Event Hubs namespace that contains it.
- `consumer_group` (str): The name of the consumer group the ownership are associated with.
Expand All @@ -48,7 +48,7 @@ async def claim_ownership(
:rtype: Iterable[Dict[str,Any]], Iterable of dictionaries containing partition ownership information:
- `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net"
The format is like "<namespace>.servicebus.windows.net".
- `eventhub_name` (str): The name of the specific Event Hub the checkpoint is associated with,
relative to the Event Hubs namespace that contains it.
- `consumer_group` (str): The name of the consumer group the ownership are associated with.
Expand All @@ -72,7 +72,7 @@ async def update_checkpoint(
:param Dict[str,Any] checkpoint: A dict containing checkpoint information:
- `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net"
The format is like "<namespace>.servicebus.windows.net".
- `eventhub_name` (str): The name of the specific Event Hub the checkpoint is associated with,
relative to the Event Hubs namespace that contains it.
- `consumer_group` (str): The name of the consumer group the checkpoint is associated with.
Expand All @@ -92,14 +92,14 @@ async def list_checkpoints(
"""List the updated checkpoints from the store.
:param str fully_qualified_namespace: The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net"
The format is like "<namespace>.servicebus.windows.net".
:param str eventhub_name: The name of the specific Event Hub the checkpoints are associated with, relative to
the Event Hubs namespace that contains it.
:param str consumer_group: The name of the consumer group the checkpoints are associated with.
:rtype: Iterable[Dict[str,Any]], Iterable of dictionaries containing partition checkpoint information:
- `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net"
The format is like "<namespace>.servicebus.windows.net".
- `eventhub_name` (str): The name of the specific Event Hub the checkpoints are associated with,
relative to the Event Hubs namespace that contains it.
- `consumer_group` (str): The name of the consumer group the checkpoints are associated with.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,6 @@ def last_enqueued_event_properties(self) -> Optional[Dict[str, Any]]:
async def update_checkpoint(self, event: "EventData") -> None:
"""Updates the receive checkpoint to the given events offset.
This operation will only update a checkpoint if a `checkpoint_store` was provided during
creation of the `EventHubConsumerClient`. Otherwise a warning will be logged.
:param ~azure.eventhub.EventData event: The EventData instance which contains the offset and
sequence number information used for checkpoint.
:rtype: None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async def on_event(partition_context, event):
assert len(
[checkpoint for checkpoint in checkpoints if checkpoint["sequence_number"] == on_event.sequence_number]) > 0

task.cancel()
await task


@pytest.mark.liveTest
Expand All @@ -67,7 +67,7 @@ async def on_event(partition_context, event):
client.receive(on_event, partition_id="0", starting_position="-1"))
await asyncio.sleep(10)
assert on_event.received == 1
task.cancel()
await task


@pytest.mark.liveTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ async def on_error(partition_context, error):
assert on_error.called is True
finally:
await event_processor.stop()
task.cancel()
await task
await eventhub_client.close()


Expand Down Expand Up @@ -250,7 +250,7 @@ async def error_handler(partition_context, err):
await asyncio.sleep(2)
assert len(event_processor._tasks) == 2
await event_processor.stop()
task.cancel()
await task
await eventhub_client.close()
assert event_map['0'] >= 1 and event_map['1'] >= 1
assert checkpoint is not None
Expand Down Expand Up @@ -316,7 +316,7 @@ async def error_handler(partition_context, error):
task = asyncio.ensure_future(event_processor.start())
await asyncio.sleep(10)
await event_processor.stop()
# task.cancel()
await task
await asyncio.sleep(1)
await eventhub_client.close()
assert isinstance(error_handler.error, RuntimeError)
Expand Down Expand Up @@ -370,7 +370,7 @@ async def close(self):
task = asyncio.ensure_future(event_processor.start())
await asyncio.sleep(5)
await event_processor.stop()
task.cancel()
await task
assert isinstance(error_handler.error, EventHubError)
assert partition_close_handler.reason == CloseReason.OWNERSHIP_LOST

Expand Down Expand Up @@ -441,7 +441,7 @@ async def release_ownership(self, partition_id):
task = asyncio.ensure_future(event_processor.start())
await asyncio.sleep(5)
await event_processor.stop()
# task.cancel()
await task
assert partition_initialize_handler.called
assert event_handler.called
assert error_handler.called
Expand Down Expand Up @@ -702,7 +702,7 @@ async def partition_close_handler(partition_context, reason):
task = asyncio.ensure_future(event_processor.start())
await asyncio.sleep(10)
await event_processor.stop()
# task.cancel()
await task
await asyncio.sleep(1)
await eventhub_client.close()
assert partition_close_handler.called
Loading

0 comments on commit a621b9f

Please sign in to comment.