Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EventHubs] Comment, Test Updates #9366

Merged
merged 8 commits into from
Jan 9, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions sdk/eventhub/azure-eventhub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,6 @@ There, you can also find detailed instructions for using the Azure CLI, Azure Po

Interaction with Event Hubs starts with an instance of the EventHubClient class. You need the host name, SAS/AAD credential and event hub name to instantiate the client object.
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved

#### Obtain a connection string

For the Event Hubs client library to interact with an Event Hub, it will need to understand how to connect and authorize with it.
The easiest means for doing so is to use a connection string, which is created automatically when creating an Event Hubs namespace.
If you aren't familiar with shared access policies in Azure, you may wish to follow the step-by-step guide to [get an Event Hubs connection string](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string).

#### Create client

There are several ways to instantiate the EventHubClient object and the following code snippets demonstrate two ways:

**Create client from connection string:**

```python
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -62,6 +52,10 @@ consumer_client = EventHubConsumerClient.from_connection_string(connection_str,

```

- For the Event Hubs client library to interact with an Event Hub, it will need to understand how to connect and authorize with it.
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
The easiest means for doing so is to use a connection string, which is created automatically when creating an Event Hubs namespace.
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
If you aren't familiar with shared access policies in Azure, you may wish to follow the step-by-step guide to [get an Event Hubs connection string](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string).

- The `from_connection_string` method takes the connection string of the form
`Endpoint=sb://<yournamespace>.servicebus.windows.net/;SharedAccessKeyName=<yoursharedaccesskeyname>;SharedAccessKey=<yoursharedaccesskey>` and
entity name to your Event Hub instance. You can get the connection string from the [Azure portal](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string#get-connection-string-from-the-portal).
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhub/azure/eventhub/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ class EventDataBatch(object):
at which point a `ValueError` will be raised.
Use the `send_batch` method of :class:`EventHubProducerClient<azure.eventhub.EventHubProducerClient>`
or the async :class:`EventHubProducerClient<azure.eventhub.aio.EventHubProducerClient>`
for sending. The `create_batch` method accepts partition_key as a parameter for sending a particular partition.
for sending.

**Please use the create_batch method of EventHubProducerClient
to create an EventDataBatch object instead of instantiating an EventDataBatch object directly.**
Expand Down
13 changes: 6 additions & 7 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,23 @@


class EventHubConsumerClient(ClientBase):
"""The EventHubProducerClient class defines a high level interface for
"""The EventHubConsumerClient class defines a high level interface for
receiving events from the Azure Event Hubs service.

The main goal of `EventHubConsumerClient` is to receive events from all partitions of an EventHub with
load-balancing and checkpointing.

When multiple `EventHubConsumerClient` operate within one or more processes or machines targeting the same
checkpointing location, they will balance automatically.
To enable the load-balancing and / or checkpointing, checkpoint_store must be set when creating the
To enable load-balancing and persisted checkpoints, checkpoint_store must be set when creating the
`EventHubConsumerClient`.
If a checkpoint store is not provided, the checkpoint will be maintained internally in memory.

An `EventHubConsumerClient` can also receive from a specific partition when you call its method `receive()`
and specify the partition_id.
Load-balancing won't work in single-partition mode. But users can still save checkpoints if the checkpoint_store
is set.
and specify the partition_id. Load-balancing won't work in single-partition receiving mode.

:param str fully_qualified_namespace: The fully qualified host name for the Event Hubs namespace.
This is likely to be similar to <yournamespace>.servicebus.windows.net
The namespace format is: `<yournamespace>.servicebus.windows.net`.
:param str eventhub_name: The path of the specific Event Hub to connect the client to.
:param str consumer_group: Receive events from the event hub for this consumer group.
:param ~azure.core.credentials.TokenCredential credential: The credential object used for authentication which
Expand All @@ -66,7 +65,7 @@ class EventHubConsumerClient(ClientBase):
The failed internal partition consumer will be closed (`on_partition_close` will be called if provided) and
new internal partition consumer will be created (`on_partition_initialize` will be called if provided) to resume
receiving.
:keyword float idle_timeout: Timeout, in seconds, after which the underlying connection will close
:keyword float idle_timeout: Timeout in seconds, after which the underlying connection will close
if there is no further activity. By default the value is None, meaning that the service determines when to
close an idle connection.
:keyword transport_type: The type of transport protocol that will be used for communicating with
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
class CheckpointStore(object):
"""CheckpointStore deals with the interaction with the chosen storage service.

It can list and claim partition ownerships; and list and save checkpoints.
It can list and claim partition ownerships as well as list and save checkpoints.
"""

@abstractmethod
Expand All @@ -19,14 +19,14 @@ def list_ownership(self, fully_qualified_namespace, eventhub_name, consumer_grou
"""Retrieves a complete ownership list from the chosen storage service.

:param str fully_qualified_namespace: The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net"
The format is like "<namespace>.servicebus.windows.net".
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
:param str eventhub_name: The name of the specific Event Hub the partition ownerships are associated with,
relative to the Event Hubs namespace that contains it.
:param str consumer_group: The name of the consumer group the ownerships are associated with.
:rtype: Iterable[Dict[str, Any]], Iterable of dictionaries containing partition ownership information:

- `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net"
The format is like "<namespace>.servicebus.windows.net".
- `eventhub_name` (str): The name of the specific Event Hub the checkpoint is associated with,
relative to the Event Hubs namespace that contains it.
- `consumer_group` (str): The name of the consumer group the ownership are associated with.
Expand All @@ -46,7 +46,7 @@ def claim_ownership(self, ownership_list):
:rtype: Iterable[Dict[str,Any]], Iterable of dictionaries containing partition ownership information:

- `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net"
The format is like "<namespace>.servicebus.windows.net".
- `eventhub_name` (str): The name of the specific Event Hub the checkpoint is associated with,
relative to the Event Hubs namespace that contains it.
- `consumer_group` (str): The name of the consumer group the ownership are associated with.
Expand All @@ -69,7 +69,7 @@ def update_checkpoint(self, checkpoint):
:param Dict[str,Any] checkpoint: A dict containing checkpoint information:

- `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net"
The format is like "<namespace>.servicebus.windows.net".
- `eventhub_name` (str): The name of the specific Event Hub the checkpoint is associated with,
relative to the Event Hubs namespace that contains it.
- `consumer_group` (str): The name of the consumer group the checkpoint is associated with.
Expand All @@ -90,14 +90,14 @@ def list_checkpoints(
"""List the updated checkpoints from the store.

:param str fully_qualified_namespace: The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net"
The format is like "<namespace>.servicebus.windows.net".
:param str eventhub_name: The name of the specific Event Hub the checkpoints are associated with, relative to
the Event Hubs namespace that contains it.
:param str consumer_group: The name of the consumer group the checkpoints are associated with.
:rtype: Iterable[Dict[str,Any]], Iterable of dictionaries containing partition checkpoint information:

- `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net"
The format is like "<namespace>.servicebus.windows.net".
- `eventhub_name` (str): The name of the specific Event Hub the checkpoints are associated with,
relative to the Event Hubs namespace that contains it.
- `consumer_group` (str): The name of the consumer group the checkpoints are associated with.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@ def update_checkpoint(self, event):
# type: (EventData) -> None
"""Updates the receive checkpoint to the given events offset.

This operation will only update a checkpoint if a `checkpoint_store` was provided during
creation of the `EventHubConsumerClient`. Otherwise a warning will be logged.

:param ~azure.eventhub.EventData event: The EventData instance which contains the offset and
sequence number information used for checkpoint.
:rtype: None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,24 +36,23 @@


class EventHubConsumerClient(ClientBaseAsync):
"""The EventHubProducerClient class defines a high level interface for
"""The EventHubConsumerClient class defines a high level interface for
receiving events from the Azure Event Hubs service.

The main goal of `EventHubConsumerClient` is to receive events from all partitions of an EventHub with
load-balancing and checkpointing.

When multiple `EventHubConsumerClient` operate within one or more processes or machines targeting the same
checkpointing location, they will balance automatically.
To enable the load-balancing and / or checkpointing, checkpoint_store must be set when creating the
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
To enable load-balancing and persisted checkpoints, checkpoint_store must be set when creating the
`EventHubConsumerClient`.
If a checkpoint store is not provided, the checkpoint will be maintained internally in memory.

An `EventHubConsumerClient` can also receive from a specific partition when you call its method `receive()`
and specify the partition_id.
Load-balancing won't work in single-partition mode. But users can still save checkpoints if the checkpoint_store
is set.
and specify the partition_id. Load-balancing won't work in single-partition receiving mode.

:param str fully_qualified_namespace: The fully qualified host name for the Event Hubs namespace.
This is likely to be similar to <yournamespace>.servicebus.windows.net
The namespace format is: `<yournamespace>.servicebus.windows.net`.
:param str eventhub_name: The path of the specific Event Hub to connect the client to.
:param str consumer_group: Receive events from the event hub for this consumer group.
:param ~azure.core.credentials.TokenCredential credential: The credential object used for authentication which
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
class CheckpointStore(ABC):
"""CheckpointStore deals with the interaction with the chosen storage service.

It can list and claim partition ownerships; and list and save checkpoints.
It can list and claim partition ownerships as well as list and save checkpoints.
"""

@abstractmethod
Expand All @@ -20,14 +20,14 @@ async def list_ownership(
"""Retrieves a complete ownership list from the chosen storage service.

:param str fully_qualified_namespace: The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net"
The format is like "<namespace>.servicebus.windows.net".
:param str eventhub_name: The name of the specific Event Hub the partition ownerships are associated with,
relative to the Event Hubs namespace that contains it.
:param str consumer_group: The name of the consumer group the ownerships are associated with.
:rtype: Iterable[Dict[str, Any]], Iterable of dictionaries containing partition ownership information:

- `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net"
The format is like "<namespace>.servicebus.windows.net".
- `eventhub_name` (str): The name of the specific Event Hub the checkpoint is associated with,
relative to the Event Hubs namespace that contains it.
- `consumer_group` (str): The name of the consumer group the ownership are associated with.
Expand All @@ -48,7 +48,7 @@ async def claim_ownership(
:rtype: Iterable[Dict[str,Any]], Iterable of dictionaries containing partition ownership information:

- `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net"
The format is like "<namespace>.servicebus.windows.net".
- `eventhub_name` (str): The name of the specific Event Hub the checkpoint is associated with,
relative to the Event Hubs namespace that contains it.
- `consumer_group` (str): The name of the consumer group the ownership are associated with.
Expand All @@ -72,7 +72,7 @@ async def update_checkpoint(
:param Dict[str,Any] checkpoint: A dict containing checkpoint information:

- `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net"
The format is like "<namespace>.servicebus.windows.net".
- `eventhub_name` (str): The name of the specific Event Hub the checkpoint is associated with,
relative to the Event Hubs namespace that contains it.
- `consumer_group` (str): The name of the consumer group the checkpoint is associated with.
Expand All @@ -92,14 +92,14 @@ async def list_checkpoints(
"""List the updated checkpoints from the store.

:param str fully_qualified_namespace: The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net"
The format is like "<namespace>.servicebus.windows.net".
:param str eventhub_name: The name of the specific Event Hub the checkpoints are associated with, relative to
the Event Hubs namespace that contains it.
:param str consumer_group: The name of the consumer group the checkpoints are associated with.
:rtype: Iterable[Dict[str,Any]], Iterable of dictionaries containing partition checkpoint information:

- `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net"
The format is like "<namespace>.servicebus.windows.net".
- `eventhub_name` (str): The name of the specific Event Hub the checkpoints are associated with,
relative to the Event Hubs namespace that contains it.
- `consumer_group` (str): The name of the consumer group the checkpoints are associated with.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,6 @@ def last_enqueued_event_properties(self) -> Optional[Dict[str, Any]]:
async def update_checkpoint(self, event: "EventData") -> None:
"""Updates the receive checkpoint to the given events offset.

This operation will only update a checkpoint if a `checkpoint_store` was provided during
creation of the `EventHubConsumerClient`. Otherwise a warning will be logged.

:param ~azure.eventhub.EventData event: The EventData instance which contains the offset and
sequence number information used for checkpoint.
:rtype: None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async def on_event(partition_context, event):
assert len(
[checkpoint for checkpoint in checkpoints if checkpoint["sequence_number"] == on_event.sequence_number]) > 0

task.cancel()
await task


@pytest.mark.liveTest
Expand All @@ -67,7 +67,7 @@ async def on_event(partition_context, event):
client.receive(on_event, partition_id="0", starting_position="-1"))
await asyncio.sleep(10)
assert on_event.received == 1
task.cancel()
await task


@pytest.mark.liveTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ async def on_error(partition_context, error):
assert on_error.called is True
finally:
await event_processor.stop()
task.cancel()
await task
await eventhub_client.close()


Expand Down Expand Up @@ -250,7 +250,7 @@ async def error_handler(partition_context, err):
await asyncio.sleep(2)
assert len(event_processor._tasks) == 2
await event_processor.stop()
task.cancel()
await task
await eventhub_client.close()
assert event_map['0'] >= 1 and event_map['1'] >= 1
assert checkpoint is not None
Expand Down Expand Up @@ -316,7 +316,7 @@ async def error_handler(partition_context, error):
task = asyncio.ensure_future(event_processor.start())
await asyncio.sleep(10)
await event_processor.stop()
# task.cancel()
await task
await asyncio.sleep(1)
await eventhub_client.close()
assert isinstance(error_handler.error, RuntimeError)
Expand Down Expand Up @@ -370,7 +370,7 @@ async def close(self):
task = asyncio.ensure_future(event_processor.start())
await asyncio.sleep(5)
await event_processor.stop()
task.cancel()
await task
assert isinstance(error_handler.error, EventHubError)
assert partition_close_handler.reason == CloseReason.OWNERSHIP_LOST

Expand Down Expand Up @@ -441,7 +441,7 @@ async def release_ownership(self, partition_id):
task = asyncio.ensure_future(event_processor.start())
await asyncio.sleep(5)
await event_processor.stop()
# task.cancel()
await task
assert partition_initialize_handler.called
assert event_handler.called
assert error_handler.called
Expand Down Expand Up @@ -702,7 +702,7 @@ async def partition_close_handler(partition_context, reason):
task = asyncio.ensure_future(event_processor.start())
await asyncio.sleep(10)
await event_processor.stop()
# task.cancel()
await task
await asyncio.sleep(1)
await eventhub_client.close()
assert partition_close_handler.called
Loading