Skip to content

Commit

Permalink
[ServiceBus] Sphinx/Docstring/Code snippets Fix (Azure#15059)
Browse files Browse the repository at this point in the history
* fix some sphinx warnings

* fix code snippet

* update code snippets for settlement methods
  • Loading branch information
yunhaoling authored Nov 9, 2020
1 parent f9b37ab commit 626ee68
Show file tree
Hide file tree
Showing 12 changed files with 495 additions and 388 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ def message_id(self):
The identifier is a free-form string and can reflect a GUID or an identifier derived from the
application context. If enabled, the duplicate detection (see
`https://docs.microsoft.com/azure/service-bus-messaging/duplicate-detection`)
feature identifies and removes second and further submissions of messages with the same message id.
feature identifies and removes second and further submissions of messages with the same message id.
:rtype: str
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,9 @@ def get_queue_receiver(self, queue_name, **kwargs):
:paramtype receive_mode: ~azure.servicebus.ReceiveMode
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
receiver will automatically stop receiving. The default value is None, meaning no timeout.
:keyword Optional[AutoLockRenewer] auto_lock_renewer: An AutoLockRenewer can be provided such that messages are
automatically registered on receipt. If the receiver is a session receiver, it will apply to the session
instead.
:keyword Optional[~azure.servicebus.AutoLockRenewer] auto_lock_renewer: An ~azure.servicebus.AutoLockRenewer
can be provided such that messages are automatically registered on receipt. If the receiver is a session
receiver, it will apply to the session instead.
:keyword int prefetch_count: The maximum number of messages to cache with each request to the service.
This setting is only for advanced performance tuning. Increasing this value will improve message throughput
performance but increase the chance that messages will expire while they are cached if they're not
Expand Down Expand Up @@ -333,9 +333,9 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
:paramtype receive_mode: ~azure.servicebus.ReceiveMode
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
receiver will automatically stop receiving. The default value is None, meaning no timeout.
:keyword Optional[AutoLockRenewer] auto_lock_renewer: An AutoLockRenewer can be provided such that messages are
automatically registered on receipt. If the receiver is a session receiver, it will apply to the session
instead.
:keyword Optional[~azure.servicebus.AutoLockRenewer] auto_lock_renewer: An ~azure.servicebus.AutoLockRenewer
can be provided such that messages are automatically registered on receipt. If the receiver is a session
receiver, it will apply to the session instead.
:keyword int prefetch_count: The maximum number of messages to cache with each request to the service.
This setting is only for advanced performance tuning. Increasing this value will improve message throughput
performance but increase the chance that messages will expire while they are cached if they're not
Expand Down
201 changes: 122 additions & 79 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ class ServiceBusReceiver(BaseHandler, ReceiverMixin): # pylint: disable=too-man
The two primary channels for message receipt are `receive()` to make a single request for messages,
and `for message in receiver:` to continuously receive incoming messages in an ongoing fashion.
**Please use the `get_<queue/subscription>_receiver` method of ~azure.servicebus.ServiceBusClient to create a
ServiceBusReceiver instance.**
:ivar fully_qualified_namespace: The fully qualified host name for the Service Bus namespace.
The namespace format is: `<yournamespace>.servicebus.windows.net`.
:vartype fully_qualified_namespace: str
Expand Down Expand Up @@ -95,26 +98,16 @@ class ServiceBusReceiver(BaseHandler, ReceiverMixin): # pylint: disable=too-man
keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value).
Additionally the following keys may also be present: `'username', 'password'`.
:keyword str user_agent: If specified, this will be added in front of the built-in user agent string.
:keyword Optional[AutoLockRenewer] auto_lock_renewer: An AutoLockRenewer can be provided such that messages are
automatically registered on receipt. If the receiver is a session receiver, it will apply to the session
instead.
:keyword Optional[~azure.servicebus.AutoLockRenewer] auto_lock_renewer: An ~azure.servicebus.AutoLockRenewer
can be provided such that messages are automatically registered on receipt. If the receiver is a session
receiver, it will apply to the session instead.
:keyword int prefetch_count: The maximum number of messages to cache with each request to the service.
This setting is only for advanced performance tuning. Increasing this value will improve message throughput
performance but increase the chance that messages will expire while they are cached if they're not
processed fast enough.
The default value is 0, meaning messages will be received from the service and processed one at a time.
In the case of prefetch_count being 0, `ServiceBusReceiver.receive` would try to cache `max_message_count`
(if provided) within its request to the service.
.. admonition:: Example:
.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py
:start-after: [START create_servicebus_receiver_sync]
:end-before: [END create_servicebus_receiver_sync]
:language: python
:dedent: 4
:caption: Create a new instance of the ServiceBusReceiver.
"""
def __init__(
self,
Expand Down Expand Up @@ -195,6 +188,72 @@ def _iter_next(self):
self._auto_lock_renewer.register(self, message)
return message

@classmethod
def _from_connection_string(
cls,
conn_str,
**kwargs
):
# type: (str, Any) -> ServiceBusReceiver
"""Create a ServiceBusReceiver from a connection string.
:param conn_str: The connection string of a Service Bus.
:type conn_str: str
:keyword str queue_name: The path of specific Service Bus Queue the client connects to.
:keyword str topic_name: The path of specific Service Bus Topic which contains the Subscription
the client connects to.
:keyword str subscription_name: The path of specific Service Bus Subscription under the
specified Topic the client connects to.
:keyword receive_mode: The mode with which messages will be retrieved from the entity. The two options
are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given
lock period before they will be removed from the queue. Messages received with ReceiveAndDelete
will be immediately removed from the queue, and cannot be subsequently abandoned or re-received
if the client fails to process the message.
The default mode is PeekLock.
:paramtype receive_mode: ~azure.servicebus.ReceiveMode
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
receiver will automatically stop receiving. The default value is None, meaning no timeout.
:keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`.
:keyword transport_type: The type of transport protocol that will be used for communicating with
the Service Bus service. Default is `TransportType.Amqp`.
:paramtype transport_type: ~azure.servicebus.TransportType
:keyword dict http_proxy: HTTP proxy settings. This must be a dictionary with the following
keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value).
Additionally the following keys may also be present: `'username', 'password'`.
:keyword str user_agent: If specified, this will be added in front of the built-in user agent string.
:keyword int prefetch_count: The maximum number of messages to cache with each request to the service.
This setting is only for advanced performance tuning. Increasing this value will improve message throughput
performance but increase the chance that messages will expire while they are cached if they're not
processed fast enough.
The default value is 0, meaning messages will be received from the service and processed one at a time.
In the case of prefetch_count being 0, `ServiceBusReceiver.receive` would try to cache `max_message_count`
(if provided) within its request to the service.
:rtype: ~azure.servicebus.ServiceBusReceiver
:raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity.
:raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure.
.. admonition:: Example:
.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py
:start-after: [START create_servicebus_receiver_from_conn_str_sync]
:end-before: [END create_servicebus_receiver_from_conn_str_sync]
:language: python
:dedent: 4
:caption: Create a new instance of the ServiceBusReceiver from connection string.
"""
constructor_args = cls._convert_connection_string_to_kwargs(
conn_str,
**kwargs
)
if kwargs.get("queue_name") and kwargs.get("subscription_name"):
raise ValueError("Queue entity does not have subscription.")

if kwargs.get("topic_name") and not kwargs.get("subscription_name"):
raise ValueError("Subscription name is missing for the topic. Please specify subscription_name.")
return cls(**constructor_args)

def _create_handler(self, auth):
# type: (AMQPAuth) -> None
self._handler = ReceiveClient(
Expand Down Expand Up @@ -416,72 +475,6 @@ def get_streaming_message_iter(self, max_wait_time=None):
raise ValueError("The max_wait_time must be greater than 0.")
return self._iter_contextual_wrapper(max_wait_time)

@classmethod
def _from_connection_string(
cls,
conn_str,
**kwargs
):
# type: (str, Any) -> ServiceBusReceiver
"""Create a ServiceBusReceiver from a connection string.
:param conn_str: The connection string of a Service Bus.
:type conn_str: str
:keyword str queue_name: The path of specific Service Bus Queue the client connects to.
:keyword str topic_name: The path of specific Service Bus Topic which contains the Subscription
the client connects to.
:keyword str subscription_name: The path of specific Service Bus Subscription under the
specified Topic the client connects to.
:keyword receive_mode: The mode with which messages will be retrieved from the entity. The two options
are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given
lock period before they will be removed from the queue. Messages received with ReceiveAndDelete
will be immediately removed from the queue, and cannot be subsequently abandoned or re-received
if the client fails to process the message.
The default mode is PeekLock.
:paramtype receive_mode: ~azure.servicebus.ReceiveMode
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
receiver will automatically stop receiving. The default value is None, meaning no timeout.
:keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`.
:keyword transport_type: The type of transport protocol that will be used for communicating with
the Service Bus service. Default is `TransportType.Amqp`.
:paramtype transport_type: ~azure.servicebus.TransportType
:keyword dict http_proxy: HTTP proxy settings. This must be a dictionary with the following
keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value).
Additionally the following keys may also be present: `'username', 'password'`.
:keyword str user_agent: If specified, this will be added in front of the built-in user agent string.
:keyword int prefetch_count: The maximum number of messages to cache with each request to the service.
This setting is only for advanced performance tuning. Increasing this value will improve message throughput
performance but increase the chance that messages will expire while they are cached if they're not
processed fast enough.
The default value is 0, meaning messages will be received from the service and processed one at a time.
In the case of prefetch_count being 0, `ServiceBusReceiver.receive` would try to cache `max_message_count`
(if provided) within its request to the service.
:rtype: ~azure.servicebus.ServiceBusReceiver
:raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity.
:raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure.
.. admonition:: Example:
.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py
:start-after: [START create_servicebus_receiver_from_conn_str_sync]
:end-before: [END create_servicebus_receiver_from_conn_str_sync]
:language: python
:dedent: 4
:caption: Create a new instance of the ServiceBusReceiver from connection string.
"""
constructor_args = cls._convert_connection_string_to_kwargs(
conn_str,
**kwargs
)
if kwargs.get("queue_name") and kwargs.get("subscription_name"):
raise ValueError("Queue entity does not have subscription.")

if kwargs.get("topic_name") and not kwargs.get("subscription_name"):
raise ValueError("Subscription name is missing for the topic. Please specify subscription_name.")
return cls(**constructor_args)

def receive_messages(self, max_message_count=1, max_wait_time=None):
# type: (Optional[int], Optional[float]) -> List[ServiceBusReceivedMessage]
"""Receive a batch of messages at once.
Expand Down Expand Up @@ -651,6 +644,16 @@ def complete_message(self, message):
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
.. admonition:: Example:
.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py
:start-after: [START complete_message_sync]
:end-before: [END complete_message_sync]
:language: python
:dedent: 4
:caption: Complete a received message.
"""
self._settle_message_with_retry(message, MESSAGE_COMPLETE)

Expand All @@ -665,6 +668,16 @@ def abandon_message(self, message):
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
.. admonition:: Example:
.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py
:start-after: [START abandon_message_sync]
:end-before: [END abandon_message_sync]
:language: python
:dedent: 4
:caption: Abandon a received message.
"""
self._settle_message_with_retry(message, MESSAGE_ABANDON)

Expand All @@ -680,6 +693,16 @@ def defer_message(self, message):
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
.. admonition:: Example:
.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py
:start-after: [START defer_message_sync]
:end-before: [END defer_message_sync]
:language: python
:dedent: 4
:caption: Defer a received message.
"""
self._settle_message_with_retry(message, MESSAGE_DEFER)

Expand All @@ -698,6 +721,16 @@ def dead_letter_message(self, message, reason=None, error_description=None):
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
.. admonition:: Example:
.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py
:start-after: [START dead_letter_message_sync]
:end-before: [END dead_letter_message_sync]
:language: python
:dedent: 4
:caption: Dead letter a received message.
"""
self._settle_message_with_retry(
message,
Expand Down Expand Up @@ -729,6 +762,16 @@ def renew_message_lock(self, message, **kwargs):
:raises: TypeError if the message is sessionful.
:raises: ~azure.servicebus.exceptions.MessageLockExpired is message lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled is message has already been settled.
.. admonition:: Example:
.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py
:start-after: [START renew_message_lock_sync]
:end-before: [END renew_message_lock_sync]
:language: python
:dedent: 4
:caption: Renew the lock on a received message.
"""
# type: ignore
try:
Expand Down
Loading

0 comments on commit 626ee68

Please sign in to comment.