diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index f2b1600b14a2..7e9f658e6bf8 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -347,7 +347,7 @@ def message_id(self): The identifier is a free-form string and can reflect a GUID or an identifier derived from the application context. If enabled, the duplicate detection (see `https://docs.microsoft.com/azure/service-bus-messaging/duplicate-detection`) - feature identifies and removes second and further submissions of messages with the same message id. + feature identifies and removes second and further submissions of messages with the same message id. :rtype: str """ diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py index 7aa8d69a6dc0..cbf23d6bffb7 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py @@ -224,9 +224,9 @@ def get_queue_receiver(self, queue_name, **kwargs): :paramtype receive_mode: ~azure.servicebus.ReceiveMode :keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the receiver will automatically stop receiving. The default value is None, meaning no timeout. - :keyword Optional[AutoLockRenewer] auto_lock_renewer: An AutoLockRenewer can be provided such that messages are - automatically registered on receipt. If the receiver is a session receiver, it will apply to the session - instead. + :keyword Optional[~azure.servicebus.AutoLockRenewer] auto_lock_renewer: An ~azure.servicebus.AutoLockRenewer + can be provided such that messages are automatically registered on receipt. If the receiver is a session + receiver, it will apply to the session instead. :keyword int prefetch_count: The maximum number of messages to cache with each request to the service. This setting is only for advanced performance tuning. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they're not @@ -333,9 +333,9 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs): :paramtype receive_mode: ~azure.servicebus.ReceiveMode :keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the receiver will automatically stop receiving. The default value is None, meaning no timeout. - :keyword Optional[AutoLockRenewer] auto_lock_renewer: An AutoLockRenewer can be provided such that messages are - automatically registered on receipt. If the receiver is a session receiver, it will apply to the session - instead. + :keyword Optional[~azure.servicebus.AutoLockRenewer] auto_lock_renewer: An ~azure.servicebus.AutoLockRenewer + can be provided such that messages are automatically registered on receipt. If the receiver is a session + receiver, it will apply to the session instead. :keyword int prefetch_count: The maximum number of messages to cache with each request to the service. This setting is only for advanced performance tuning. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they're not diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py index d10c2a619c47..ba51b94089aa 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py @@ -61,6 +61,9 @@ class ServiceBusReceiver(BaseHandler, ReceiverMixin): # pylint: disable=too-man The two primary channels for message receipt are `receive()` to make a single request for messages, and `for message in receiver:` to continuously receive incoming messages in an ongoing fashion. + **Please use the `get__receiver` method of ~azure.servicebus.ServiceBusClient to create a + ServiceBusReceiver instance.** + :ivar fully_qualified_namespace: The fully qualified host name for the Service Bus namespace. The namespace format is: `.servicebus.windows.net`. :vartype fully_qualified_namespace: str @@ -95,9 +98,9 @@ class ServiceBusReceiver(BaseHandler, ReceiverMixin): # pylint: disable=too-man keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value). Additionally the following keys may also be present: `'username', 'password'`. :keyword str user_agent: If specified, this will be added in front of the built-in user agent string. - :keyword Optional[AutoLockRenewer] auto_lock_renewer: An AutoLockRenewer can be provided such that messages are - automatically registered on receipt. If the receiver is a session receiver, it will apply to the session - instead. + :keyword Optional[~azure.servicebus.AutoLockRenewer] auto_lock_renewer: An ~azure.servicebus.AutoLockRenewer + can be provided such that messages are automatically registered on receipt. If the receiver is a session + receiver, it will apply to the session instead. :keyword int prefetch_count: The maximum number of messages to cache with each request to the service. This setting is only for advanced performance tuning. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they're not @@ -105,16 +108,6 @@ class ServiceBusReceiver(BaseHandler, ReceiverMixin): # pylint: disable=too-man The default value is 0, meaning messages will be received from the service and processed one at a time. In the case of prefetch_count being 0, `ServiceBusReceiver.receive` would try to cache `max_message_count` (if provided) within its request to the service. - - .. admonition:: Example: - - .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py - :start-after: [START create_servicebus_receiver_sync] - :end-before: [END create_servicebus_receiver_sync] - :language: python - :dedent: 4 - :caption: Create a new instance of the ServiceBusReceiver. - """ def __init__( self, @@ -195,6 +188,72 @@ def _iter_next(self): self._auto_lock_renewer.register(self, message) return message + @classmethod + def _from_connection_string( + cls, + conn_str, + **kwargs + ): + # type: (str, Any) -> ServiceBusReceiver + """Create a ServiceBusReceiver from a connection string. + + :param conn_str: The connection string of a Service Bus. + :type conn_str: str + :keyword str queue_name: The path of specific Service Bus Queue the client connects to. + :keyword str topic_name: The path of specific Service Bus Topic which contains the Subscription + the client connects to. + :keyword str subscription_name: The path of specific Service Bus Subscription under the + specified Topic the client connects to. + :keyword receive_mode: The mode with which messages will be retrieved from the entity. The two options + are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given + lock period before they will be removed from the queue. Messages received with ReceiveAndDelete + will be immediately removed from the queue, and cannot be subsequently abandoned or re-received + if the client fails to process the message. + The default mode is PeekLock. + :paramtype receive_mode: ~azure.servicebus.ReceiveMode + :keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the + receiver will automatically stop receiving. The default value is None, meaning no timeout. + :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`. + :keyword transport_type: The type of transport protocol that will be used for communicating with + the Service Bus service. Default is `TransportType.Amqp`. + :paramtype transport_type: ~azure.servicebus.TransportType + :keyword dict http_proxy: HTTP proxy settings. This must be a dictionary with the following + keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value). + Additionally the following keys may also be present: `'username', 'password'`. + :keyword str user_agent: If specified, this will be added in front of the built-in user agent string. + :keyword int prefetch_count: The maximum number of messages to cache with each request to the service. + This setting is only for advanced performance tuning. Increasing this value will improve message throughput + performance but increase the chance that messages will expire while they are cached if they're not + processed fast enough. + The default value is 0, meaning messages will be received from the service and processed one at a time. + In the case of prefetch_count being 0, `ServiceBusReceiver.receive` would try to cache `max_message_count` + (if provided) within its request to the service. + :rtype: ~azure.servicebus.ServiceBusReceiver + + :raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity. + :raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure. + + .. admonition:: Example: + + .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py + :start-after: [START create_servicebus_receiver_from_conn_str_sync] + :end-before: [END create_servicebus_receiver_from_conn_str_sync] + :language: python + :dedent: 4 + :caption: Create a new instance of the ServiceBusReceiver from connection string. + + """ + constructor_args = cls._convert_connection_string_to_kwargs( + conn_str, + **kwargs + ) + if kwargs.get("queue_name") and kwargs.get("subscription_name"): + raise ValueError("Queue entity does not have subscription.") + + if kwargs.get("topic_name") and not kwargs.get("subscription_name"): + raise ValueError("Subscription name is missing for the topic. Please specify subscription_name.") + return cls(**constructor_args) + def _create_handler(self, auth): # type: (AMQPAuth) -> None self._handler = ReceiveClient( @@ -416,72 +475,6 @@ def get_streaming_message_iter(self, max_wait_time=None): raise ValueError("The max_wait_time must be greater than 0.") return self._iter_contextual_wrapper(max_wait_time) - @classmethod - def _from_connection_string( - cls, - conn_str, - **kwargs - ): - # type: (str, Any) -> ServiceBusReceiver - """Create a ServiceBusReceiver from a connection string. - - :param conn_str: The connection string of a Service Bus. - :type conn_str: str - :keyword str queue_name: The path of specific Service Bus Queue the client connects to. - :keyword str topic_name: The path of specific Service Bus Topic which contains the Subscription - the client connects to. - :keyword str subscription_name: The path of specific Service Bus Subscription under the - specified Topic the client connects to. - :keyword receive_mode: The mode with which messages will be retrieved from the entity. The two options - are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given - lock period before they will be removed from the queue. Messages received with ReceiveAndDelete - will be immediately removed from the queue, and cannot be subsequently abandoned or re-received - if the client fails to process the message. - The default mode is PeekLock. - :paramtype receive_mode: ~azure.servicebus.ReceiveMode - :keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the - receiver will automatically stop receiving. The default value is None, meaning no timeout. - :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`. - :keyword transport_type: The type of transport protocol that will be used for communicating with - the Service Bus service. Default is `TransportType.Amqp`. - :paramtype transport_type: ~azure.servicebus.TransportType - :keyword dict http_proxy: HTTP proxy settings. This must be a dictionary with the following - keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value). - Additionally the following keys may also be present: `'username', 'password'`. - :keyword str user_agent: If specified, this will be added in front of the built-in user agent string. - :keyword int prefetch_count: The maximum number of messages to cache with each request to the service. - This setting is only for advanced performance tuning. Increasing this value will improve message throughput - performance but increase the chance that messages will expire while they are cached if they're not - processed fast enough. - The default value is 0, meaning messages will be received from the service and processed one at a time. - In the case of prefetch_count being 0, `ServiceBusReceiver.receive` would try to cache `max_message_count` - (if provided) within its request to the service. - :rtype: ~azure.servicebus.ServiceBusReceiver - - :raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity. - :raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure. - - .. admonition:: Example: - - .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py - :start-after: [START create_servicebus_receiver_from_conn_str_sync] - :end-before: [END create_servicebus_receiver_from_conn_str_sync] - :language: python - :dedent: 4 - :caption: Create a new instance of the ServiceBusReceiver from connection string. - - """ - constructor_args = cls._convert_connection_string_to_kwargs( - conn_str, - **kwargs - ) - if kwargs.get("queue_name") and kwargs.get("subscription_name"): - raise ValueError("Queue entity does not have subscription.") - - if kwargs.get("topic_name") and not kwargs.get("subscription_name"): - raise ValueError("Subscription name is missing for the topic. Please specify subscription_name.") - return cls(**constructor_args) - def receive_messages(self, max_message_count=1, max_wait_time=None): # type: (Optional[int], Optional[float]) -> List[ServiceBusReceivedMessage] """Receive a batch of messages at once. @@ -651,6 +644,16 @@ def complete_message(self, message): :raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled. :raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired. :raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails. + + .. admonition:: Example: + + .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py + :start-after: [START complete_message_sync] + :end-before: [END complete_message_sync] + :language: python + :dedent: 4 + :caption: Complete a received message. + """ self._settle_message_with_retry(message, MESSAGE_COMPLETE) @@ -665,6 +668,16 @@ def abandon_message(self, message): :raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled. :raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired. :raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails. + + .. admonition:: Example: + + .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py + :start-after: [START abandon_message_sync] + :end-before: [END abandon_message_sync] + :language: python + :dedent: 4 + :caption: Abandon a received message. + """ self._settle_message_with_retry(message, MESSAGE_ABANDON) @@ -680,6 +693,16 @@ def defer_message(self, message): :raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled. :raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired. :raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails. + + .. admonition:: Example: + + .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py + :start-after: [START defer_message_sync] + :end-before: [END defer_message_sync] + :language: python + :dedent: 4 + :caption: Defer a received message. + """ self._settle_message_with_retry(message, MESSAGE_DEFER) @@ -698,6 +721,16 @@ def dead_letter_message(self, message, reason=None, error_description=None): :raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled. :raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired. :raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails. + + .. admonition:: Example: + + .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py + :start-after: [START dead_letter_message_sync] + :end-before: [END dead_letter_message_sync] + :language: python + :dedent: 4 + :caption: Dead letter a received message. + """ self._settle_message_with_retry( message, @@ -729,6 +762,16 @@ def renew_message_lock(self, message, **kwargs): :raises: TypeError if the message is sessionful. :raises: ~azure.servicebus.exceptions.MessageLockExpired is message lock has already expired. :raises: ~azure.servicebus.exceptions.MessageAlreadySettled is message has already been settled. + + .. admonition:: Example: + + .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py + :start-after: [START renew_message_lock_sync] + :end-before: [END renew_message_lock_sync] + :language: python + :dedent: 4 + :caption: Renew the lock on a received message. + """ # type: ignore try: diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py index a32cb7a5421f..a010b9548103 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py @@ -84,6 +84,9 @@ class ServiceBusSender(BaseHandler, SenderMixin): """The ServiceBusSender class defines a high level interface for sending messages to the Azure Service Bus Queue or Topic. + **Please use the `get__sender` method of ~azure.servicebus.ServiceBusClient to create a + ServiceBusSender instance.** + :ivar fully_qualified_namespace: The fully qualified host name for the Service Bus namespace. The namespace format is: `.servicebus.windows.net`. :vartype fully_qualified_namespace: str @@ -106,16 +109,6 @@ class ServiceBusSender(BaseHandler, SenderMixin): keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value). Additionally the following keys may also be present: `'username', 'password'`. :keyword str user_agent: If specified, this will be added in front of the built-in user agent string. - - .. admonition:: Example: - - .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py - :start-after: [START create_servicebus_sender_sync] - :end-before: [END create_servicebus_sender_sync] - :language: python - :dedent: 4 - :caption: Create a new instance of the ServiceBusSender. - """ def __init__( self, @@ -149,6 +142,51 @@ def __init__( self._create_attribute() self._connection = kwargs.get("connection") + @classmethod + def _from_connection_string( + cls, + conn_str, + **kwargs + ): + # type: (str, Any) -> ServiceBusSender + """Create a ServiceBusSender from a connection string. + + :param conn_str: The connection string of a Service Bus. + :type conn_str: str + :keyword str queue_name: The path of specific Service Bus Queue the client connects to. + Only one of queue_name or topic_name can be provided. + :keyword str topic_name: The path of specific Service Bus Topic the client connects to. + Only one of queue_name or topic_name can be provided. + :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`. + :keyword transport_type: The type of transport protocol that will be used for communicating with + the Service Bus service. Default is `TransportType.Amqp`. + :paramtype transport_type: ~azure.servicebus.TransportType + :keyword dict http_proxy: HTTP proxy settings. This must be a dictionary with the following + keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value). + Additionally the following keys may also be present: `'username', 'password'`. + :keyword str user_agent: If specified, this will be added in front of the built-in user agent string. + + :rtype: ~azure.servicebus.ServiceBusSender + + :raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity. + :raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure. + + .. admonition:: Example: + + .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py + :start-after: [START create_servicebus_sender_from_conn_str_sync] + :end-before: [END create_servicebus_sender_from_conn_str_sync] + :language: python + :dedent: 4 + :caption: Create a new instance of the ServiceBusSender from connection string. + + """ + constructor_args = cls._convert_connection_string_to_kwargs( + conn_str, + **kwargs + ) + return cls(**constructor_args) + def _create_handler(self, auth): # type: (AMQPAuth) -> None self._handler = SendClient( @@ -196,6 +234,7 @@ def schedule_messages(self, messages, schedule_time_utc, **kwargs): # type: (Union[ServiceBusMessage, List[ServiceBusMessage]], datetime.datetime, Any) -> List[int] """Send Message or multiple Messages to be enqueued at a specific time. Returns a list of the sequence numbers of the enqueued messages. + :param messages: The message or list of messages to schedule. :type messages: Union[~azure.servicebus.ServiceBusMessage, List[~azure.servicebus.ServiceBusMessage]] :param schedule_time_utc: The utc date and time to enqueue the messages. @@ -267,51 +306,6 @@ def cancel_scheduled_messages(self, sequence_numbers, **kwargs): timeout=timeout ) - @classmethod - def _from_connection_string( - cls, - conn_str, - **kwargs - ): - # type: (str, Any) -> ServiceBusSender - """Create a ServiceBusSender from a connection string. - - :param conn_str: The connection string of a Service Bus. - :type conn_str: str - :keyword str queue_name: The path of specific Service Bus Queue the client connects to. - Only one of queue_name or topic_name can be provided. - :keyword str topic_name: The path of specific Service Bus Topic the client connects to. - Only one of queue_name or topic_name can be provided. - :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`. - :keyword transport_type: The type of transport protocol that will be used for communicating with - the Service Bus service. Default is `TransportType.Amqp`. - :paramtype transport_type: ~azure.servicebus.TransportType - :keyword dict http_proxy: HTTP proxy settings. This must be a dictionary with the following - keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value). - Additionally the following keys may also be present: `'username', 'password'`. - :keyword str user_agent: If specified, this will be added in front of the built-in user agent string. - - :rtype: ~azure.servicebus.ServiceBusSender - - :raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity. - :raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure. - - .. admonition:: Example: - - .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py - :start-after: [START create_servicebus_sender_from_conn_str_sync] - :end-before: [END create_servicebus_sender_from_conn_str_sync] - :language: python - :dedent: 4 - :caption: Create a new instance of the ServiceBusSender from connection string. - - """ - constructor_args = cls._convert_connection_string_to_kwargs( - conn_str, - **kwargs - ) - return cls(**constructor_args) - def send_messages(self, message, **kwargs): # type: (Union[ServiceBusMessage, ServiceBusMessageBatch, List[ServiceBusMessage]], Any) -> None """Sends message and blocks until acknowledgement is received or operation times out. diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py index bc57c58af476..78b05235dd4c 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py @@ -218,7 +218,7 @@ def get_queue_receiver(self, queue_name: str, **kwargs: Any) -> ServiceBusReceiv receiver will automatically stop receiving. The default value is None, meaning no timeout. :keyword Optional[~azure.servicebus.aio.AutoLockRenewer] auto_lock_renewer: An ~azure.servicebus.aio.AutoLockRenewer can be provided such that messages are automatically registered on - receipt. If the receiver is a session receiver, it will apply to the session instead. + receipt. If the receiver is a session receiver, it will apply to the session instead. :keyword int prefetch_count: The maximum number of messages to cache with each request to the service. This setting is only for advanced performance tuning. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they're not @@ -324,7 +324,7 @@ def get_subscription_receiver(self, topic_name: str, subscription_name: str, **k receiver will automatically stop receiving. The default value is None, meaning no timeout. :keyword Optional[~azure.servicebus.aio.AutoLockRenewer] auto_lock_renewer: An ~azure.servicebus.aio.AutoLockRenewer can be provided such that messages are automatically registered on - receipt. If the receiver is a session receiver, it will apply to the session instead. + receipt. If the receiver is a session receiver, it will apply to the session instead. :keyword int prefetch_count: The maximum number of messages to cache with each request to the service. This setting is only for advanced performance tuning. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they're not diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py index 5fb513d58d66..d09f2801e6dd 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py @@ -58,6 +58,9 @@ class ServiceBusReceiver(collections.abc.AsyncIterator, BaseHandler, ReceiverMix The two primary channels for message receipt are `receive()` to make a single request for messages, and `async for message in receiver:` to continuously receive incoming messages in an ongoing fashion. + **Please use the `get__receiver` method of ~azure.servicebus.aio.ServiceBusClient to create a + ServiceBusReceiver instance.** + :ivar fully_qualified_namespace: The fully qualified host name for the Service Bus namespace. The namespace format is: `.servicebus.windows.net`. :vartype fully_qualified_namespace: str @@ -92,9 +95,9 @@ class ServiceBusReceiver(collections.abc.AsyncIterator, BaseHandler, ReceiverMix keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value). Additionally the following keys may also be present: `'username', 'password'`. :keyword str user_agent: If specified, this will be added in front of the built-in user agent string. - :keyword Optional[AutoLockRenewer] auto_lock_renewer: An AutoLockRenewer can be provided such that messages are - automatically registered on receipt. If the receiver is a session receiver, it will apply to the session - instead. + :keyword Optional[~azure.servicebus.aio.AutoLockRenewer] auto_lock_renewer: An ~azure.servicebus.aio.AutoLockRenewer + can be provided such that messages are automatically registered on receipt. If the receiver is a session receiver, + it will apply to the session instead. :keyword int prefetch_count: The maximum number of messages to cache with each request to the service. This setting is only for advanced performance tuning. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they're not @@ -102,16 +105,6 @@ class ServiceBusReceiver(collections.abc.AsyncIterator, BaseHandler, ReceiverMix The default value is 0, meaning messages will be received from the service and processed one at a time. In the case of prefetch_count being 0, `ServiceBusReceiver.receive` would try to cache `max_message_count` (if provided) within its request to the service. - - .. admonition:: Example: - - .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py - :start-after: [START create_servicebus_receiver_async] - :end-before: [END create_servicebus_receiver_async] - :language: python - :dedent: 4 - :caption: Create a new instance of the ServiceBusReceiver. - """ def __init__( self, @@ -192,6 +185,70 @@ async def _iter_next(self): self._auto_lock_renewer.register(self, message) return message + @classmethod + def _from_connection_string( + cls, + conn_str: str, + **kwargs: Any + ) -> "ServiceBusReceiver": + """Create a ServiceBusReceiver from a connection string. + + :param str conn_str: The connection string of a Service Bus. + :keyword str queue_name: The path of specific Service Bus Queue the client connects to. + :keyword str topic_name: The path of specific Service Bus Topic which contains the Subscription + the client connects to. + :keyword str subscription_name: The path of specific Service Bus Subscription under the + specified Topic the client connects to. + :keyword receive_mode: The mode with which messages will be retrieved from the entity. The two options + are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given + lock period before they will be removed from the queue. Messages received with ReceiveAndDelete + will be immediately removed from the queue, and cannot be subsequently abandoned or re-received + if the client fails to process the message. + The default mode is PeekLock. + :paramtype receive_mode: ~azure.servicebus.ReceiveMode + :keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the + receiver will automatically stop receiving. The default value is None, meaning no timeout. + :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`. + :keyword transport_type: The type of transport protocol that will be used for communicating with + the Service Bus service. Default is `TransportType.Amqp`. + :paramtype transport_type: ~azure.servicebus.TransportType + :keyword dict http_proxy: HTTP proxy settings. This must be a dictionary with the following + keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value). + Additionally the following keys may also be present: `'username', 'password'`. + :keyword str user_agent: If specified, this will be added in front of the built-in user agent string. + :keyword int prefetch_count: The maximum number of messages to cache with each request to the service. + This setting is only for advanced performance tuning. Increasing this value will improve message throughput + performance but increase the chance that messages will expire while they are cached if they're not + processed fast enough. + The default value is 0, meaning messages will be received from the service and processed one at a time. + In the case of prefetch_count being 0, `ServiceBusReceiver.receive` would try to cache `max_message_count` + (if provided) within its request to the service. + :rtype: ~azure.servicebus.aio.ServiceBusReceiver + + :raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity. + :raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure. + + .. admonition:: Example: + + .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py + :start-after: [START create_servicebus_receiver_from_conn_str_async] + :end-before: [END create_servicebus_receiver_from_conn_str_async] + :language: python + :dedent: 4 + :caption: Create a new instance of the ServiceBusReceiver from connection string. + + """ + constructor_args = cls._convert_connection_string_to_kwargs( + conn_str, + **kwargs + ) + if kwargs.get("queue_name") and kwargs.get("subscription_name"): + raise ValueError("Queue entity does not have subscription.") + + if kwargs.get("topic_name") and not kwargs.get("subscription_name"): + raise ValueError("Subscription name is missing for the topic. Please specify subscription_name.") + return cls(**constructor_args) + def _create_handler(self, auth): self._handler = ReceiveClientAsync( self._get_source(), @@ -404,7 +461,7 @@ def get_streaming_message_iter( .. admonition:: Example: - .. literalinclude:: ../samples/async_samples/sample_code_servicebus.py + .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py :start-after: [START receive_forever_async] :end-before: [END receive_forever_async] :language: python @@ -415,70 +472,6 @@ def get_streaming_message_iter( raise ValueError("The max_wait_time must be greater than 0.") return self._IterContextualWrapper(self, max_wait_time) - @classmethod - def _from_connection_string( - cls, - conn_str: str, - **kwargs: Any - ) -> "ServiceBusReceiver": - """Create a ServiceBusReceiver from a connection string. - - :param str conn_str: The connection string of a Service Bus. - :keyword str queue_name: The path of specific Service Bus Queue the client connects to. - :keyword str topic_name: The path of specific Service Bus Topic which contains the Subscription - the client connects to. - :keyword str subscription_name: The path of specific Service Bus Subscription under the - specified Topic the client connects to. - :keyword receive_mode: The mode with which messages will be retrieved from the entity. The two options - are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given - lock period before they will be removed from the queue. Messages received with ReceiveAndDelete - will be immediately removed from the queue, and cannot be subsequently abandoned or re-received - if the client fails to process the message. - The default mode is PeekLock. - :paramtype receive_mode: ~azure.servicebus.ReceiveMode - :keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the - receiver will automatically stop receiving. The default value is None, meaning no timeout. - :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`. - :keyword transport_type: The type of transport protocol that will be used for communicating with - the Service Bus service. Default is `TransportType.Amqp`. - :paramtype transport_type: ~azure.servicebus.TransportType - :keyword dict http_proxy: HTTP proxy settings. This must be a dictionary with the following - keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value). - Additionally the following keys may also be present: `'username', 'password'`. - :keyword str user_agent: If specified, this will be added in front of the built-in user agent string. - :keyword int prefetch_count: The maximum number of messages to cache with each request to the service. - This setting is only for advanced performance tuning. Increasing this value will improve message throughput - performance but increase the chance that messages will expire while they are cached if they're not - processed fast enough. - The default value is 0, meaning messages will be received from the service and processed one at a time. - In the case of prefetch_count being 0, `ServiceBusReceiver.receive` would try to cache `max_message_count` - (if provided) within its request to the service. - :rtype: ~azure.servicebus.aio.ServiceBusReceiver - - :raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity. - :raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure. - - .. admonition:: Example: - - .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py - :start-after: [START create_servicebus_receiver_from_conn_str_async] - :end-before: [END create_servicebus_receiver_from_conn_str_async] - :language: python - :dedent: 4 - :caption: Create a new instance of the ServiceBusReceiver from connection string. - - """ - constructor_args = cls._convert_connection_string_to_kwargs( - conn_str, - **kwargs - ) - if kwargs.get("queue_name") and kwargs.get("subscription_name"): - raise ValueError("Queue entity does not have subscription.") - - if kwargs.get("topic_name") and not kwargs.get("subscription_name"): - raise ValueError("Subscription name is missing for the topic. Please specify subscription_name.") - return cls(**constructor_args) - async def receive_messages( self, max_message_count: Optional[int] = 1, @@ -655,6 +648,16 @@ async def complete_message(self, message): :raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired. :raises: ~azure.servicebus.exceptions.SessionLockExpired if session lock has already expired. :raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails. + + .. admonition:: Example: + + .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py + :start-after: [START complete_message_async] + :end-before: [END complete_message_async] + :language: python + :dedent: 4 + :caption: Complete a received message. + """ await self._settle_message_with_retry(message, MESSAGE_COMPLETE) @@ -669,6 +672,16 @@ async def abandon_message(self, message): :raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled. :raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired. :raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails. + + .. admonition:: Example: + + .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py + :start-after: [START abandon_message_async] + :end-before: [END abandon_message_async] + :language: python + :dedent: 4 + :caption: Abandon a received message. + """ await self._settle_message_with_retry(message, MESSAGE_ABANDON) @@ -684,6 +697,16 @@ async def defer_message(self, message): :raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled. :raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired. :raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails. + + .. admonition:: Example: + + .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py + :start-after: [START defer_message_async] + :end-before: [END defer_message_async] + :language: python + :dedent: 4 + :caption: Defer a received message. + """ await self._settle_message_with_retry(message, MESSAGE_DEFER) @@ -702,6 +725,16 @@ async def dead_letter_message(self, message, reason=None, error_description=None :raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled. :raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired. :raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails. + + .. admonition:: Example: + + .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py + :start-after: [START dead_letter_message_async] + :end-before: [END dead_letter_message_async] + :language: python + :dedent: 4 + :caption: Dead letter a received message. + """ await self._settle_message_with_retry( message, @@ -733,6 +766,16 @@ async def renew_message_lock(self, message, **kwargs): :raises: TypeError if the message is sessionful. :raises: ~azure.servicebus.exceptions.MessageLockExpired is message lock has already expired. :raises: ~azure.servicebus.exceptions.MessageAlreadySettled is message has already been settled. + + .. admonition:: Example: + + .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py + :start-after: [START renew_message_lock_async] + :end-before: [END renew_message_lock_async] + :language: python + :dedent: 4 + :caption: Renew the lock on a received message. + """ try: if self.session: diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py index d2a016c9b50b..2dda26515996 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py @@ -32,6 +32,9 @@ class ServiceBusSender(BaseHandler, SenderMixin): """The ServiceBusSender class defines a high level interface for sending messages to the Azure Service Bus Queue or Topic. + **Please use the `get__sender` method of ~azure.servicebus.aio.ServiceBusClient to create a + ServiceBusSender instance.** + :ivar fully_qualified_namespace: The fully qualified host name for the Service Bus namespace. The namespace format is: `.servicebus.windows.net`. :vartype fully_qualified_namespace: str @@ -56,16 +59,6 @@ class ServiceBusSender(BaseHandler, SenderMixin): keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value). Additionally the following keys may also be present: `'username', 'password'`. :keyword str user_agent: If specified, this will be added in front of the built-in user agent string. - - .. admonition:: Example: - - .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py - :start-after: [START create_servicebus_sender_async] - :end-before: [END create_servicebus_sender_async] - :language: python - :dedent: 4 - :caption: Create a new instance of the ServiceBusSender. - """ def __init__( self, @@ -98,6 +91,46 @@ def __init__( self._create_attribute() self._connection = kwargs.get("connection") + @classmethod + def _from_connection_string( + cls, + conn_str: str, + **kwargs: Any + ) -> "ServiceBusSender": + """Create a ServiceBusSender from a connection string. + + :param str conn_str: The connection string of a Service Bus. + :keyword str queue_name: The path of specific Service Bus Queue the client connects to. + :keyword str topic_name: The path of specific Service Bus Topic the client connects to. + :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`. + :keyword transport_type: The type of transport protocol that will be used for communicating with + the Service Bus service. Default is `TransportType.Amqp`. + :paramtype transport_type: ~azure.servicebus.TransportType + :keyword dict http_proxy: HTTP proxy settings. This must be a dictionary with the following + keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value). + Additionally the following keys may also be present: `'username', 'password'`. + :keyword str user_agent: If specified, this will be added in front of the built-in user agent string. + :rtype: ~azure.servicebus.aio.ServiceBusSender + + :raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity. + :raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure. + + .. admonition:: Example: + + .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py + :start-after: [START create_servicebus_sender_from_conn_str_async] + :end-before: [END create_servicebus_sender_from_conn_str_async] + :language: python + :dedent: 4 + :caption: Create a new instance of the ServiceBusSender from connection string. + + """ + constructor_args = cls._convert_connection_string_to_kwargs( + conn_str, + **kwargs + ) + return cls(**constructor_args) + def _create_handler(self, auth): self._handler = SendClientAsync( self._entity_uri, @@ -146,6 +179,7 @@ async def schedule_messages( ) -> List[int]: """Send Message or multiple Messages to be enqueued at a specific time by the service. Returns a list of the sequence numbers of the enqueued messages. + :param messages: The message or list of messages to schedule. :type messages: ~azure.servicebus.ServiceBusMessage or list[~azure.servicebus.ServiceBusMessage] :param schedule_time_utc: The utc date and time to enqueue the messages. @@ -216,46 +250,6 @@ async def cancel_scheduled_messages(self, sequence_numbers: Union[int, List[int] timeout=timeout ) - @classmethod - def _from_connection_string( - cls, - conn_str: str, - **kwargs: Any - ) -> "ServiceBusSender": - """Create a ServiceBusSender from a connection string. - - :param str conn_str: The connection string of a Service Bus. - :keyword str queue_name: The path of specific Service Bus Queue the client connects to. - :keyword str topic_name: The path of specific Service Bus Topic the client connects to. - :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`. - :keyword transport_type: The type of transport protocol that will be used for communicating with - the Service Bus service. Default is `TransportType.Amqp`. - :paramtype transport_type: ~azure.servicebus.TransportType - :keyword dict http_proxy: HTTP proxy settings. This must be a dictionary with the following - keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value). - Additionally the following keys may also be present: `'username', 'password'`. - :keyword str user_agent: If specified, this will be added in front of the built-in user agent string. - :rtype: ~azure.servicebus.aio.ServiceBusSender - - :raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity. - :raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure. - - .. admonition:: Example: - - .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py - :start-after: [START create_servicebus_sender_from_conn_str_async] - :end-before: [END create_servicebus_sender_from_conn_str_async] - :language: python - :dedent: 4 - :caption: Create a new instance of the ServiceBusSender from connection string. - - """ - constructor_args = cls._convert_connection_string_to_kwargs( - conn_str, - **kwargs - ) - return cls(**constructor_args) - async def send_messages( self, message: Union[ServiceBusMessage, ServiceBusMessageBatch, List[ServiceBusMessage]], diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/management/_management_client_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/management/_management_client_async.py index bb7b841234e6..ceaf570ee81b 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/management/_management_client_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/management/_management_client_async.py @@ -848,8 +848,9 @@ async def update_rule( :param str topic_name: The topic that owns the subscription. :param str subscription_name: The subscription that owns this rule. - :param ~azure.servicebus.management.RuleProperties rule: The rule that is returned from `get_rule`, - `create_rule`, or `list_rules` and has the updated properties. + :param rule: The rule that is returned from `get_rule`, + `create_rule`, or `list_rules` and has the updated properties. + :type rule: ~azure.servicebus.management.RuleProperties :rtype: None """ _validate_topic_and_subscription_types(topic_name, subscription_name) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/management/_management_client.py b/sdk/servicebus/azure-servicebus/azure/servicebus/management/_management_client.py index 18d2ae0268d3..2e88cc74f9fd 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/management/_management_client.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/management/_management_client.py @@ -810,7 +810,6 @@ def create_rule(self, topic_name, subscription_name, rule_name, **kwargs): ~azure.servicebus.management.SqlRuleFilter] :keyword action: The action of the rule. :type action: Optional[~azure.servicebus.management.SqlRuleAction] - :rtype: ~azure.servicebus.management.RuleProperties """ _validate_topic_and_subscription_types(topic_name, subscription_name) @@ -851,8 +850,10 @@ def update_rule(self, topic_name, subscription_name, rule, **kwargs): :param str topic_name: The topic that owns the subscription. :param str subscription_name: The subscription that owns this rule. - :param ~azure.servicebus.management.RuleProperties rule: The rule that is returned from `get_rule`, - `create_rule`, or `list_rules` and has the updated properties. + :param rule: The rule that is returned from `get_rule`, + `create_rule`, or `list_rules` and has the updated properties. + :type rule: ~azure.servicebus.management.RuleProperties + :rtype: None """ _validate_topic_and_subscription_types(topic_name, subscription_name) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/management/_models.py b/sdk/servicebus/azure-servicebus/azure/servicebus/management/_models.py index 4a3e9a379170..cf5cfd3cf4d0 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/management/_models.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/management/_models.py @@ -271,7 +271,6 @@ def __init__( validate_extraction_missing_args(extraction_missing_args) - @classmethod def _from_internal_entity(cls, name, internal_qd): # type: (str, InternalQueueDescription) -> QueueProperties @@ -334,7 +333,7 @@ class QueueRuntimeProperties(object): def __init__( self, ): - # type: () -> None + # type: () -> None self._name = None # type: Optional[str] self._internal_qr = None # type: Optional[InternalQueueDescription] diff --git a/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py b/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py index 8677590e356f..a931a44d196c 100644 --- a/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py +++ b/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py @@ -7,9 +7,12 @@ Examples to show basic async use case of python azure-servicebus SDK, including: - Create ServiceBusClient - Create ServiceBusSender/ServiceBusReceiver - - Send single message - - Receive and settle messages + - Send single message and batch messages + - Peek, receive and settle messages + - Receive and settle dead-lettered messages - Receive and settle deferred messages + - Schedule and cancel scheduled messages + - Session related operations """ import os import datetime @@ -35,16 +38,12 @@ def example_create_servicebus_client_async(): # [START create_sb_client_async] import os - from azure.servicebus.aio import ServiceBusClient, ServiceBusSharedKeyCredential + from azure.identity.aio import DefaultAzureCredential + from azure.servicebus.aio import ServiceBusClient fully_qualified_namespace = os.environ['SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE'] - shared_access_policy = os.environ['SERVICE_BUS_SAS_POLICY'] - shared_access_key = os.environ['SERVICE_BUS_SAS_KEY'] servicebus_client = ServiceBusClient( fully_qualified_namespace=fully_qualified_namespace, - credential=ServiceBusSharedKeyCredential( - shared_access_policy, - shared_access_key - ) + credential=DefaultAzureCredential() ) # [END create_sb_client_async] return servicebus_client @@ -63,23 +62,6 @@ async def example_create_servicebus_sender_async(): ) # [END create_servicebus_sender_from_conn_str_async] - # [START create_servicebus_sender_async] - import os - from azure.servicebus.aio import ServiceBusSender, ServiceBusSharedKeyCredential - fully_qualified_namespace = os.environ['SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE'] - shared_access_policy = os.environ['SERVICE_BUS_SAS_POLICY'] - shared_access_key = os.environ['SERVICE_BUS_SAS_KEY'] - queue_name = os.environ['SERVICE_BUS_QUEUE_NAME'] - queue_sender = ServiceBusSender( - fully_qualified_namespace=fully_qualified_namespace, - credential=ServiceBusSharedKeyCredential( - shared_access_policy, - shared_access_key - ), - queue_name=queue_name - ) - # [END create_servicebus_sender_async] - # [START create_servicebus_sender_from_sb_client_async] import os from azure.servicebus.aio import ServiceBusClient @@ -92,7 +74,7 @@ async def example_create_servicebus_sender_async(): # [START create_topic_sender_from_sb_client_async] import os - from azure.servicebus import ServiceBusClient + from azure.servicebus.aio import ServiceBusClient servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR'] topic_name = os.environ['SERVICE_BUS_TOPIC_NAME'] servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str) @@ -117,31 +99,15 @@ async def example_create_servicebus_receiver_async(): ) # [END create_servicebus_receiver_from_conn_str_async] - # [START create_servicebus_receiver_async] - import os - from azure.servicebus.aio import ServiceBusReceiver, ServiceBusSharedKeyCredential - fully_qualified_namespace = os.environ['SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE'] - shared_access_policy = os.environ['SERVICE_BUS_SAS_POLICY'] - shared_access_key = os.environ['SERVICE_BUS_SAS_KEY'] - queue_name = os.environ['SERVICE_BUS_QUEUE_NAME'] - queue_receiver = ServiceBusReceiver( - fully_qualified_namespace=fully_qualified_namespace, - credential=ServiceBusSharedKeyCredential( - shared_access_policy, - shared_access_key - ), - queue_name=queue_name - ) - # [END create_servicebus_receiver_async] - # [START create_queue_deadletter_receiver_from_sb_client_async] import os + from azure.servicebus import SubQueue from azure.servicebus.aio import ServiceBusClient servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR'] queue_name = os.environ['SERVICE_BUS_QUEUE_NAME'] servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str) async with servicebus_client: - queue_receiver = servicebus_client.get_queue_deadletter_receiver(queue_name=queue_name) + queue_receiver = servicebus_client.get_queue_receiver(queue_name=queue_name, sub_queue=SubQueue.DeadLetter) # [END create_queue_deadletter_receiver_from_sb_client_async] # [START create_servicebus_receiver_from_sb_client_async] @@ -156,21 +122,23 @@ async def example_create_servicebus_receiver_async(): # [START create_subscription_deadletter_receiver_from_sb_client_async] import os - from azure.servicebus import ServiceBusClient + from azure.servicebus import SubQueue + from azure.servicebus.aio import ServiceBusClient servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR'] topic_name = os.environ["SERVICE_BUS_TOPIC_NAME"] subscription_name = os.environ["SERVICE_BUS_SUBSCRIPTION_NAME"] servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str) async with servicebus_client: - subscription_receiver = servicebus_client.get_subscription_deadletter_receiver( + subscription_receiver = servicebus_client.get_subscription_receiver( topic_name=topic_name, subscription_name=subscription_name, + sub_queue=SubQueue.DeadLetter ) # [END create_subscription_deadletter_receiver_from_sb_client_async] # [START create_subscription_receiver_from_sb_client_async] import os - from azure.servicebus import ServiceBusClient + from azure.servicebus.aio import ServiceBusClient servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR'] topic_name = os.environ["SERVICE_BUS_TOPIC_NAME"] subscription_name = os.environ["SERVICE_BUS_SUBSCRIPTION_NAME"] @@ -181,7 +149,6 @@ async def example_create_servicebus_receiver_async(): subscription_name=subscription_name, ) # [END create_subscription_receiver_from_sb_client_async] - return queue_receiver @@ -195,6 +162,7 @@ async def example_send_and_receive_async(): message = ServiceBusMessage("Hello World") await servicebus_sender.send_messages(message) # [END send_async] + await servicebus_sender.send_messages([ServiceBusMessage("Hello World")] * 5) # [START create_batch_async] async with servicebus_sender: @@ -223,6 +191,37 @@ async def example_send_and_receive_async(): print(str(message)) await servicebus_receiver.complete_message(message) # [END receive_forever_async] + break + + # [START abandon_message_async] + messages = await servicebus_receiver.receive_messages(max_wait_time=5) + for message in messages: + await servicebus_receiver.abandon_message(message) + # [END abandon_message_async] + + # [START complete_message_async] + messages = await servicebus_receiver.receive_messages(max_wait_time=5) + for message in messages: + await servicebus_receiver.complete_message(message) + # [END complete_message_async] + + # [START defer_message_async] + messages = await servicebus_receiver.receive_messages(max_wait_time=5) + for message in messages: + await servicebus_receiver.defer_message(message) + # [END defer_message_async] + + # [START dead_letter_message_async] + messages = await servicebus_receiver.receive_messages(max_wait_time=5) + for message in messages: + await servicebus_receiver.dead_letter_message(message) + # [END dead_letter_message_async] + + # [START renew_message_lock_async] + messages = await servicebus_receiver.receive_messages(max_wait_time=5) + for message in messages: + await servicebus_receiver.renew_message_lock(message) + # [END renew_message_lock_async] # [START auto_lock_renew_message_async] from azure.servicebus.aio import AutoLockRenewer @@ -234,6 +233,8 @@ async def example_send_and_receive_async(): await process_message(message) await servicebus_receiver.complete_message(message) # [END auto_lock_renew_message_async] + break + await lock_renewal.close() async def example_receive_deferred_async(): @@ -254,17 +255,46 @@ async def example_receive_deferred_async(): sequence_numbers=deferred_sequenced_numbers ) - for msg in received_deferred_msg: + for message in received_deferred_msg: await servicebus_receiver.complete_message(message) # [END receive_defer_async] -async def example_session_ops_async(): +async def example_receive_deadletter_async(): + from azure.servicebus import SubQueue servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR'] queue_name = os.environ['SERVICE_BUS_QUEUE_NAME'] + + async with ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str) as servicebus_client: + async with servicebus_client.get_queue_sender(queue_name) as servicebus_sender: + await servicebus_sender.send_messages(ServiceBusMessage("Hello World")) + # [START receive_deadletter_async] + async with servicebus_client.get_queue_receiver(queue_name) as servicebus_receiver: + messages = await servicebus_receiver.receive_messages(max_wait_time=5) + for message in messages: + await servicebus_receiver.dead_letter_message( + message, + reason='reason for dead lettering', + error_description='description for dead lettering' + ) + + async with servicebus_client.get_queue_receiver(queue_name, sub_queue=SubQueue.DeadLetter) as servicebus_deadletter_receiver: + messages = await servicebus_deadletter_receiver.receive_messages(max_wait_time=5) + for message in messages: + await servicebus_deadletter_receiver.complete_message(message) + # [END receive_deadletter_async] + + +async def example_session_ops_async(): + servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR'] + queue_name = os.environ['SERVICE_BUS_SESSION_QUEUE_NAME'] session_id = "" async with ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str) as servicebus_client: + + async with servicebus_client.get_queue_sender(queue_name=queue_name) as sender: + await sender.send_messages(ServiceBusMessage('msg', session_id=session_id)) + # [START get_session_async] async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver: session = receiver.session @@ -279,13 +309,13 @@ async def example_session_ops_async(): # [START set_session_state_async] async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver: session = receiver.session - session_state = await session.set_state("START") + await session.set_state("START") # [END set_session_state_async] # [START session_renew_lock_async] async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver: session = receiver.session - session_state = await session.renew_lock() + await session.renew_lock() # [END session_renew_lock_async] # [START auto_lock_renew_session_async] @@ -323,4 +353,5 @@ async def example_schedule_ops_async(): loop.run_until_complete(example_send_and_receive_async()) loop.run_until_complete(example_receive_deferred_async()) loop.run_until_complete(example_schedule_ops_async()) - # loop.run_until_complete(example_session_ops_async()) + loop.run_until_complete(example_receive_deadletter_async()) + loop.run_until_complete(example_session_ops_async()) diff --git a/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py b/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py index 64d8835921d1..c8474dcc40b7 100644 --- a/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py +++ b/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py @@ -7,9 +7,12 @@ Examples to show basic use case of python azure-servicebus SDK, including: - Create ServiceBusClient - Create ServiceBusSender/ServiceBusReceiver - - Send single message - - Receive and settle messages + - Send single message and batch messages + - Peek, receive and settle messages + - Receive and settle dead-lettered messages - Receive and settle deferred messages + - Schedule and cancel scheduled messages + - Session related operations """ import os @@ -31,16 +34,12 @@ def example_create_servicebus_client_sync(): # [START create_sb_client_sync] import os - from azure.servicebus import ServiceBusClient, ServiceBusSharedKeyCredential + from azure.identity import DefaultAzureCredential + from azure.servicebus import ServiceBusClient fully_qualified_namespace = os.environ['SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE'] - shared_access_policy = os.environ['SERVICE_BUS_SAS_POLICY'] - shared_access_key = os.environ['SERVICE_BUS_SAS_KEY'] servicebus_client = ServiceBusClient( fully_qualified_namespace=fully_qualified_namespace, - credential=ServiceBusSharedKeyCredential( - shared_access_policy, - shared_access_key - ) + credential=DefaultAzureCredential() ) # [END create_sb_client_sync] return servicebus_client @@ -59,23 +58,6 @@ def example_create_servicebus_sender_sync(): ) # [END create_servicebus_sender_from_conn_str_sync] - # [START create_servicebus_sender_sync] - import os - from azure.servicebus import ServiceBusSender, ServiceBusSharedKeyCredential - fully_qualified_namespace = os.environ['SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE'] - shared_access_policy = os.environ['SERVICE_BUS_SAS_POLICY'] - shared_access_key = os.environ['SERVICE_BUS_SAS_KEY'] - queue_name = os.environ['SERVICE_BUS_QUEUE_NAME'] - queue_sender = ServiceBusSender( - fully_qualified_namespace=fully_qualified_namespace, - credential=ServiceBusSharedKeyCredential( - shared_access_policy, - shared_access_key - ), - queue_name=queue_name - ) - # [END create_servicebus_sender_sync] - # [START create_servicebus_sender_from_sb_client_sync] import os from azure.servicebus import ServiceBusClient @@ -113,31 +95,14 @@ def example_create_servicebus_receiver_sync(): ) # [END create_servicebus_receiver_from_conn_str_sync] - # [START create_servicebus_receiver_sync] - import os - from azure.servicebus import ServiceBusReceiver, ServiceBusSharedKeyCredential - fully_qualified_namespace = os.environ['SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE'] - shared_access_policy = os.environ['SERVICE_BUS_SAS_POLICY'] - shared_access_key = os.environ['SERVICE_BUS_SAS_KEY'] - queue_name = os.environ['SERVICE_BUS_QUEUE_NAME'] - queue_receiver = ServiceBusReceiver( - fully_qualified_namespace=fully_qualified_namespace, - credential=ServiceBusSharedKeyCredential( - shared_access_policy, - shared_access_key - ), - queue_name=queue_name - ) - # [END create_servicebus_receiver_sync] - # [START create_queue_deadletter_receiver_from_sb_client_sync] import os - from azure.servicebus import ServiceBusClient + from azure.servicebus import ServiceBusClient, SubQueue servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR'] queue_name = os.environ['SERVICE_BUS_QUEUE_NAME'] servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str) with servicebus_client: - queue_receiver = servicebus_client.get_queue_deadletter_receiver(queue_name=queue_name) + queue_dlq_receiver = servicebus_client.get_queue_receiver(queue_name=queue_name, sub_queue=SubQueue.DeadLetter) # [END create_queue_deadletter_receiver_from_sb_client_sync] # [START create_servicebus_receiver_from_sb_client_sync] @@ -166,15 +131,16 @@ def example_create_servicebus_receiver_sync(): # [START create_subscription_deadletter_receiver_from_sb_client_sync] import os - from azure.servicebus import ServiceBusClient + from azure.servicebus import ServiceBusClient, SubQueue servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR'] topic_name = os.environ["SERVICE_BUS_TOPIC_NAME"] subscription_name = os.environ["SERVICE_BUS_SUBSCRIPTION_NAME"] servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str) with servicebus_client: - subscription_receiver = servicebus_client.get_subscription_deadletter_receiver( + subscription_dlq_receiver = servicebus_client.get_subscription_receiver( topic_name=topic_name, subscription_name=subscription_name, + sub_queue=SubQueue.DeadLetter ) # [END create_subscription_deadletter_receiver_from_sb_client_sync] @@ -191,6 +157,7 @@ def example_send_and_receive_sync(): message = ServiceBusMessage("Hello World") servicebus_sender.send_messages(message) # [END send_sync] + servicebus_sender.send_messages([ServiceBusMessage("Hello World")] * 5) # [START create_batch_sync] with servicebus_sender: @@ -252,11 +219,35 @@ def example_send_and_receive_sync(): print("Enqueued time: {}".format(message.enqueued_time_utc)) # [END receive_complex_message] - # [START abandon_message] + # [START abandon_message_sync] messages = servicebus_receiver.receive_messages(max_wait_time=5) for message in messages: servicebus_receiver.abandon_message(message) - # [END abandon_message] + # [END abandon_message_sync] + + # [START complete_message_sync] + messages = servicebus_receiver.receive_messages(max_wait_time=5) + for message in messages: + servicebus_receiver.complete_message(message) + # [END complete_message_sync] + + # [START defer_message_sync] + messages = servicebus_receiver.receive_messages(max_wait_time=5) + for message in messages: + servicebus_receiver.defer_message(message) + # [END defer_message_sync] + + # [START dead_letter_message_sync] + messages = servicebus_receiver.receive_messages(max_wait_time=5) + for message in messages: + servicebus_receiver.dead_letter_message(message) + # [END dead_letter_message_sync] + + # [START renew_message_lock_sync] + messages = servicebus_receiver.receive_messages(max_wait_time=5) + for message in messages: + servicebus_receiver.renew_message_lock(message) + # [END renew_message_lock_sync] # [START receive_forever] with servicebus_receiver: @@ -291,6 +282,7 @@ def example_receive_deferred_sync(): def example_receive_deadletter_sync(): + from azure.servicebus import SubQueue servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR'] queue_name = os.environ['SERVICE_BUS_QUEUE_NAME'] @@ -301,21 +293,29 @@ def example_receive_deadletter_sync(): with servicebus_client.get_queue_receiver(queue_name) as servicebus_receiver: messages = servicebus_receiver.receive_messages(max_wait_time=5) for message in messages: - servicebus_receiver.dead_letter_message(message, reason='reason for dead lettering', description='description for dead lettering') + servicebus_receiver.dead_letter_message( + message, + reason='reason for dead lettering', + error_description='description for dead lettering' + ) - with servicebus_client.get_queue_deadletter_receiver(queue_name) as servicebus_deadletter_receiver: + with servicebus_client.get_queue_receiver(queue_name, sub_queue=SubQueue.DeadLetter) as servicebus_deadletter_receiver: messages = servicebus_deadletter_receiver.receive_messages(max_wait_time=5) for message in messages: - servicebus_receiver.complete_message(message) + servicebus_deadletter_receiver.complete_message(message) # [END receive_deadletter_sync] def example_session_ops_sync(): servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR'] - queue_name = os.environ['SERVICE_BUS_QUEUE_NAME'] + queue_name = os.environ['SERVICE_BUS_SESSION_QUEUE_NAME'] session_id = "" with ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str) as servicebus_client: + + with servicebus_client.get_queue_sender(queue_name=queue_name) as sender: + sender.send_messages(ServiceBusMessage('msg', session_id=session_id)) + # [START get_session_sync] with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver: session = receiver.session @@ -330,13 +330,13 @@ def example_session_ops_sync(): # [START set_session_state_sync] with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver: session = receiver.session - session_state = session.set_state("START") + session.set_state("START") # [END set_session_state_sync] # [START session_renew_lock_sync] with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver: session = receiver.session - session_state = session.renew_lock() + session.renew_lock() # [END session_renew_lock_sync] # [START auto_lock_renew_session_sync] @@ -372,4 +372,5 @@ def example_schedule_ops_sync(): example_send_and_receive_sync() example_receive_deferred_sync() example_schedule_ops_sync() -# example_session_ops_sync() +example_receive_deadletter_sync() +example_session_ops_sync()