diff --git a/sdk/eventhub/azure-eventhubs/HISTORY.md b/sdk/eventhub/azure-eventhubs/HISTORY.md index 34e9d718e7f1..c5af555047bc 100644 --- a/sdk/eventhub/azure-eventhubs/HISTORY.md +++ b/sdk/eventhub/azure-eventhubs/HISTORY.md @@ -1,18 +1,24 @@ # Release History + ## 5.0.0b3 (2019-09-10) **New features** -- `EventProcessor` has a load balancer that balances load among multiple EventProcessors automatically -- In addition to `SamplePartitionManager`, A new `PartitionManager` implementation that uses Azure Blob Storage is added -to centrally store the checkpoint data for event processors. It's not packaged separately as a plug-in to this package. -Refer to [Azure Blob Storage Partition Manager](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) for details. + +- Added support for automatic load balancing among multiple `EventProcessor`. +- Added `BlobPartitionManager` which implements `PartitionManager`. + - Azure Blob Storage is applied for storing data used by `EventProcessor`. + - Packaged separately as a plug-in to `EventProcessor`. + - For details, please refer to [Azure Blob Storage Partition Manager](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio). +- Added property `system_properties` on `EventData`. **Breaking changes** -- `PartitionProcessor` constructor removed argument "checkpoint_manager". Its methods (initialize, process_events, -process_error, close) added argument "partition_context", which has method update_checkpoint. -- `CheckpointManager` was replaced by `PartitionContext` -- Renamed `Sqlite3PartitionManager` to `SamplePartitionManager` +- Removed constructor method of `PartitionProcessor`. For initialization please implement the method `initialize`. +- Replaced `CheckpointManager` by `PartitionContext`. + - `PartitionContext` has partition context information and method `update_checkpoint`. +- Updated all methods of `PartitionProcessor` to include `PartitionContext` as part of the arguments. +- Updated accessibility of class members in `EventHub/EventHubConsumer/EventHubProducer`to be private. +- Moved `azure.eventhub.eventprocessor` under `aio` package, which now becomes `azure.eventhub.aio.eventprocessor`. ## 5.0.0b2 (2019-08-06) diff --git a/sdk/eventhub/azure-eventhubs/README.md b/sdk/eventhub/azure-eventhubs/README.md index 94280c2df912..6dc04539f0b8 100644 --- a/sdk/eventhub/azure-eventhubs/README.md +++ b/sdk/eventhub/azure-eventhubs/README.md @@ -9,7 +9,7 @@ The Azure Event Hubs client library allows for publishing and consuming of Azure - Observe interesting operations and interactions happening within your business or other ecosystem, allowing loosely coupled systems to interact without the need to bind them together. - Receive events from one or more publishers, transform them to better meet the needs of your ecosystem, then publish the transformed events to a new stream for consumers to observe. -[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs) | [Package (PyPi)](https://pypi.org/project/azure-eventhub/) | [API reference documentation](https://azure.github.io/azure-sdk-for-python/ref/azure.eventhub) | [Product documentation](https://docs.microsoft.com/en-ca/azure/event-hubs/) +[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs) | [Package (PyPi)](https://pypi.org/project/azure-eventhub/) | [API reference documentation](https://azure.github.io/azure-sdk-for-python/ref/azure.eventhub) | [Product documentation](https://docs.microsoft.com/en-us/azure/event-hubs/) ## Getting started @@ -20,10 +20,6 @@ Install the Azure Event Hubs client library for Python with pip: ``` $ pip install --pre azure-eventhub ``` -For Python2.7, please install package "typing". This is a workaround for [issue 6767](https://github.com/Azure/azure-sdk-for-python/issues/6767). -``` -$ pip install typing -``` **Prerequisites** @@ -113,6 +109,8 @@ partition_ids = client.get_partition_ids() Publish events to an Event Hub. +#### Send a single event or an array of events + ```python from azure.eventhub import EventHubClient, EventData @@ -134,6 +132,34 @@ finally: pass ``` +#### Send a batch of events + +Use the `create_batch` method on `EventHubProcuer` to create an `EventDataBatch` object which can then be sent using the `send` method. Events may be added to the `EventDataBatch` using the `try_add` method until the maximum batch size limit in bytes has been reached. +```python +from azure.eventhub import EventHubClient, EventData + +try: + connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' + event_hub_path = '<< NAME OF THE EVENT HUB >>' + client = EventHubClient.from_connection_string(connection_str, event_hub_path) + producer = client.create_producer(partition_id="0") + + event_data_batch = producer.create_batch(max_size=10000) + can_add = True + while can_add: + try: + event_data_batch.try_add(EventData('Message inside EventBatchData')) + except ValueError: + can_add = False # EventDataBatch object reaches max_size. + + with producer: + producer.send(event_data_batch) +except: + raise +finally: + pass +``` + ### Consume events from an Event Hub Consume events from an Event Hub. @@ -163,6 +189,7 @@ finally: Publish events to an Event Hub asynchronously. +#### Send a single event or an array of events ```python from azure.eventhub.aio import EventHubClient from azure.eventhub import EventData @@ -178,7 +205,37 @@ try: event_list.append(EventData(b"A single event")) async with producer: - await producer.send(event_list) + await producer.send(event_list) # Send a list of events + await producer.send(EventData(b"A single event")) # Send a single event +except: + raise +finally: + pass +``` + +#### Send a batch of events + +Use the `create_batch` method on `EventHubProcuer` to create an `EventDataBatch` object which can then be sent using the `send` method. Events may be added to the `EventDataBatch` using the `try_add` method until the maximum batch size limit in bytes has been reached. +```python +from azure.eventhub.aio import EventHubClient +from azure.eventhub import EventData + +try: + connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' + event_hub_path = '<< NAME OF THE EVENT HUB >>' + client = EventHubClient.from_connection_string(connection_str, event_hub_path) + producer = client.create_producer(partition_id="0") + + event_data_batch = await producer.create_batch(max_size=10000) + can_add = True + while can_add: + try: + event_data_batch.try_add(EventData('Message inside EventBatchData')) + except ValueError: + can_add = False # EventDataBatch object reaches max_size. + + async with producer: + await producer.send(event_data_batch) except: raise finally: @@ -217,9 +274,11 @@ Using an `EventHubConsumer` to consume events like in the previous examples puts The `EventProcessor` will delegate the processing of events to a `PartitionProcessor` that you provide, allowing you to focus on business logic while the processor holds responsibility for managing the underlying consumer operations including checkpointing and load balancing. +Load balancing is typically useful when running multiple instances of `EventProcessor` across multiple processes or even machines. It is recommended to store checkpoints to a persistent store when running in production. Search pypi with the prefix `azure-eventhubs-checkpoint` to find packages that support persistent storage of checkpoints. + You can see how to use the `EventProcessor` in the below example, where we use an in memory `PartitionManager` that does checkpointing in memory. -[Azure Blob Storage Partition Manager](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) is another `PartitionManager` implementation that allows multiple EventProcessors to share the load balancing and checkpoint data in a central storage. +[Azure Blob Storage Partition Manager](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) is one of the `PartitionManager` implementation we provide that applies Azure Blob Storage as the persistent store. ```python @@ -242,7 +301,7 @@ class MyPartitionProcessor(PartitionProcessor): async def main(): client = EventHubClient.from_connection_string(connection_str, receive_timeout=5, retry_total=3) - partition_manager = SamplePartitionManager() # in-memory PartitionManager. + partition_manager = SamplePartitionManager() # in-memory or file based PartitionManager try: event_processor = EventProcessor(client, "$default", MyPartitionProcessor, partition_manager) asyncio.ensure_future(event_processor.start()) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py index dfc198f71fa8..62b2a6b811d8 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py @@ -3,6 +3,7 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- +__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore __version__ = "5.0.0b3" from uamqp import constants # type: ignore from azure.eventhub.common import EventData, EventDataBatch, EventPosition diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py index 67f6ab52dd30..88b693d157ec 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py @@ -166,7 +166,7 @@ async def get_properties(self): -'partition_ids' :rtype: dict - :raises: ~azure.eventhub.ConnectError + :raises: ~azure.eventhub.EventHubError """ if self._is_iothub and not self._iothub_redirect_info: await self._iothub_redirect() @@ -207,7 +207,7 @@ async def get_partition_properties(self, partition): :param partition: The target partition id. :type partition: str :rtype: dict - :raises: ~azure.eventhub.ConnectError + :raises: ~azure.eventhub.EventHubError """ if self._is_iothub and not self._iothub_redirect_info: await self._iothub_redirect() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index 3aa1a7d6bbe5..efad6a3cb7db 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -31,6 +31,7 @@ class EventHubConsumer(ConsumerProducerMixin): # pylint:disable=too-many-instan group to be actively reading events from the partition. These non-exclusive consumers are sometimes referred to as "Non-Epoch Consumers." + Please use the method `create_consumer` on `EventHubClient` for creating `EventHubConsumer`. """ _timeout = 0 _epoch_symbol = b'com.microsoft:epoch' @@ -51,8 +52,8 @@ def __init__( # pylint: disable=super-init-not-called :param prefetch: The number of events to prefetch from the service for processing. Default is 300. :type prefetch: int - :param owner_level: The priority of the exclusive consumer. It will an exclusive - consumer if owner_level is set. + :param owner_level: The priority of the exclusive consumer. An exclusive + consumer will be created if owner_level is set. :type owner_level: int :param loop: An event loop. """ diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py index 85f6f1983250..18446249ff23 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py @@ -23,7 +23,7 @@ class EventProcessor(object): # pylint:disable=too-many-instance-attributes """ - An EventProcessor constantly receives events from all partitions of the Event Hub in the context of a given + An EventProcessor constantly receives events from multiple partitions of the Event Hub in the context of a given consumer group. The received data will be sent to PartitionProcessor to be processed. It provides the user a convenient way to receive events from multiple partitions and save checkpoints. @@ -90,11 +90,13 @@ def __init__( :type consumer_group_name: str :param partition_processor_type: A subclass type of ~azure.eventhub.eventprocessor.PartitionProcessor. :type partition_processor_type: type - :param partition_manager: Interacts with the storage system, dealing with ownership and checkpoints. - For an easy start, SamplePartitionManager comes with the package. - :type partition_manager: Class implementing the ~azure.eventhub.eventprocessor.PartitionManager. + :param partition_manager: Interacts with the data storage that stores ownership and checkpoints data. + ~azure.eventhub.aio.eventprocessor.SamplePartitionManager demonstrates the basic usage of `PartitionManager` + which stores data in memory or a file. + Users can either use the provided `PartitionManager` plug-ins or develop their own `PartitionManager`. + :type partition_manager: Subclass of ~azure.eventhub.eventprocessor.PartitionManager. :param initial_event_position: The event position to start a partition consumer. - if the partition has no checkpoint yet. This will be replaced by "reset" checkpoint in the near future. + if the partition has no checkpoint yet. This could be replaced by "reset" checkpoint in the near future. :type initial_event_position: EventPosition :param polling_interval: The interval between any two pollings of balancing and claiming :type polling_interval: float @@ -119,13 +121,8 @@ def __repr__(self): async def start(self): """Start the EventProcessor. - 1. Calls the OwnershipManager to keep claiming and balancing ownership of partitions in an - infinitely loop until self.stop() is called. - 2. Cancels tasks for partitions that are no longer owned by this EventProcessor - 3. Creates tasks for partitions that are newly claimed by this EventProcessor - 4. Keeps tasks running for partitions that haven't changed ownership - 5. Each task repeatedly calls EvenHubConsumer.receive() to retrieve events and - call user defined partition processor + The EventProcessor will try to claim and balance partition ownership with other `EventProcessor` + and asynchronously start receiving EventData from EventHub and processing events. :return: None @@ -145,12 +142,12 @@ async def start(self): await asyncio.sleep(self._polling_interval) continue - to_cancel_list = self._tasks.keys() if claimed_ownership_list: claimed_partition_ids = [x["partition_id"] for x in claimed_ownership_list] to_cancel_list = self._tasks.keys() - claimed_partition_ids self._create_tasks_for_claimed_ownership(claimed_ownership_list) else: + to_cancel_list = set(self._tasks.keys()) log.info("EventProcessor %r hasn't claimed an ownership. It keeps claiming.", self._id) if to_cancel_list: self._cancel_tasks_for_partitions(to_cancel_list) @@ -158,10 +155,13 @@ async def start(self): await asyncio.sleep(self._polling_interval) async def stop(self): - """Stop claiming ownership and all the partition consumers owned by this EventProcessor + """Stop the EventProcessor. - This method stops claiming ownership of owned partitions and cancels tasks that are running - EventHubConsumer.receive() for the partitions owned by this EventProcessor. + The EventProcessor will stop receiving events from EventHubs and release the ownership of the partitions + it is working on. + Other running EventProcessor will take over these released partitions. + + A stopped EventProcessor can be restarted by calling method `start` again. :return: None @@ -182,7 +182,7 @@ def _cancel_tasks_for_partitions(self, to_cancel_partitions): def _create_tasks_for_claimed_ownership(self, to_claim_ownership_list): for ownership in to_claim_ownership_list: partition_id = ownership["partition_id"] - if partition_id not in self._tasks: + if partition_id not in self._tasks or self._tasks[partition_id].done(): self._tasks[partition_id] = get_running_loop().create_task(self._receive(ownership)) async def _receive(self, ownership): diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_manager.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_manager.py index 4bb84779dd53..bb55e00d52c5 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_manager.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_manager.py @@ -10,7 +10,7 @@ class PartitionManager(ABC): """ PartitionManager deals with the interaction with the chosen storage service. - It's able to list/claim ownership and create checkpoint. + It's able to list/claim ownership and save checkpoint. """ @abstractmethod @@ -76,11 +76,11 @@ async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_ will be associated with. :type sequence_number: int :return: None - :raise: `OwnershipLostError`, `CheckpointError` + :raise: `OwnershipLostError` """ class OwnershipLostError(Exception): - """Raises when update_checkpoint detects the ownership has been lost + """Raises when update_checkpoint detects the ownership to a partition has been lost """ diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_processor.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_processor.py index 8b0fb2ca7e5c..a16be38e6220 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_processor.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_processor.py @@ -21,17 +21,19 @@ class PartitionProcessor(ABC): """ PartitionProcessor processes events received from the Azure Event Hubs service. A single instance of a class implementing this abstract class will be created for every partition the associated - ~azure.eventhub.eventprocessor.EventProcessor owns. + ~azure.eventhub.aio.eventprocessor.EventProcessor owns. """ async def initialize(self, partition_context: PartitionContext): - """ + """This method will be called when `EventProcessor` creates a `PartitionProcessor`. :param partition_context: The context information of this partition. :type partition_context: ~azure.eventhub.aio.eventprocessor.PartitionContext """ + # Please put the code for initialization of PartitionProcessor here. + async def close(self, reason, partition_context: PartitionContext): """Called when EventProcessor stops processing this PartitionProcessor. @@ -46,6 +48,8 @@ async def close(self, reason, partition_context: PartitionContext): """ + # Please put the code for closing PartitionProcessor here. + @abstractmethod async def process_events(self, events: List[EventData], partition_context: PartitionContext): """Called when a batch of events have been received. @@ -58,8 +62,10 @@ async def process_events(self, events: List[EventData], partition_context: Parti """ + # Please put the code for processing events here. + async def process_error(self, error, partition_context: PartitionContext): - """Called when an error happens + """Called when an error happens when receiving or processing events :param error: The error that happens. :type error: Exception @@ -68,3 +74,5 @@ async def process_error(self, error, partition_context: PartitionContext): :type partition_context: ~azure.eventhub.aio.eventprocessor.PartitionContext """ + + # Please put the code for processing error here. diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index 999bdc09c787..ec4e39c87116 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -26,6 +26,7 @@ class EventHubProducer(ConsumerProducerMixin): # pylint: disable=too-many-insta be created to allow event data to be automatically routed to an available partition or specific to a partition. + Please use the method `create_producer` on `EventHubClient` for creating `EventHubProducer`. """ _timeout_symbol = b'com.microsoft:timeout' diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py index 90a1ac86742f..06d264b5b9ac 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py @@ -168,7 +168,7 @@ def get_properties(self): -'partition_ids' :rtype: dict - :raises: ~azure.eventhub.ConnectError + :raises: ~azure.eventhub.EventHubError """ if self._is_iothub and not self._iothub_redirect_info: self._iothub_redirect() @@ -188,7 +188,7 @@ def get_partition_ids(self): Get partition ids of the specified EventHub. :rtype: list[str] - :raises: ~azure.eventhub.ConnectError + :raises: ~azure.eventhub.EventHubError """ return self.get_properties()['partition_ids'] diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py index 7d4c8cd2712e..c6879730266c 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py @@ -258,7 +258,7 @@ def from_connection_string(cls, conn_str, **kwargs): will return as soon as service returns no new events. Default value is the same as prefetch. :type max_batch_size: int :param receive_timeout: The timeout in seconds to receive a batch of events from an Event Hub. - Default value is 0 seconds. + Default value is 0 seconds, meaning there is no timeout. :type receive_timeout: float :param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout. diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index 73fed892db11..5923d7f57972 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -39,7 +39,6 @@ def parse_sas_token(sas_token): class EventData(object): """ The EventData class is a holder of event content. - Acts as a wrapper to an uamqp.message.Message object. Example: .. literalinclude:: ../examples/test_examples_eventhub.py @@ -262,9 +261,15 @@ def encode_message(self): class EventDataBatch(object): """ - The EventDataBatch class is a holder of a batch of event data within max size bytes. - Use ~azure.eventhub.Producer.create_batch method to create an EventDataBatch object. - Do not instantiate an EventDataBatch object directly. + Sending events in batch get better performance than sending individual events. + EventDataBatch helps you create the maximum allowed size batch of `EventData` to improve sending performance. + + Use `try_add` method to add events until the maximum batch size limit in bytes has been reached - + a `ValueError` will be raised. + Use `send` method of ~azure.eventhub.EventHubProducer or ~azure.eventhub.aio.EventHubProducer for sending. + + Please use the `create_batch` method of `EventHubProducer` + to create an `EventDataBatch` object instead of instantiating an `EventDataBatch` object directly. """ def __init__(self, max_size=None, partition_key=None): @@ -307,8 +312,9 @@ def _set_partition_key(self, value): def try_add(self, event_data): """ The message size is a sum up of body, properties, header, etc. - :param event_data: - :return: + :param event_data: ~azure.eventhub.EventData + :return: None + :raise: ValueError, when exceeding the size limit. """ if event_data is None: log.warning("event_data is None when calling EventDataBatch.try_add. Ignored") diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index 499c3ba5429e..604d9c7d7b82 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -33,6 +33,7 @@ class EventHubConsumer(ConsumerProducerMixin): # pylint:disable=too-many-instan group to be actively reading events from the partition. These non-exclusive consumers are sometimes referred to as "Non-Epoch Consumers." + Please use the method `create_consumer` on `EventHubClient` for creating `EventHubConsumer`. """ _timeout = 0 _epoch_symbol = b'com.microsoft:epoch' @@ -50,8 +51,8 @@ def __init__(self, client, source, **kwargs): :param prefetch: The number of events to prefetch from the service for processing. Default is 300. :type prefetch: int - :param owner_level: The priority of the exclusive consumer. It will an exclusive - consumer if owner_level is set. + :param owner_level: The priority of the exclusive consumer. An exclusive + consumer will be created if owner_level is set. :type owner_level: int """ event_position = kwargs.get("event_position", None) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index 8008fac7ecd0..cab9638f2acc 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -39,6 +39,7 @@ class EventHubProducer(ConsumerProducerMixin): # pylint:disable=too-many-instan be created to allow event data to be automatically routed to an available partition or specific to a partition. + Please use the method `create_producer` on `EventHubClient` for creating `EventHubProducer`. """ _timeout_symbol = b'com.microsoft:timeout' diff --git a/sdk/eventhub/azure-eventhubs/tests/test_longrunning_send.py b/sdk/eventhub/azure-eventhubs/tests/test_longrunning_send.py index e737ee6889d7..93e7e85b4287 100644 --- a/sdk/eventhub/azure-eventhubs/tests/test_longrunning_send.py +++ b/sdk/eventhub/azure-eventhubs/tests/test_longrunning_send.py @@ -61,12 +61,12 @@ def send(sender, args): total += 1 except ValueError: sender.send(batch, timeout=0) - print("Sent total {} of partition {}".format(total, sender.partition)) + print("Sent total {} of partition {}".format(total, sender._partition)) batch = sender.create_batch() except Exception as err: - print("Partition {} send failed {}".format(sender.partition, err)) + print("Partition {} send failed {}".format(sender._partition, err)) raise - print("Sent total {} of partition {}".format(total, sender.partition)) + print("Sent total {} of partition {}".format(total, sender._partition)) @pytest.mark.liveTest