From 60a7f629d56a8380806df322d1d5a3fa1559b718 Mon Sep 17 00:00:00 2001 From: swathipil <76007337+swathipil@users.noreply.github.com> Date: Fri, 7 Jan 2022 16:10:02 -0800 Subject: [PATCH] [ServiceBus] expand kwargs in public API (#22353) * inital expand kwargs w/o mgmt models * fix mypy/pylint * mypy * adam + anna comments * lint * lint bad whitespace * remove retry * adams comments * lint --- .../azure/servicebus/_common/message.py | 48 ++-- .../azure/servicebus/_servicebus_client.py | 111 ++++++-- .../azure/servicebus/_servicebus_receiver.py | 107 ++++++-- .../azure/servicebus/_servicebus_sender.py | 60 +++-- .../azure/servicebus/_servicebus_session.py | 14 +- .../aio/_servicebus_client_async.py | 95 ++++++- .../aio/_servicebus_receiver_async.py | 71 ++++-- .../aio/_servicebus_sender_async.py | 17 +- .../aio/_servicebus_session_async.py | 11 +- .../management/_management_client_async.py | 222 ++++++++++------ .../azure/servicebus/amqp/_amqp_message.py | 96 ++++--- .../management/_management_client.py | 239 +++++++++++------- .../azure/servicebus/management/_utils.py | 4 +- 13 files changed, 761 insertions(+), 334 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index eb4644c48ce7..73f47c5e2edc 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -9,7 +9,7 @@ import datetime import uuid import logging -from typing import Optional, List, Union, Iterable, TYPE_CHECKING, Any, Mapping, cast +from typing import Optional, Dict, List, Union, Iterable, TYPE_CHECKING, Any, Mapping, cast import six @@ -92,8 +92,24 @@ class ServiceBusMessage( """ - def __init__(self, body, **kwargs): - # type: (Optional[Union[str, bytes]], Any) -> None + def __init__( + self, + body: Optional[Union[str, bytes]], + *, + application_properties: Optional[Dict[str, Any]] = None, + session_id: Optional[str] = None, + message_id: Optional[str] = None, + scheduled_enqueue_time_utc: Optional[datetime.datetime] = None, + time_to_live: Optional[datetime.timedelta] = None, + content_type: Optional[str] = None, + correlation_id: Optional[str] = None, + subject: Optional[str] = None, + partition_key: Optional[str] = None, + to: Optional[str] = None, + reply_to: Optional[str] = None, + reply_to_session_id: Optional[str] = None, + **kwargs: Any + ) -> None: # Although we might normally thread through **kwargs this causes # problems as MessageProperties won't absorb spurious args. self._encoding = kwargs.pop("encoding", "UTF-8") @@ -108,20 +124,18 @@ def __init__(self, body, **kwargs): self._raw_amqp_message = AmqpAnnotatedMessage(message=self.message) else: self._build_message(body) - self.application_properties = kwargs.pop("application_properties", None) - self.session_id = kwargs.pop("session_id", None) - self.message_id = kwargs.pop("message_id", None) - self.content_type = kwargs.pop("content_type", None) - self.correlation_id = kwargs.pop("correlation_id", None) - self.to = kwargs.pop("to", None) - self.reply_to = kwargs.pop("reply_to", None) - self.reply_to_session_id = kwargs.pop("reply_to_session_id", None) - self.subject = kwargs.pop("subject", None) - self.scheduled_enqueue_time_utc = kwargs.pop( - "scheduled_enqueue_time_utc", None - ) - self.time_to_live = kwargs.pop("time_to_live", None) - self.partition_key = kwargs.pop("partition_key", None) + self.application_properties = application_properties + self.session_id = session_id + self.message_id = message_id + self.content_type = content_type + self.correlation_id = correlation_id + self.to = to + self.reply_to = reply_to + self.reply_to_session_id = reply_to_session_id + self.subject = subject + self.scheduled_enqueue_time_utc = scheduled_enqueue_time_utc + self.time_to_live = time_to_live + self.partition_key = partition_key def __str__(self): # type: () -> str diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py index df3ab6aff152..8edb65699fff 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py @@ -2,7 +2,7 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- -from typing import Any, Union, TYPE_CHECKING +from typing import Any, Union, Optional, TYPE_CHECKING import logging from weakref import WeakSet @@ -15,16 +15,25 @@ ) from ._servicebus_sender import ServiceBusSender from ._servicebus_receiver import ServiceBusReceiver +from ._common.auto_lock_renewer import AutoLockRenewer from ._common._configuration import Configuration from ._common.utils import ( create_authentication, generate_dead_letter_entity_name, strip_protocol_from_uri, ) -from ._common.constants import ServiceBusSubQueue +from ._common.constants import ( + ServiceBusSubQueue, + ServiceBusReceiveMode, +) if TYPE_CHECKING: - from azure.core.credentials import TokenCredential, AzureSasCredential, AzureNamedKeyCredential + from azure.core.credentials import ( + TokenCredential, + AzureSasCredential, + AzureNamedKeyCredential, + ) + _LOGGER = logging.getLogger(__name__) @@ -75,15 +84,32 @@ class ServiceBusClient(object): """ - def __init__(self, fully_qualified_namespace, credential, **kwargs): - # type: (str, Union[TokenCredential, AzureSasCredential, AzureNamedKeyCredential], Any) -> None + def __init__( + self, + fully_qualified_namespace: str, + credential: Union[ + "TokenCredential", "AzureSasCredential", "AzureNamedKeyCredential" + ], + *, + retry_total: int = 3, + retry_backoff_factor: float = 0.8, + retry_backoff_max: int = 120, + retry_mode: str = "exponential", + **kwargs: Any + ) -> None: # If the user provided http:// or sb://, let's be polite and strip that. self.fully_qualified_namespace = strip_protocol_from_uri( fully_qualified_namespace.strip() ) self._credential = credential - self._config = Configuration(**kwargs) + self._config = Configuration( + retry_total=retry_total, + retry_backoff_factor=retry_backoff_factor, + retry_backoff_max=retry_backoff_max, + retry_mode=retry_mode, + **kwargs + ) self._connection = None # Optional entity name, can be the name of Queue or Topic. Intentionally not advertised, typically be needed. self._entity_name = kwargs.get("entity_name") @@ -134,8 +160,16 @@ def close(self): self._connection.destroy() @classmethod - def from_connection_string(cls, conn_str, **kwargs): - # type: (str, Any) -> ServiceBusClient + def from_connection_string( + cls, + conn_str: str, + *, + retry_total: int = 3, + retry_backoff_factor: float = 0.8, + retry_backoff_max: int = 120, + retry_mode: str = "exponential", + **kwargs: Any + ) -> "ServiceBusClient": """ Create a ServiceBusClient from a connection string. @@ -181,6 +215,10 @@ def from_connection_string(cls, conn_str, **kwargs): fully_qualified_namespace=host, entity_name=entity_in_conn_str or kwargs.pop("entity_name", None), credential=credential, # type: ignore + retry_total=retry_total, + retry_backoff_factor=retry_backoff_factor, + retry_backoff_max=retry_backoff_max, + retry_mode=retry_mode, **kwargs ) @@ -227,8 +265,20 @@ def get_queue_sender(self, queue_name, **kwargs): self._handlers.add(handler) return handler - def get_queue_receiver(self, queue_name, **kwargs): - # type: (str, Any) -> ServiceBusReceiver + def get_queue_receiver( + self, + queue_name: str, + *, + session_id: Optional[str] = None, + sub_queue: Optional[Union[ServiceBusSubQueue, str]] = None, + receive_mode: Union[ + ServiceBusReceiveMode, str + ] = ServiceBusReceiveMode.PEEK_LOCK, + max_wait_time: Optional[float] = None, + auto_lock_renewer: Optional[AutoLockRenewer] = None, + prefetch_count: int = 0, + **kwargs: Any + ) -> ServiceBusReceiver: """Get ServiceBusReceiver for the specific queue. :param str queue_name: The path of specific Service Bus Queue the client connects to. @@ -280,8 +330,7 @@ def get_queue_receiver(self, queue_name, **kwargs): "the connection string used to construct the ServiceBusClient." ) - sub_queue = kwargs.get("sub_queue", None) - if sub_queue and kwargs.get("session_id"): + if sub_queue and session_id: raise ValueError( "session_id and sub_queue can not be specified simultaneously. " "To connect to the sub queue of a sessionful queue, " @@ -314,6 +363,12 @@ def get_queue_receiver(self, queue_name, **kwargs): retry_total=self._config.retry_total, retry_backoff_factor=self._config.retry_backoff_factor, retry_backoff_max=self._config.retry_backoff_max, + session_id=session_id, + sub_queue=sub_queue, + receive_mode=receive_mode, + max_wait_time=max_wait_time, + auto_lock_renewer=auto_lock_renewer, + prefetch_count=prefetch_count, **kwargs ) self._handlers.add(handler) @@ -361,8 +416,21 @@ def get_topic_sender(self, topic_name, **kwargs): self._handlers.add(handler) return handler - def get_subscription_receiver(self, topic_name, subscription_name, **kwargs): - # type: (str, str, Any) -> ServiceBusReceiver + def get_subscription_receiver( + self, + topic_name: str, + subscription_name: str, + *, + session_id: Optional[str] = None, + sub_queue: Optional[Union[ServiceBusSubQueue, str]] = None, + receive_mode: Union[ + ServiceBusReceiveMode, str + ] = ServiceBusReceiveMode.PEEK_LOCK, + max_wait_time: Optional[float] = None, + auto_lock_renewer: Optional[AutoLockRenewer] = None, + prefetch_count: int = 0, + **kwargs: Any + ) -> ServiceBusReceiver: """Get ServiceBusReceiver for the specific subscription under the topic. :param str topic_name: The name of specific Service Bus Topic the client connects to. @@ -417,8 +485,7 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs): "the connection string used to construct the ServiceBusClient." ) - sub_queue = kwargs.get("sub_queue", None) - if sub_queue and kwargs.get("session_id"): + if sub_queue and session_id: raise ValueError( "session_id and sub_queue can not be specified simultaneously. " "To connect to the sub queue of a sessionful subscription, " @@ -446,6 +513,12 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs): retry_total=self._config.retry_total, retry_backoff_factor=self._config.retry_backoff_factor, retry_backoff_max=self._config.retry_backoff_max, + session_id=session_id, + sub_queue=sub_queue, + receive_mode=receive_mode, + max_wait_time=max_wait_time, + auto_lock_renewer=auto_lock_renewer, + prefetch_count=prefetch_count, **kwargs ) except ValueError: @@ -467,6 +540,12 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs): retry_total=self._config.retry_total, retry_backoff_factor=self._config.retry_backoff_factor, retry_backoff_max=self._config.retry_backoff_max, + session_id=session_id, + sub_queue=sub_queue, + receive_mode=receive_mode, + max_wait_time=max_wait_time, + auto_lock_renewer=auto_lock_renewer, + prefetch_count=prefetch_count, **kwargs ) self._handlers.add(handler) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py index ead8b2d95949..1b4c0157b769 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py @@ -6,7 +6,8 @@ import logging import functools import uuid -from typing import Any, List, TYPE_CHECKING, Optional, Dict, Iterator, Union +import datetime +from typing import Any, List, Optional, Dict, Iterator, Union, TYPE_CHECKING import six @@ -52,10 +53,13 @@ from ._common.utils import utc_from_timestamp from ._servicebus_session import ServiceBusSession - if TYPE_CHECKING: - import datetime - from azure.core.credentials import TokenCredential, AzureSasCredential, AzureNamedKeyCredential + from ._common.auto_lock_renewer import AutoLockRenewer + from azure.core.credentials import ( + TokenCredential, + AzureSasCredential, + AzureNamedKeyCredential, + ) _LOGGER = logging.getLogger(__name__) @@ -120,19 +124,37 @@ class ServiceBusReceiver( (if provided) within its request to the service. """ - def __init__(self, fully_qualified_namespace, credential, **kwargs): - # type: (str, Union[TokenCredential, AzureSasCredential, AzureNamedKeyCredential], Any) -> None + def __init__( + self, + fully_qualified_namespace: str, + credential: Union["TokenCredential", "AzureSasCredential", "AzureNamedKeyCredential"], + *, + queue_name: Optional[str] = None, + topic_name: Optional[str] = None, + subscription_name: Optional[str] = None, + receive_mode: Union[ + ServiceBusReceiveMode, str + ] = ServiceBusReceiveMode.PEEK_LOCK, + max_wait_time: Optional[float] = None, + auto_lock_renewer: Optional["AutoLockRenewer"] = None, + prefetch_count: int = 0, + **kwargs: Any + ) -> None: self._message_iter = None # type: Optional[Iterator[ServiceBusReceivedMessage]] if kwargs.get("entity_name"): super(ServiceBusReceiver, self).__init__( fully_qualified_namespace=fully_qualified_namespace, credential=credential, + queue_name=queue_name, + topic_name=topic_name, + subscription_name=subscription_name, + receive_mode=receive_mode, + max_wait_time=max_wait_time, + auto_lock_renewer=auto_lock_renewer, + prefetch_count=prefetch_count, **kwargs ) else: - queue_name = kwargs.get("queue_name") # type: Optional[str] - topic_name = kwargs.get("topic_name") # type: Optional[str] - subscription_name = kwargs.get("subscription_name") if queue_name and topic_name: raise ValueError( "Queue/Topic name can not be specified simultaneously." @@ -151,10 +173,26 @@ def __init__(self, fully_qualified_namespace, credential, **kwargs): fully_qualified_namespace=fully_qualified_namespace, credential=credential, entity_name=entity_name, + queue_name=queue_name, + topic_name=topic_name, + subscription_name=subscription_name, + receive_mode=receive_mode, + max_wait_time=max_wait_time, + auto_lock_renewer=auto_lock_renewer, + prefetch_count=prefetch_count, **kwargs ) - self._populate_attributes(**kwargs) + self._populate_attributes( + queue_name=queue_name, + topic_name=topic_name, + subscription_name=subscription_name, + receive_mode=receive_mode, + max_wait_time=max_wait_time, + auto_lock_renewer=auto_lock_renewer, + prefetch_count=prefetch_count, + **kwargs + ) self._session = ( None if self._session_id is None else ServiceBusSession(self._session_id, self) ) @@ -558,8 +596,11 @@ def _get_streaming_message_iter(self, max_wait_time=None): raise ValueError("The max_wait_time must be greater than 0.") return self._iter_contextual_wrapper(max_wait_time) - def receive_messages(self, max_message_count=1, max_wait_time=None): - # type: (Optional[int], Optional[float]) -> List[ServiceBusReceivedMessage] + def receive_messages( + self, + max_message_count: Optional[int] = 1, + max_wait_time: Optional[float] = None, + ) -> List[ServiceBusReceivedMessage]: """Receive a batch of messages at once. This approach is optimal if you wish to process multiple messages simultaneously, or @@ -615,8 +656,12 @@ def receive_messages(self, max_message_count=1, max_wait_time=None): self._auto_lock_renewer.register(self, message) return messages - def receive_deferred_messages(self, sequence_numbers, **kwargs): - # type: (Union[int,List[int]], Any) -> List[ServiceBusReceivedMessage] + def receive_deferred_messages( + self, + sequence_numbers: Union[int, List[int]], + *, + timeout: Optional[float] = None + ) -> List[ServiceBusReceivedMessage]: """Receive messages that have previously been deferred. When receiving deferred messages from a partitioned entity, all of the supplied @@ -639,7 +684,6 @@ def receive_deferred_messages(self, sequence_numbers, **kwargs): """ self._check_live() - timeout = kwargs.pop("timeout", None) if timeout is not None and timeout <= 0: raise ValueError("The timeout must be greater than 0.") if isinstance(sequence_numbers, six.integer_types): @@ -685,8 +729,13 @@ def receive_deferred_messages(self, sequence_numbers, **kwargs): self._auto_lock_renewer.register(self, message) return messages - def peek_messages(self, max_message_count=1, **kwargs): - # type: (int, Any) -> List[ServiceBusReceivedMessage] + def peek_messages( + self, + max_message_count: int = 1, + *, + sequence_number: int = 0, + timeout: Optional[float] = None + ) -> List[ServiceBusReceivedMessage]: """Browse messages currently pending in the queue. Peeked messages are not removed from queue, nor are they locked. They cannot be completed, @@ -711,8 +760,6 @@ def peek_messages(self, max_message_count=1, **kwargs): """ self._check_live() - sequence_number = kwargs.pop("sequence_number", 0) - timeout = kwargs.pop("timeout", None) if timeout is not None and timeout <= 0: raise ValueError("The timeout must be greater than 0.") if not sequence_number: @@ -737,7 +784,7 @@ def peek_messages(self, max_message_count=1, **kwargs): ): return messages - def complete_message(self, message): + def complete_message(self, message: ServiceBusReceivedMessage) -> None: """Complete the message. This removes the message from the queue. @@ -761,7 +808,7 @@ def complete_message(self, message): """ self._settle_message_with_retry(message, MESSAGE_COMPLETE) - def abandon_message(self, message): + def abandon_message(self, message: ServiceBusReceivedMessage) -> None: """Abandon the message. This message will be returned to the queue and made available to be received again. @@ -785,7 +832,7 @@ def abandon_message(self, message): """ self._settle_message_with_retry(message, MESSAGE_ABANDON) - def defer_message(self, message): + def defer_message(self, message: ServiceBusReceivedMessage) -> None: """Defers the message. This message will remain in the queue but must be requested @@ -810,7 +857,12 @@ def defer_message(self, message): """ self._settle_message_with_retry(message, MESSAGE_DEFER) - def dead_letter_message(self, message, reason=None, error_description=None): + def dead_letter_message( + self, + message: ServiceBusReceivedMessage, + reason: Optional[str] = None, + error_description: Optional[str] = None + ) -> None: """Move the message to the Dead Letter queue. The Dead Letter queue is a sub-queue that can be @@ -843,8 +895,12 @@ def dead_letter_message(self, message, reason=None, error_description=None): dead_letter_error_description=error_description, ) - def renew_message_lock(self, message, **kwargs): - # type: (ServiceBusReceivedMessage, Any) -> datetime.datetime + def renew_message_lock( + self, + message: ServiceBusReceivedMessage, + *, + timeout: Optional[float] = None + ) -> datetime.datetime: # pylint: disable=protected-access,no-member """Renew the message lock. @@ -892,7 +948,6 @@ def renew_message_lock(self, message, **kwargs): if not token: raise ValueError("Unable to renew lock - no lock token found.") - timeout = kwargs.pop("timeout", None) if timeout is not None and timeout <= 0: raise ValueError("The timeout must be greater than 0.") diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py index 110103814a54..45f069e85d7a 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py @@ -5,6 +5,7 @@ import logging import time import uuid +import datetime from typing import Any, TYPE_CHECKING, Union, List, Optional, Mapping, cast import uamqp @@ -41,9 +42,11 @@ ) if TYPE_CHECKING: - import datetime - from azure.core.credentials import TokenCredential, AzureSasCredential, AzureNamedKeyCredential - + from azure.core.credentials import ( + TokenCredential, + AzureSasCredential, + AzureNamedKeyCredential, + ) MessageTypes = Union[ Mapping[str, Any], ServiceBusMessage, @@ -149,8 +152,15 @@ class ServiceBusSender(BaseHandler, SenderMixin): :keyword str user_agent: If specified, this will be added in front of the built-in user agent string. """ - def __init__(self, fully_qualified_namespace, credential, **kwargs): - # type: (str, Union[TokenCredential, AzureSasCredential, AzureNamedKeyCredential], Any) -> None + def __init__( + self, + fully_qualified_namespace: str, + credential: Union["TokenCredential", "AzureSasCredential", "AzureNamedKeyCredential"], + *, + queue_name: Optional[str] = None, + topic_name: Optional[str] = None, + **kwargs: Any + ) -> None: if kwargs.get("entity_name"): super(ServiceBusSender, self).__init__( fully_qualified_namespace=fully_qualified_namespace, @@ -158,8 +168,6 @@ def __init__(self, fully_qualified_namespace, credential, **kwargs): **kwargs ) else: - queue_name = kwargs.get("queue_name") - topic_name = kwargs.get("topic_name") if queue_name and topic_name: raise ValueError( "Queue/Topic name can not be specified simultaneously." @@ -172,7 +180,9 @@ def __init__(self, fully_qualified_namespace, credential, **kwargs): super(ServiceBusSender, self).__init__( fully_qualified_namespace=fully_qualified_namespace, credential=credential, - entity_name=entity_name, + entity_name=str(entity_name), + queue_name=queue_name, + topic_name=topic_name, **kwargs ) @@ -263,8 +273,13 @@ def _send(self, message, timeout=None, last_exception=None): finally: # reset the timeout of the handler back to the default value self._set_msg_timeout(default_timeout, None) - def schedule_messages(self, messages, schedule_time_utc, **kwargs): - # type: (MessageTypes, datetime.datetime, Any) -> List[int] + def schedule_messages( + self, + messages: "MessageTypes", + schedule_time_utc: datetime.datetime, + *, + timeout: Optional[float] = None + ) -> List[int]: """Send Message or multiple Messages to be enqueued at a specific time. Returns a list of the sequence numbers of the enqueued messages. @@ -290,7 +305,6 @@ def schedule_messages(self, messages, schedule_time_utc, **kwargs): self._check_live() obj_messages = transform_messages_if_needed(messages, ServiceBusMessage) - timeout = kwargs.pop("timeout", None) if timeout is not None and timeout <= 0: raise ValueError("The timeout must be greater than 0.") @@ -314,8 +328,12 @@ def schedule_messages(self, messages, schedule_time_utc, **kwargs): timeout=timeout, ) - def cancel_scheduled_messages(self, sequence_numbers, **kwargs): - # type: (Union[int, List[int]], Any) -> None + def cancel_scheduled_messages( + self, + sequence_numbers: Union[int, List[int]], + *, + timeout: Optional[float] = None + ) -> None: """ Cancel one or more messages that have previously been scheduled and are still pending. @@ -337,7 +355,6 @@ def cancel_scheduled_messages(self, sequence_numbers, **kwargs): :caption: Cancelling messages scheduled to be sent in future """ self._check_live() - timeout = kwargs.pop("timeout", None) if timeout is not None and timeout <= 0: raise ValueError("The timeout must be greater than 0.") if isinstance(sequence_numbers, int): @@ -354,8 +371,12 @@ def cancel_scheduled_messages(self, sequence_numbers, **kwargs): timeout=timeout, ) - def send_messages(self, message, **kwargs): - # type: (Union[MessageTypes, ServiceBusMessageBatch], Any) -> None + def send_messages( + self, + message: Union["MessageTypes", ServiceBusMessageBatch], + *, + timeout: Optional[float] = None + ) -> None: """Sends message and blocks until acknowledgement is received or operation times out. If a list of messages was provided, attempts to send them as a single batch, throwing a @@ -387,7 +408,6 @@ def send_messages(self, message, **kwargs): """ self._check_live() - timeout = kwargs.pop("timeout", None) if timeout is not None and timeout <= 0: raise ValueError("The timeout must be greater than 0.") @@ -422,8 +442,10 @@ def send_messages(self, message, **kwargs): require_last_exception=True, ) - def create_message_batch(self, max_size_in_bytes=None): - # type: (Optional[int]) -> ServiceBusMessageBatch + def create_message_batch( + self, + max_size_in_bytes: Optional[int] = None + ) -> ServiceBusMessageBatch: """Create a ServiceBusMessageBatch object with the max size of all content being constrained by max_size_in_bytes. The max_size should be no greater than the max allowed message size defined by the service. diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session.py index 4ea4f06b623e..3e0d9657b26b 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session.py @@ -4,7 +4,7 @@ # -------------------------------------------------------------------------------------------- import logging import datetime -from typing import TYPE_CHECKING, Union, Optional, Any +from typing import TYPE_CHECKING, Union, Optional import six from ._common.utils import utc_from_timestamp, utc_now @@ -88,8 +88,7 @@ class ServiceBusSession(BaseSession): :caption: Get session from a receiver """ - def get_state(self, **kwargs): - # type: (Any) -> bytes + def get_state(self, *, timeout: Optional[float] = None) -> bytes: # pylint: disable=protected-access """Get the session state. @@ -109,7 +108,6 @@ def get_state(self, **kwargs): :caption: Get the session state """ self._receiver._check_live() # pylint: disable=protected-access - timeout = kwargs.pop("timeout", None) if timeout is not None and timeout <= 0: raise ValueError("The timeout must be greater than 0.") response = self._receiver._mgmt_request_response_with_retry( @@ -121,8 +119,7 @@ def get_state(self, **kwargs): session_state = response.get(MGMT_RESPONSE_SESSION_STATE) # type: ignore return session_state - def set_state(self, state, **kwargs): - # type: (Union[str, bytes, bytearray], Any) -> None + def set_state(self, state: Union[str, bytes, bytearray], *, timeout: Optional[float] = None) -> None: # pylint: disable=protected-access """Set the session state. @@ -141,7 +138,6 @@ def set_state(self, state, **kwargs): :caption: Set the session state """ self._receiver._check_live() # pylint: disable=protected-access - timeout = kwargs.pop("timeout", None) if timeout is not None and timeout <= 0: raise ValueError("The timeout must be greater than 0.") state = ( @@ -157,8 +153,7 @@ def set_state(self, state, **kwargs): timeout=timeout, ) - def renew_lock(self, **kwargs): - # type: (Any) -> datetime.datetime + def renew_lock(self, *, timeout: Optional[float] = None) -> datetime.datetime: # pylint: disable=protected-access """Renew the session lock. @@ -185,7 +180,6 @@ def renew_lock(self, **kwargs): :caption: Renew the session lock before it expires """ self._receiver._check_live() # pylint: disable=protected-access - timeout = kwargs.pop("timeout", None) if timeout is not None and timeout <= 0: raise ValueError("The timeout must be greater than 0.") expiry = self._receiver._mgmt_request_response_with_retry( diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py index 54ca2cc98d2d..9b32c03d600a 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py @@ -2,7 +2,7 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- -from typing import Any, Union, TYPE_CHECKING +from typing import Any, Union, Optional, TYPE_CHECKING import logging from weakref import WeakSet @@ -17,8 +17,12 @@ from ._servicebus_sender_async import ServiceBusSender from ._servicebus_receiver_async import ServiceBusReceiver from .._common._configuration import Configuration +from .._common.auto_lock_renewer import AutoLockRenewer from .._common.utils import generate_dead_letter_entity_name, strip_protocol_from_uri -from .._common.constants import ServiceBusSubQueue +from .._common.constants import ( + ServiceBusSubQueue, + ServiceBusReceiveMode +) from ._async_utils import create_authentication if TYPE_CHECKING: @@ -76,7 +80,14 @@ class ServiceBusClient(object): def __init__( self, fully_qualified_namespace: str, - credential: Union["AsyncTokenCredential", AzureSasCredential, AzureNamedKeyCredential], + credential: Union[ + "AsyncTokenCredential", AzureSasCredential, AzureNamedKeyCredential + ], + *, + retry_total: int = 3, + retry_backoff_factor: float = 0.8, + retry_backoff_max: int = 120, + retry_mode: str = 'exponential', **kwargs: Any ) -> None: # If the user provided http:// or sb://, let's be polite and strip that. @@ -84,7 +95,13 @@ def __init__( fully_qualified_namespace.strip() ) self._credential = credential - self._config = Configuration(**kwargs) + self._config = Configuration( + retry_total=retry_total, + retry_backoff_factor=retry_backoff_factor, + retry_backoff_max=retry_backoff_max, + retry_mode=retry_mode, + **kwargs + ) self._connection = None # Optional entity name, can be the name of Queue or Topic. Intentionally not advertised, typically be needed. self._entity_name = kwargs.get("entity_name") @@ -112,7 +129,16 @@ async def _create_uamqp_connection(self): ) @classmethod - def from_connection_string(cls, conn_str: str, **kwargs: Any) -> "ServiceBusClient": + def from_connection_string( + cls, + conn_str: str, + *, + retry_total: int = 3, + retry_backoff_factor: float = 0.8, + retry_backoff_max: int = 120, + retry_mode: str = 'exponential', + **kwargs: Any + ) -> "ServiceBusClient": """ Create a ServiceBusClient from a connection string. @@ -154,10 +180,14 @@ def from_connection_string(cls, conn_str: str, **kwargs: Any) -> "ServiceBusClie credential = ServiceBusSASTokenCredential(token, token_expiry) elif policy and key: credential = ServiceBusSharedKeyCredential(policy, key) # type: ignore - return cls( # type: ignore # for credential + return cls( # type: ignore # for credential fully_qualified_namespace=host, entity_name=entity_in_conn_str or kwargs.pop("entity_name", None), credential=credential, # type: ignore + retry_total=retry_total, + retry_backoff_factor=retry_backoff_factor, + retry_backoff_max=retry_backoff_max, + retry_mode=retry_mode, **kwargs ) @@ -224,7 +254,20 @@ def get_queue_sender(self, queue_name: str, **kwargs: Any) -> ServiceBusSender: self._handlers.add(handler) return handler - def get_queue_receiver(self, queue_name: str, **kwargs: Any) -> ServiceBusReceiver: + def get_queue_receiver( + self, + queue_name: str, + *, + session_id: Optional[str] = None, + sub_queue: Optional[Union[ServiceBusSubQueue, str]] = None, + receive_mode: Union[ + ServiceBusReceiveMode, str + ] = ServiceBusReceiveMode.PEEK_LOCK, + max_wait_time: Optional[float] = None, + auto_lock_renewer: Optional[AutoLockRenewer] = None, + prefetch_count: int = 0, + **kwargs: Any + ) -> ServiceBusReceiver: """Get ServiceBusReceiver for the specific queue. :param str queue_name: The path of specific Service Bus Queue the client connects to. @@ -276,8 +319,7 @@ def get_queue_receiver(self, queue_name: str, **kwargs: Any) -> ServiceBusReceiv "the connection string used to construct the ServiceBusClient." ) - sub_queue = kwargs.get("sub_queue", None) - if sub_queue and kwargs.get("session_id"): + if sub_queue and session_id: raise ValueError( "session_id and sub_queue can not be specified simultaneously. " "To connect to the sub queue of a sessionful queue, " @@ -309,6 +351,12 @@ def get_queue_receiver(self, queue_name: str, **kwargs: Any) -> ServiceBusReceiv retry_total=self._config.retry_total, retry_backoff_factor=self._config.retry_backoff_factor, retry_backoff_max=self._config.retry_backoff_max, + session_id=session_id, + sub_queue=sub_queue, + receive_mode=receive_mode, + max_wait_time=max_wait_time, + auto_lock_renewer=auto_lock_renewer, + prefetch_count=prefetch_count, **kwargs ) self._handlers.add(handler) @@ -356,7 +404,19 @@ def get_topic_sender(self, topic_name: str, **kwargs: Any) -> ServiceBusSender: return handler def get_subscription_receiver( - self, topic_name: str, subscription_name: str, **kwargs: Any + self, + topic_name: str, + subscription_name: str, + *, + session_id: Optional[str] = None, + sub_queue: Optional[Union[ServiceBusSubQueue, str]] = None, + receive_mode: Union[ + ServiceBusReceiveMode, str + ] = ServiceBusReceiveMode.PEEK_LOCK, + max_wait_time: Optional[float] = None, + auto_lock_renewer: Optional[AutoLockRenewer] = None, + prefetch_count: int = 0, + **kwargs: Any ) -> ServiceBusReceiver: """Get ServiceBusReceiver for the specific subscription under the topic. @@ -412,8 +472,7 @@ def get_subscription_receiver( "the connection string used to construct the ServiceBusClient." ) - sub_queue = kwargs.get("sub_queue", None) - if sub_queue and kwargs.get("session_id"): + if sub_queue and session_id: raise ValueError( "session_id and sub_queue can not be specified simultaneously. " "To connect to the sub queue of a sessionful subscription, " @@ -441,6 +500,12 @@ def get_subscription_receiver( retry_total=self._config.retry_total, retry_backoff_factor=self._config.retry_backoff_factor, retry_backoff_max=self._config.retry_backoff_max, + session_id=session_id, + sub_queue=sub_queue, + receive_mode=receive_mode, + max_wait_time=max_wait_time, + auto_lock_renewer=auto_lock_renewer, + prefetch_count=prefetch_count, **kwargs ) except ValueError: @@ -462,6 +527,12 @@ def get_subscription_receiver( retry_total=self._config.retry_total, retry_backoff_factor=self._config.retry_backoff_factor, retry_backoff_max=self._config.retry_backoff_max, + session_id=session_id, + sub_queue=sub_queue, + receive_mode=receive_mode, + max_wait_time=max_wait_time, + auto_lock_renewer=auto_lock_renewer, + prefetch_count=prefetch_count, **kwargs ) self._handlers.add(handler) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py index a920bd32c44d..975c72352a68 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py @@ -7,13 +7,12 @@ import datetime import functools import logging -from typing import Any, TYPE_CHECKING, List, Optional, AsyncIterator, Union, Callable +from typing import Any, List, Optional, AsyncIterator, Union, Callable, TYPE_CHECKING import six from uamqp import ReceiveClientAsync, types, Message from uamqp.constants import SenderSettleMode -from azure.core.credentials import AzureSasCredential, AzureNamedKeyCredential from ..exceptions import ServiceBusError from ._servicebus_session_async import ServiceBusSession @@ -55,6 +54,8 @@ if TYPE_CHECKING: from azure.core.credentials_async import AsyncTokenCredential + from azure.core.credentials import AzureSasCredential, AzureNamedKeyCredential + from .._common.auto_lock_renewer import AutoLockRenewer _LOGGER = logging.getLogger(__name__) @@ -120,7 +121,17 @@ class ServiceBusReceiver(collections.abc.AsyncIterator, BaseHandler, ReceiverMix def __init__( self, fully_qualified_namespace: str, - credential: Union["AsyncTokenCredential", AzureSasCredential, AzureNamedKeyCredential], + credential: Union["AsyncTokenCredential", "AzureSasCredential", "AzureNamedKeyCredential"], + *, + queue_name: Optional[str] = None, + topic_name: Optional[str] = None, + subscription_name: Optional[str] = None, + receive_mode: Union[ + ServiceBusReceiveMode, str + ] = ServiceBusReceiveMode.PEEK_LOCK, + max_wait_time: Optional[float] = None, + auto_lock_renewer: Optional["AutoLockRenewer"] = None, + prefetch_count: int = 0, **kwargs: Any ) -> None: self._message_iter = ( @@ -130,12 +141,16 @@ def __init__( super(ServiceBusReceiver, self).__init__( fully_qualified_namespace=fully_qualified_namespace, credential=credential, + queue_name=queue_name, + topic_name=topic_name, + subscription_name=subscription_name, + receive_mode=receive_mode, + max_wait_time=max_wait_time, + auto_lock_renewer=auto_lock_renewer, + prefetch_count=prefetch_count, **kwargs ) else: - queue_name = kwargs.get("queue_name") - topic_name = kwargs.get("topic_name") - subscription_name = kwargs.get("subscription_name") if queue_name and topic_name: raise ValueError( "Queue/Topic name can not be specified simultaneously." @@ -155,10 +170,26 @@ def __init__( fully_qualified_namespace=fully_qualified_namespace, credential=credential, entity_name=str(entity_name), + queue_name=queue_name, + topic_name=topic_name, + subscription_name=subscription_name, + receive_mode=receive_mode, + max_wait_time=max_wait_time, + auto_lock_renewer=auto_lock_renewer, + prefetch_count=prefetch_count, **kwargs ) - self._populate_attributes(**kwargs) + self._populate_attributes( + queue_name=queue_name, + topic_name=topic_name, + subscription_name=subscription_name, + receive_mode=receive_mode, + max_wait_time=max_wait_time, + auto_lock_renewer=auto_lock_renewer, + prefetch_count=prefetch_count, + **kwargs + ) self._session = ( None if self._session_id is None else ServiceBusSession(self._session_id, self) ) @@ -616,7 +647,7 @@ async def receive_messages( return messages async def receive_deferred_messages( - self, sequence_numbers: Union[int, List[int]], **kwargs: Any + self, sequence_numbers: Union[int, List[int]], *, timeout: Optional[float] = None ) -> List[ServiceBusReceivedMessage]: """Receive messages that have previously been deferred. @@ -640,7 +671,6 @@ async def receive_deferred_messages( """ self._check_live() - timeout = kwargs.pop("timeout", None) if timeout is not None and timeout <= 0: raise ValueError("The timeout must be greater than 0.") if isinstance(sequence_numbers, six.integer_types): @@ -688,7 +718,7 @@ async def receive_deferred_messages( return messages async def peek_messages( - self, max_message_count: int = 1, **kwargs: Any + self, max_message_count: int = 1, *, sequence_number: int = 0, timeout: Optional[float] = None ) -> List[ServiceBusReceivedMessage]: """Browse messages currently pending in the queue. @@ -712,8 +742,6 @@ async def peek_messages( :caption: Peek messages in the queue. """ self._check_live() - sequence_number = kwargs.pop("sequence_number", 0) - timeout = kwargs.pop("timeout", None) if timeout is not None and timeout <= 0: raise ValueError("The timeout must be greater than 0.") if not sequence_number: @@ -739,7 +767,7 @@ async def peek_messages( ): return messages - async def complete_message(self, message): + async def complete_message(self, message: ServiceBusReceivedMessage) -> None: """Complete the message. This removes the message from the queue. @@ -763,7 +791,7 @@ async def complete_message(self, message): """ await self._settle_message_with_retry(message, MESSAGE_COMPLETE) - async def abandon_message(self, message): + async def abandon_message(self, message: ServiceBusReceivedMessage) -> None: """Abandon the message. This message will be returned to the queue and made available to be received again. @@ -787,7 +815,7 @@ async def abandon_message(self, message): """ await self._settle_message_with_retry(message, MESSAGE_ABANDON) - async def defer_message(self, message): + async def defer_message(self, message: ServiceBusReceivedMessage) -> None: """Defers the message. This message will remain in the queue but must be requested @@ -812,7 +840,12 @@ async def defer_message(self, message): """ await self._settle_message_with_retry(message, MESSAGE_DEFER) - async def dead_letter_message(self, message, reason=None, error_description=None): + async def dead_letter_message( + self, + message: ServiceBusReceivedMessage, + reason: Optional[str] = None, + error_description: Optional[str] = None + ) -> None: """Move the message to the Dead Letter queue. The Dead Letter queue is a sub-queue that can be @@ -845,8 +878,9 @@ async def dead_letter_message(self, message, reason=None, error_description=None dead_letter_error_description=error_description, ) - async def renew_message_lock(self, message, **kwargs): - # type: (ServiceBusReceivedMessage, Any) -> datetime.datetime + async def renew_message_lock( + self, message: ServiceBusReceivedMessage, *, timeout: Optional[float] = None + ) -> datetime.datetime: # pylint: disable=protected-access,no-member """Renew the message lock. @@ -894,7 +928,6 @@ async def renew_message_lock(self, message, **kwargs): if not token: raise ValueError("Unable to renew lock - no lock token found.") - timeout = kwargs.pop("timeout", None) if timeout is not None and timeout <= 0: raise ValueError("The timeout must be greater than 0.") diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py index b377f1fae964..92256d6b8d1e 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py @@ -91,6 +91,9 @@ def __init__( self, fully_qualified_namespace: str, credential: Union["AsyncTokenCredential", AzureSasCredential, AzureNamedKeyCredential], + *, + queue_name: Optional[str] = None, + topic_name: Optional[str] = None, **kwargs: Any ) -> None: if kwargs.get("entity_name"): @@ -100,8 +103,6 @@ def __init__( **kwargs ) else: - queue_name = kwargs.get("queue_name") - topic_name = kwargs.get("topic_name") if queue_name and topic_name: raise ValueError( "Queue/Topic name can not be specified simultaneously." @@ -115,6 +116,8 @@ def __init__( fully_qualified_namespace=fully_qualified_namespace, credential=credential, entity_name=str(entity_name), + queue_name=queue_name, + topic_name=topic_name, **kwargs ) @@ -203,7 +206,8 @@ async def schedule_messages( self, messages: MessageTypes, schedule_time_utc: datetime.datetime, - **kwargs: Any + *, + timeout: Optional[float] = None ) -> List[int]: """Send Message or multiple Messages to be enqueued at a specific time by the service. Returns a list of the sequence numbers of the enqueued messages. @@ -230,7 +234,6 @@ async def schedule_messages( self._check_live() obj_messages = transform_messages_if_needed(messages, ServiceBusMessage) - timeout = kwargs.pop("timeout", None) if timeout is not None and timeout <= 0: raise ValueError("The timeout must be greater than 0.") with send_trace_context_manager(span_name=SPAN_NAME_SCHEDULE) as send_span: @@ -254,7 +257,7 @@ async def schedule_messages( ) async def cancel_scheduled_messages( - self, sequence_numbers: Union[int, List[int]], **kwargs: Any + self, sequence_numbers: Union[int, List[int]], *, timeout: Optional[float] = None ) -> None: """ Cancel one or more messages that have previously been scheduled and are still pending. @@ -277,7 +280,6 @@ async def cancel_scheduled_messages( :caption: Cancelling messages scheduled to be sent in future """ self._check_live() - timeout = kwargs.pop("timeout", None) if timeout is not None and timeout <= 0: raise ValueError("The timeout must be greater than 0.") if isinstance(sequence_numbers, int): @@ -295,7 +297,7 @@ async def cancel_scheduled_messages( ) async def send_messages( - self, message: Union[MessageTypes, ServiceBusMessageBatch], **kwargs: Any + self, message: Union[MessageTypes, ServiceBusMessageBatch], *, timeout: Optional[float] = None ) -> None: """Sends message and blocks until acknowledgement is received or operation times out. @@ -329,7 +331,6 @@ async def send_messages( """ self._check_live() - timeout = kwargs.pop("timeout", None) if timeout is not None and timeout <= 0: raise ValueError("The timeout must be greater than 0.") diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_async.py index 712eebd2c374..8d10b3231d73 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_async.py @@ -4,7 +4,7 @@ # -------------------------------------------------------------------------------------------- import logging import datetime -from typing import Union, Any +from typing import Union, Optional import six from .._servicebus_session import BaseSession @@ -40,7 +40,7 @@ class ServiceBusSession(BaseSession): :caption: Get session from a receiver """ - async def get_state(self, **kwargs: Any) -> bytes: + async def get_state(self, *, timeout: Optional[float] = None) -> bytes: """Get the session state. Returns None if no state has been set. @@ -59,7 +59,6 @@ async def get_state(self, **kwargs: Any) -> bytes: :caption: Get the session state """ self._receiver._check_live() # pylint: disable=protected-access - timeout = kwargs.pop("timeout", None) if timeout is not None and timeout <= 0: raise ValueError("The timeout must be greater than 0.") response = await self._receiver._mgmt_request_response_with_retry( # pylint: disable=protected-access @@ -72,7 +71,7 @@ async def get_state(self, **kwargs: Any) -> bytes: return session_state async def set_state( - self, state: Union[str, bytes, bytearray], **kwargs: Any + self, state: Union[str, bytes, bytearray], *, timeout: Optional[float] = None ) -> None: """Set the session state. @@ -92,7 +91,6 @@ async def set_state( :caption: Set the session state """ self._receiver._check_live() # pylint: disable=protected-access - timeout = kwargs.pop("timeout", None) if timeout is not None and timeout <= 0: raise ValueError("The timeout must be greater than 0.") state = ( @@ -108,7 +106,7 @@ async def set_state( timeout=timeout, ) - async def renew_lock(self, **kwargs: Any) -> datetime.datetime: + async def renew_lock(self, *, timeout: Optional[float] = None) -> datetime.datetime: """Renew the session lock. This operation must be performed periodically in order to retain a lock on the @@ -134,7 +132,6 @@ async def renew_lock(self, **kwargs: Any) -> datetime.datetime: :caption: Renew the session lock before it expires """ self._receiver._check_live() # pylint: disable=protected-access - timeout = kwargs.pop("timeout", None) if timeout is not None and timeout <= 0: raise ValueError("The timeout must be greater than 0.") expiry = await self._receiver._mgmt_request_response_with_retry( # pylint: disable=protected-access diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/management/_management_client_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/management/_management_client_async.py index a30b92cf89c7..531fc71f6afc 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/management/_management_client_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/management/_management_client_async.py @@ -6,8 +6,9 @@ # pylint:disable=specify-parameter-names-in-call # pylint:disable=too-many-lines import functools +import datetime from copy import deepcopy -from typing import TYPE_CHECKING, Any, Union, cast, Mapping +from typing import Any, Union, cast, Mapping, Optional, List, TYPE_CHECKING from xml.etree.ElementTree import ElementTree from azure.core.async_paging import AsyncItemPaged @@ -62,6 +63,7 @@ from ...management._api_version import DEFAULT_VERSION from ._shared_key_policy_async import AsyncServiceBusSharedKeyCredentialPolicy from ...management._models import ( + AuthorizationRule, QueueRuntimeProperties, QueueProperties, TopicProperties, @@ -71,6 +73,9 @@ RuleProperties, NamespaceProperties, TrueRuleFilter, + CorrelationRuleFilter, + SqlRuleFilter, + SqlRuleAction, ) from ...management._xml_workaround_policy import ServiceBusXMLWorkaroundPolicy from ...management._handle_response_error import _handle_response_error @@ -86,9 +91,7 @@ ) if TYPE_CHECKING: - from azure.core.credentials_async import ( - AsyncTokenCredential, - ) # pylint:disable=ungrouped-imports + from azure.core.credentials_async import AsyncTokenCredential class ServiceBusAdministrationClient: # pylint:disable=too-many-public-methods @@ -106,11 +109,13 @@ def __init__( self, fully_qualified_namespace: str, credential: "AsyncTokenCredential", + *, + api_version: str = DEFAULT_VERSION, **kwargs: Any ) -> None: self.fully_qualified_namespace = fully_qualified_namespace - self._api_version = kwargs.pop("api_version", DEFAULT_VERSION) + self._api_version = api_version self._credential = credential self._endpoint = "https://" + fully_qualified_namespace self._config = ServiceBusManagementClientConfiguration(self._endpoint, **kwargs) @@ -163,10 +168,7 @@ async def _get_entity_element(self, entity_name, enrich=False, **kwargs): element = cast( ElementTree, await self._impl.entity.get( - entity_name, - enrich=enrich, - api_version=self._api_version, - **kwargs + entity_name, enrich=enrich, api_version=self._api_version, **kwargs ), ) return element @@ -237,7 +239,7 @@ async def _populate_header_within_kwargs(uri, header): @classmethod def from_connection_string( - cls, conn_str: str, **kwargs: Any + cls, conn_str: str, *, api_version: str = DEFAULT_VERSION, **kwargs: Any ) -> "ServiceBusAdministrationClient": """Create a client from connection string. @@ -261,7 +263,7 @@ def from_connection_string( credential = ServiceBusSharedKeyCredential(shared_access_key_name, shared_access_key) # type: ignore if "//" in endpoint: endpoint = endpoint[endpoint.index("//") + 2 :] - return cls(endpoint, credential, **kwargs) # type: ignore + return cls(endpoint, credential, api_version=api_version, **kwargs) # type: ignore async def get_queue(self, queue_name: str, **kwargs) -> QueueProperties: """Get the properties of a queue. @@ -295,7 +297,31 @@ async def get_queue_runtime_properties( ) return runtime_properties - async def create_queue(self, queue_name: str, **kwargs) -> QueueProperties: + async def create_queue( # pylint: disable=too-many-locals + self, + queue_name: str, + *, + authorization_rules: Optional[List[AuthorizationRule]] = None, + auto_delete_on_idle: Optional[Union[datetime.timedelta, str]] = None, + dead_lettering_on_message_expiration: Optional[bool] = None, + default_message_time_to_live: Optional[Union[datetime.timedelta, str]] = None, + duplicate_detection_history_time_window: Optional[ + Union[datetime.timedelta, str] + ] = None, + enable_batched_operations: Optional[bool] = None, + enable_express: Optional[bool] = None, + enable_partitioning: Optional[bool] = None, + lock_duration: Optional[Union[datetime.timedelta, str]] = None, + max_delivery_count: Optional[int] = None, + max_size_in_megabytes: Optional[int] = None, + requires_duplicate_detection: Optional[bool] = None, + requires_session: Optional[bool] = None, + forward_to: Optional[str] = None, + user_metadata: Optional[str] = None, + forward_dead_lettered_messages_to: Optional[str] = None, + max_message_size_in_kilobytes: Optional[int] = None, + **kwargs: Any + ) -> QueueProperties: """Create a queue. :param queue_name: Name of the queue. @@ -362,43 +388,35 @@ async def create_queue(self, queue_name: str, **kwargs) -> QueueProperties: :rtype: ~azure.servicebus.management.QueueProperties """ forward_to = _normalize_entity_path_to_full_path_if_needed( - kwargs.pop("forward_to", None), self.fully_qualified_namespace + forward_to, self.fully_qualified_namespace ) forward_dead_lettered_messages_to = ( _normalize_entity_path_to_full_path_if_needed( - kwargs.pop("forward_dead_lettered_messages_to", None), + forward_dead_lettered_messages_to, self.fully_qualified_namespace, ) ) queue = QueueProperties( queue_name, - authorization_rules=kwargs.pop("authorization_rules", None), - auto_delete_on_idle=kwargs.pop("auto_delete_on_idle", None), - dead_lettering_on_message_expiration=kwargs.pop( - "dead_lettering_on_message_expiration", None - ), - default_message_time_to_live=kwargs.pop( - "default_message_time_to_live", None - ), - duplicate_detection_history_time_window=kwargs.pop( - "duplicate_detection_history_time_window", None - ), + authorization_rules=authorization_rules, + auto_delete_on_idle=auto_delete_on_idle, + dead_lettering_on_message_expiration=dead_lettering_on_message_expiration, + default_message_time_to_live=default_message_time_to_live, + duplicate_detection_history_time_window=duplicate_detection_history_time_window, availability_status=None, - enable_batched_operations=kwargs.pop("enable_batched_operations", None), - enable_express=kwargs.pop("enable_express", None), - enable_partitioning=kwargs.pop("enable_partitioning", None), - lock_duration=kwargs.pop("lock_duration", None), - max_delivery_count=kwargs.pop("max_delivery_count", None), - max_size_in_megabytes=kwargs.pop("max_size_in_megabytes", None), - requires_duplicate_detection=kwargs.pop( - "requires_duplicate_detection", None - ), - requires_session=kwargs.pop("requires_session", None), + enable_batched_operations=enable_batched_operations, + enable_express=enable_express, + enable_partitioning=enable_partitioning, + lock_duration=lock_duration, + max_delivery_count=max_delivery_count, + max_size_in_megabytes=max_size_in_megabytes, + requires_duplicate_detection=requires_duplicate_detection, + requires_session=requires_session, status=kwargs.pop("status", None), forward_to=forward_to, forward_dead_lettered_messages_to=forward_dead_lettered_messages_to, - user_metadata=kwargs.pop("user_metadata", None), - max_message_size_in_kilobytes=kwargs.pop("max_message_size_in_kilobytes", None) + user_metadata=user_metadata, + max_message_size_in_kilobytes=max_message_size_in_kilobytes, ) to_create = queue._to_internal_entity(self.fully_qualified_namespace) create_entity_body = CreateQueueBody( @@ -560,7 +578,30 @@ async def get_topic_runtime_properties( ) return topic_description - async def create_topic(self, topic_name: str, **kwargs) -> TopicProperties: + async def create_topic( + self, + topic_name: str, + *, + default_message_time_to_live: Optional[Union[datetime.timedelta, str]] = None, + max_size_in_megabytes: Optional[ + int + ] = None, + requires_duplicate_detection: Optional[bool] = None, + duplicate_detection_history_time_window: Optional[ + Union[datetime.timedelta, str] + ] = None, + enable_batched_operations: Optional[bool] = None, + size_in_bytes: Optional[int] = None, + filtering_messages_before_publishing: Optional[bool] = None, + authorization_rules: Optional[List[AuthorizationRule]] = None, + support_ordering: Optional[bool] = None, + auto_delete_on_idle: Optional[Union[datetime.timedelta, str]] = None, + enable_partitioning: Optional[bool] = None, + enable_express: Optional[bool] = None, + user_metadata: Optional[str] = None, + max_message_size_in_kilobytes: Optional[int] = None, + **kwargs: Any + ) -> TopicProperties: """Create a topic. :param topic_name: Name of the topic. @@ -572,7 +613,7 @@ async def create_topic(self, topic_name: str, **kwargs) -> TopicProperties: :paramtype default_message_time_to_live: Union[~datetime.timedelta, str] :keyword max_size_in_megabytes: The maximum size of the topic in megabytes, which is the size of memory allocated for the topic. - :paramtype max_size_in_megabytes: long + :paramtype max_size_in_megabytes: int :keyword requires_duplicate_detection: A value indicating if this topic requires duplicate detection. :paramtype requires_duplicate_detection: bool @@ -615,27 +656,24 @@ async def create_topic(self, topic_name: str, **kwargs) -> TopicProperties: topic = TopicProperties( topic_name, - default_message_time_to_live=kwargs.pop( - "default_message_time_to_live", None - ), - max_size_in_megabytes=kwargs.pop("max_size_in_megabytes", None), - requires_duplicate_detection=kwargs.pop( - "requires_duplicate_detection", None - ), - duplicate_detection_history_time_window=kwargs.pop( - "duplicate_detection_history_time_window", None - ), - enable_batched_operations=kwargs.pop("enable_batched_operations", None), - size_in_bytes=kwargs.pop("size_in_bytes", None), - authorization_rules=kwargs.pop("authorization_rules", None), + default_message_time_to_live=default_message_time_to_live, + max_size_in_megabytes=max_size_in_megabytes, + requires_duplicate_detection=requires_duplicate_detection, + # TODO: ask why default of 10 mins isn't followed below, + duplicate_detection_history_time_window=duplicate_detection_history_time_window, + enable_batched_operations=enable_batched_operations, + size_in_bytes=size_in_bytes, + authorization_rules=authorization_rules, + filtering_messages_before_publishing=filtering_messages_before_publishing, status=kwargs.pop("status", None), - support_ordering=kwargs.pop("support_ordering", None), - auto_delete_on_idle=kwargs.pop("auto_delete_on_idle", None), - enable_partitioning=kwargs.pop("enable_partitioning", None), + support_ordering=support_ordering, + auto_delete_on_idle=auto_delete_on_idle, + enable_partitioning=enable_partitioning, availability_status=None, - enable_express=kwargs.pop("enable_express", None), - user_metadata=kwargs.pop("user_metadata", None), - max_message_size_in_kilobytes=kwargs.pop("max_message_size_in_kilobytes", None) + enable_express=enable_express, + user_metadata=user_metadata, + max_message_size_in_kilobytes=max_message_size_in_kilobytes, + **kwargs ) to_create = topic._to_internal_entity() @@ -808,7 +846,22 @@ async def get_subscription_runtime_properties( return subscription async def create_subscription( - self, topic_name: str, subscription_name: str, **kwargs + self, + topic_name: str, + subscription_name: str, + *, + lock_duration: Optional[Union[datetime.timedelta, str]] = None, + requires_session: Optional[bool] = None, + default_message_time_to_live: Optional[Union[datetime.timedelta, str]] = None, + dead_lettering_on_message_expiration: Optional[bool] = None, + dead_lettering_on_filter_evaluation_exceptions: Optional[bool] = None, + max_delivery_count: Optional[int] = None, + enable_batched_operations: Optional[bool] = None, + forward_to: Optional[str] = None, + user_metadata: Optional[str] = None, + forward_dead_lettered_messages_to: Optional[str] = None, + auto_delete_on_idle: Optional[Union[datetime.timedelta, str]] = None, + **kwargs: Any ) -> SubscriptionProperties: """Create a topic subscription. @@ -859,35 +912,29 @@ async def create_subscription( # pylint:disable=protected-access _validate_entity_name_type(topic_name, display_name="topic_name") forward_to = _normalize_entity_path_to_full_path_if_needed( - kwargs.pop("forward_to", None), self.fully_qualified_namespace + forward_to, self.fully_qualified_namespace ) forward_dead_lettered_messages_to = ( _normalize_entity_path_to_full_path_if_needed( - kwargs.pop("forward_dead_lettered_messages_to", None), + forward_dead_lettered_messages_to, self.fully_qualified_namespace, ) ) subscription = SubscriptionProperties( subscription_name, - lock_duration=kwargs.pop("lock_duration", None), - requires_session=kwargs.pop("requires_session", None), - default_message_time_to_live=kwargs.pop( - "default_message_time_to_live", None - ), - dead_lettering_on_message_expiration=kwargs.pop( - "dead_lettering_on_message_expiration", None - ), - dead_lettering_on_filter_evaluation_exceptions=kwargs.pop( - "dead_lettering_on_filter_evaluation_exceptions", None - ), - max_delivery_count=kwargs.pop("max_delivery_count", None), - enable_batched_operations=kwargs.pop("enable_batched_operations", None), + lock_duration=lock_duration, + requires_session=requires_session, + default_message_time_to_live=default_message_time_to_live, + dead_lettering_on_message_expiration=dead_lettering_on_message_expiration, + dead_lettering_on_filter_evaluation_exceptions=dead_lettering_on_filter_evaluation_exceptions, + max_delivery_count=max_delivery_count, + enable_batched_operations=enable_batched_operations, status=kwargs.pop("status", None), forward_to=forward_to, - user_metadata=kwargs.pop("user_metadata", None), + user_metadata=user_metadata, forward_dead_lettered_messages_to=forward_dead_lettered_messages_to, - auto_delete_on_idle=kwargs.pop("auto_delete_on_idle", None), + auto_delete_on_idle=auto_delete_on_idle, availability_status=None, ) to_create = subscription._to_internal_entity(self.fully_qualified_namespace) # type: ignore @@ -940,8 +987,12 @@ async def update_subscription( _validate_entity_name_type(topic_name, display_name="topic_name") # we should not mutate the input, making a copy first for update - subscription = deepcopy(create_properties_from_dict_if_needed(subscription, SubscriptionProperties)) - to_update = subscription._to_internal_entity(self.fully_qualified_namespace, kwargs) + subscription = deepcopy( + create_properties_from_dict_if_needed(subscription, SubscriptionProperties) + ) + to_update = subscription._to_internal_entity( + self.fully_qualified_namespace, kwargs + ) create_entity_body = CreateSubscriptionBody( content=CreateSubscriptionBodyContent( @@ -1062,7 +1113,16 @@ async def get_rule( return rule_description async def create_rule( - self, topic_name: str, subscription_name: str, rule_name: str, **kwargs + self, + topic_name: str, + subscription_name: str, + rule_name: str, + *, + filter: Union[ # pylint: disable=redefined-builtin + CorrelationRuleFilter, SqlRuleFilter + ]=TrueRuleFilter(), + action: Optional[SqlRuleAction] = None, + **kwargs: Any ) -> RuleProperties: """Create a rule for a topic subscription. @@ -1084,8 +1144,8 @@ async def create_rule( rule = RuleProperties( rule_name, - filter=kwargs.pop("filter", TrueRuleFilter()), - action=kwargs.pop("action", None), + filter=filter, + action=action, created_at_utc=None, ) to_create = rule._to_internal_entity() diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/amqp/_amqp_message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/amqp/_amqp_message.py index ec60df31e3cc..bbf69aec6463 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/amqp/_amqp_message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/amqp/_amqp_message.py @@ -5,8 +5,9 @@ # ------------------------------------------------------------------------- import time +import uuid from datetime import datetime -from typing import Optional, Any, cast, Mapping +from typing import Optional, Any, cast, Mapping, Union, Dict from msrest.serialization import TZ_UTC import uamqp @@ -112,8 +113,17 @@ class AmqpAnnotatedMessage(object): :paramtype delivery_annotations: Optional[dict] """ - def __init__(self, **kwargs): - # type: (Any) -> None + def __init__( + self, + *, + header: Optional["AmqpMessageHeader"] = None, + footer: Optional[Dict[str, Any]] = None, + properties: Optional["AmqpMessageProperties"] = None, + application_properties: Optional[Dict[str, Any]] = None, + annotations: Optional[Dict[str, Any]] = None, + delivery_annotations: Optional[Dict[str, Any]] = None, + **kwargs: Any + ) -> None: self._message = kwargs.pop("message", None) self._encoding = kwargs.pop("encoding", "UTF-8") @@ -143,14 +153,14 @@ def __init__(self, **kwargs): self._body_type = uamqp.MessageBodyType.Value self._message = uamqp.message.Message(body=self._body, body_type=self._body_type) - header_dict = cast(Mapping, kwargs.get("header")) - self._header = AmqpMessageHeader(**header_dict) if "header" in kwargs else None - self._footer = kwargs.get("footer") - properties_dict = cast(Mapping, kwargs.get("properties")) - self._properties = AmqpMessageProperties(**properties_dict) if "properties" in kwargs else None - self._application_properties = kwargs.get("application_properties") - self._annotations = kwargs.get("annotations") - self._delivery_annotations = kwargs.get("delivery_annotations") + header_dict = cast(Mapping, header) + self._header = AmqpMessageHeader(**header_dict) if header else None + self._footer = footer + properties_dict = cast(Mapping, properties) + self._properties = AmqpMessageProperties(**properties_dict) if properties else None + self._application_properties = application_properties + self._annotations = annotations + self._delivery_annotations = delivery_annotations def __str__(self): # type: () -> str @@ -483,12 +493,20 @@ class AmqpMessageHeader(DictMixin): priority messages. Messages with higher priorities MAY be delivered before those with lower priorities. :vartype priority: Optional[int] """ - def __init__(self, **kwargs): - self.delivery_count = kwargs.get("delivery_count") - self.time_to_live = kwargs.get("time_to_live") - self.first_acquirer = kwargs.get("first_acquirer") - self.durable = kwargs.get("durable") - self.priority = kwargs.get("priority") + def __init__( + self, + *, + delivery_count: Optional[int] = None, + time_to_live: Optional[int] = None, + durable: Optional[bool] = None, + first_acquirer: Optional[bool] = None, + priority: Optional[int] = None + ): + self.delivery_count = delivery_count + self.time_to_live = time_to_live + self.first_acquirer = first_acquirer + self.durable = durable + self.priority = priority class AmqpMessageProperties(DictMixin): @@ -568,17 +586,33 @@ class AmqpMessageProperties(DictMixin): to this message to a specific group. :vartype reply_to_group_id: Optional[Union[str, bytes]] """ - def __init__(self, **kwargs): - self.message_id = kwargs.get("message_id") - self.user_id = kwargs.get("user_id") - self.to = kwargs.get("to") - self.subject = kwargs.get("subject") - self.reply_to = kwargs.get("reply_to") - self.correlation_id = kwargs.get("correlation_id") - self.content_type = kwargs.get("content_type") - self.content_encoding = kwargs.get("content_encoding") - self.creation_time = kwargs.get("creation_time") - self.absolute_expiry_time = kwargs.get("absolute_expiry_time") - self.group_id = kwargs.get("group_id") - self.group_sequence = kwargs.get("group_sequence") - self.reply_to_group_id = kwargs.get("reply_to_group_id") + def __init__( + self, + *, + message_id: Optional[Union[str, bytes, uuid.UUID]] = None, + user_id: Optional[Union[str, bytes]] = None, + to: Optional[Union[str, bytes]] = None, + subject: Optional[Union[str, bytes]] = None, + reply_to: Optional[Union[str, bytes]] = None, + correlation_id: Optional[Union[str, bytes]] = None, + content_type: Optional[Union[str, bytes]] = None, + content_encoding: Optional[Union[str, bytes]] = None, + creation_time: Optional[int] = None, + absolute_expiry_time: Optional[int] = None, + group_id: Optional[Union[str, bytes]] = None, + group_sequence: Optional[int] = None, + reply_to_group_id: Optional[Union[str, bytes]] = None + ): + self.message_id = message_id + self.user_id = user_id + self.to = to + self.subject = subject + self.reply_to = reply_to + self.correlation_id = correlation_id + self.content_type = content_type + self.content_encoding = content_encoding + self.creation_time = creation_time + self.absolute_expiry_time = absolute_expiry_time + self.group_id = group_id + self.group_sequence = group_sequence + self.reply_to_group_id = reply_to_group_id diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/management/_management_client.py b/sdk/servicebus/azure-servicebus/azure/servicebus/management/_management_client.py index f7545396270a..64d20f1ceab4 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/management/_management_client.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/management/_management_client.py @@ -6,8 +6,9 @@ # pylint:disable=specify-parameter-names-in-call # pylint:disable=too-many-lines import functools +import datetime from copy import deepcopy -from typing import TYPE_CHECKING, Dict, Any, Union, cast, Mapping +from typing import Any, Union, cast, Mapping, Optional, List, TYPE_CHECKING from xml.etree.ElementTree import ElementTree from azure.core.paging import ItemPaged @@ -73,6 +74,7 @@ from . import _constants as constants from ._api_version import DEFAULT_VERSION from ._models import ( + AuthorizationRule, QueueRuntimeProperties, QueueProperties, TopicProperties, @@ -82,13 +84,14 @@ RuleProperties, NamespaceProperties, TrueRuleFilter, + CorrelationRuleFilter, + SqlRuleFilter, + SqlRuleAction, ) from ._handle_response_error import _handle_response_error if TYPE_CHECKING: - from azure.core.credentials import ( - TokenCredential, - ) # pylint:disable=ungrouped-imports + from azure.core.credentials import TokenCredential class ServiceBusAdministrationClient: # pylint:disable=too-many-public-methods @@ -102,13 +105,21 @@ class ServiceBusAdministrationClient: # pylint:disable=too-many-public-methods in reduced feature compatibility. """ - def __init__(self, fully_qualified_namespace, credential, **kwargs): - # type: (str, TokenCredential, Dict[str, Any]) -> None + def __init__( + self, + fully_qualified_namespace: str, + credential: "TokenCredential", + *, + api_version: str = DEFAULT_VERSION, + **kwargs: Any + ) -> None: self.fully_qualified_namespace = fully_qualified_namespace - self._api_version = kwargs.pop("api_version", DEFAULT_VERSION) + self._api_version = api_version self._credential = credential self._endpoint = "https://" + fully_qualified_namespace - self._config = ServiceBusManagementClientConfiguration(self._endpoint, **kwargs) + self._config = ServiceBusManagementClientConfiguration( + self._endpoint, api_version=api_version, **kwargs + ) self._pipeline = self._build_pipeline() self._impl = ServiceBusManagementClientImpl( endpoint=fully_qualified_namespace, pipeline=self._pipeline @@ -228,8 +239,9 @@ def _populate_header_within_kwargs(uri, header): ) @classmethod - def from_connection_string(cls, conn_str, **kwargs): - # type: (str, Any) -> ServiceBusAdministrationClient + def from_connection_string( + cls, conn_str: str, *, api_version: str = DEFAULT_VERSION, **kwargs: Any + ) -> "ServiceBusAdministrationClient": """Create a client from connection string. :param str conn_str: The connection string of the Service Bus Namespace. @@ -252,10 +264,9 @@ def from_connection_string(cls, conn_str, **kwargs): credential = ServiceBusSharedKeyCredential(shared_access_key_name, shared_access_key) # type: ignore if "//" in endpoint: endpoint = endpoint[endpoint.index("//") + 2 :] - return cls(endpoint, credential, **kwargs) + return cls(endpoint, credential, api_version=api_version, **kwargs) - def get_queue(self, queue_name, **kwargs): - # type: (str, Any) -> QueueProperties + def get_queue(self, queue_name: str, **kwargs: Any) -> QueueProperties: """Get the properties of a queue. :param str queue_name: The name of the queue. @@ -286,8 +297,31 @@ def get_queue_runtime_properties(self, queue_name, **kwargs): ) return runtime_properties - def create_queue(self, queue_name, **kwargs): - # type: (str, Any) -> QueueProperties + def create_queue( # pylint: disable=too-many-locals + self, + queue_name: str, + *, + authorization_rules: Optional[List[AuthorizationRule]] = None, + auto_delete_on_idle: Optional[Union[datetime.timedelta, str]] = None, + dead_lettering_on_message_expiration: Optional[bool] = None, + default_message_time_to_live: Optional[Union[datetime.timedelta, str]] = None, + duplicate_detection_history_time_window: Optional[ + Union[datetime.timedelta, str] + ] = None, + enable_batched_operations: Optional[bool] = None, + enable_express: Optional[bool] = None, + enable_partitioning: Optional[bool] = None, + lock_duration: Optional[Union[datetime.timedelta, str]] = None, + max_delivery_count: Optional[int] = None, + max_size_in_megabytes: Optional[int] = None, + requires_duplicate_detection: Optional[bool] = None, + requires_session: Optional[bool] = None, + forward_to: Optional[str] = None, + user_metadata: Optional[str] = None, + forward_dead_lettered_messages_to: Optional[str] = None, + max_message_size_in_kilobytes: Optional[int] = None, + **kwargs: Any + ) -> QueueProperties: """Create a queue. :param queue_name: Name of the queue. @@ -354,43 +388,35 @@ def create_queue(self, queue_name, **kwargs): :rtype: ~azure.servicebus.management.QueueProperties """ forward_to = _normalize_entity_path_to_full_path_if_needed( - kwargs.pop("forward_to", None), self.fully_qualified_namespace + forward_to, self.fully_qualified_namespace ) forward_dead_lettered_messages_to = ( _normalize_entity_path_to_full_path_if_needed( - kwargs.pop("forward_dead_lettered_messages_to", None), + forward_dead_lettered_messages_to, self.fully_qualified_namespace, ) ) queue = QueueProperties( queue_name, - authorization_rules=kwargs.pop("authorization_rules", None), - auto_delete_on_idle=kwargs.pop("auto_delete_on_idle", None), - dead_lettering_on_message_expiration=kwargs.pop( - "dead_lettering_on_message_expiration", None - ), - default_message_time_to_live=kwargs.pop( - "default_message_time_to_live", None - ), - duplicate_detection_history_time_window=kwargs.pop( - "duplicate_detection_history_time_window", None - ), + authorization_rules=authorization_rules, + auto_delete_on_idle=auto_delete_on_idle, + dead_lettering_on_message_expiration=dead_lettering_on_message_expiration, + default_message_time_to_live=default_message_time_to_live, + duplicate_detection_history_time_window=duplicate_detection_history_time_window, availability_status=None, - enable_batched_operations=kwargs.pop("enable_batched_operations", None), - enable_express=kwargs.pop("enable_express", None), - enable_partitioning=kwargs.pop("enable_partitioning", None), - lock_duration=kwargs.pop("lock_duration", None), - max_delivery_count=kwargs.pop("max_delivery_count", None), - max_size_in_megabytes=kwargs.pop("max_size_in_megabytes", None), - requires_duplicate_detection=kwargs.pop( - "requires_duplicate_detection", None - ), - requires_session=kwargs.pop("requires_session", None), + enable_batched_operations=enable_batched_operations, + enable_express=enable_express, + enable_partitioning=enable_partitioning, + lock_duration=lock_duration, + max_delivery_count=max_delivery_count, + max_size_in_megabytes=max_size_in_megabytes, + requires_duplicate_detection=requires_duplicate_detection, + requires_session=requires_session, status=kwargs.pop("status", None), forward_to=forward_to, forward_dead_lettered_messages_to=forward_dead_lettered_messages_to, - user_metadata=kwargs.pop("user_metadata", None), - max_message_size_in_kilobytes=kwargs.pop("max_message_size_in_kilobytes", None) + user_metadata=user_metadata, + max_message_size_in_kilobytes=max_message_size_in_kilobytes, ) to_create = queue._to_internal_entity(self.fully_qualified_namespace) create_entity_body = CreateQueueBody( @@ -552,8 +578,30 @@ def get_topic_runtime_properties(self, topic_name, **kwargs): ) return topic_description - def create_topic(self, topic_name, **kwargs): - # type: (str, Any) -> TopicProperties + def create_topic( + self, + topic_name: str, + *, + default_message_time_to_live: Optional[Union[datetime.timedelta, str]] = None, + max_size_in_megabytes: Optional[ + int + ] = None, + requires_duplicate_detection: Optional[bool] = None, + duplicate_detection_history_time_window: Optional[ + Union[datetime.timedelta, str] + ] = None, + enable_batched_operations: Optional[bool] = None, + size_in_bytes: Optional[int] = None, + filtering_messages_before_publishing: Optional[bool] = None, + authorization_rules: Optional[List[AuthorizationRule]] = None, + support_ordering: Optional[bool] = None, + auto_delete_on_idle: Optional[Union[datetime.timedelta, str]] = None, + enable_partitioning: Optional[bool] = None, + enable_express: Optional[bool] = None, + user_metadata: Optional[str] = None, + max_message_size_in_kilobytes: Optional[int] = None, + **kwargs: Any + ) -> TopicProperties: """Create a topic. :param topic_name: Name of the topic. @@ -565,7 +613,7 @@ def create_topic(self, topic_name, **kwargs): :paramtype default_message_time_to_live: Union[~datetime.timedelta, str] :keyword max_size_in_megabytes: The maximum size of the topic in megabytes, which is the size of memory allocated for the topic. - :paramtype max_size_in_megabytes: long + :paramtype max_size_in_megabytes: int :keyword requires_duplicate_detection: A value indicating if this topic requires duplicate detection. :paramtype requires_duplicate_detection: bool @@ -607,27 +655,24 @@ def create_topic(self, topic_name, **kwargs): """ topic = TopicProperties( topic_name, - default_message_time_to_live=kwargs.pop( - "default_message_time_to_live", None - ), - max_size_in_megabytes=kwargs.pop("max_size_in_megabytes", None), - requires_duplicate_detection=kwargs.pop( - "requires_duplicate_detection", None - ), - duplicate_detection_history_time_window=kwargs.pop( - "duplicate_detection_history_time_window", None - ), - enable_batched_operations=kwargs.pop("enable_batched_operations", None), - size_in_bytes=kwargs.pop("size_in_bytes", None), - authorization_rules=kwargs.pop("authorization_rules", None), + default_message_time_to_live=default_message_time_to_live, + max_size_in_megabytes=max_size_in_megabytes, + requires_duplicate_detection=requires_duplicate_detection, + # TODO: ask why default of 10 mins isn't followed below, + duplicate_detection_history_time_window=duplicate_detection_history_time_window, + enable_batched_operations=enable_batched_operations, + size_in_bytes=size_in_bytes, + authorization_rules=authorization_rules, + filtering_messages_before_publishing=filtering_messages_before_publishing, status=kwargs.pop("status", None), - support_ordering=kwargs.pop("support_ordering", None), - auto_delete_on_idle=kwargs.pop("auto_delete_on_idle", None), - enable_partitioning=kwargs.pop("enable_partitioning", None), + support_ordering=support_ordering, + auto_delete_on_idle=auto_delete_on_idle, + enable_partitioning=enable_partitioning, availability_status=None, - enable_express=kwargs.pop("enable_express", None), - user_metadata=kwargs.pop("user_metadata", None), - max_message_size_in_kilobytes=kwargs.pop("max_message_size_in_kilobytes", None) + enable_express=enable_express, + user_metadata=user_metadata, + max_message_size_in_kilobytes=max_message_size_in_kilobytes, + **kwargs ) to_create = topic._to_internal_entity() @@ -799,8 +844,24 @@ def get_subscription_runtime_properties( ) return subscription - def create_subscription(self, topic_name, subscription_name, **kwargs): - # type: (str, str, Any) -> SubscriptionProperties + def create_subscription( + self, + topic_name: str, + subscription_name: str, + *, + lock_duration: Optional[Union[datetime.timedelta, str]] = None, + requires_session: Optional[bool] = None, + default_message_time_to_live: Optional[Union[datetime.timedelta, str]] = None, + dead_lettering_on_message_expiration: Optional[bool] = None, + dead_lettering_on_filter_evaluation_exceptions: Optional[bool] = None, + max_delivery_count: Optional[int] = None, + enable_batched_operations: Optional[bool] = None, + forward_to: Optional[str] = None, + user_metadata: Optional[str] = None, + forward_dead_lettered_messages_to: Optional[str] = None, + auto_delete_on_idle: Optional[Union[datetime.timedelta, str]] = None, + **kwargs: Any + ) -> SubscriptionProperties: """Create a topic subscription. :param str topic_name: The topic that will own the @@ -849,35 +910,29 @@ def create_subscription(self, topic_name, subscription_name, **kwargs): """ _validate_entity_name_type(topic_name, display_name="topic_name") forward_to = _normalize_entity_path_to_full_path_if_needed( - kwargs.pop("forward_to", None), self.fully_qualified_namespace + forward_to, self.fully_qualified_namespace ) forward_dead_lettered_messages_to = ( _normalize_entity_path_to_full_path_if_needed( - kwargs.pop("forward_dead_lettered_messages_to", None), + forward_dead_lettered_messages_to, self.fully_qualified_namespace, ) ) subscription = SubscriptionProperties( subscription_name, - lock_duration=kwargs.pop("lock_duration", None), - requires_session=kwargs.pop("requires_session", None), - default_message_time_to_live=kwargs.pop( - "default_message_time_to_live", None - ), - dead_lettering_on_message_expiration=kwargs.pop( - "dead_lettering_on_message_expiration", None - ), - dead_lettering_on_filter_evaluation_exceptions=kwargs.pop( - "dead_lettering_on_filter_evaluation_exceptions", None - ), - max_delivery_count=kwargs.pop("max_delivery_count", None), - enable_batched_operations=kwargs.pop("enable_batched_operations", None), + lock_duration=lock_duration, + requires_session=requires_session, + default_message_time_to_live=default_message_time_to_live, + dead_lettering_on_message_expiration=dead_lettering_on_message_expiration, + dead_lettering_on_filter_evaluation_exceptions=dead_lettering_on_filter_evaluation_exceptions, + max_delivery_count=max_delivery_count, + enable_batched_operations=enable_batched_operations, status=kwargs.pop("status", None), forward_to=forward_to, - user_metadata=kwargs.pop("user_metadata", None), + user_metadata=user_metadata, forward_dead_lettered_messages_to=forward_dead_lettered_messages_to, - auto_delete_on_idle=kwargs.pop("auto_delete_on_idle", None), + auto_delete_on_idle=auto_delete_on_idle, availability_status=None, ) to_create = subscription._to_internal_entity(self.fully_qualified_namespace) # type: ignore # pylint:disable=protected-access @@ -928,7 +983,9 @@ def update_subscription(self, topic_name, subscription, **kwargs): subscription = deepcopy( create_properties_from_dict_if_needed(subscription, SubscriptionProperties) # type: ignore ) - to_update = subscription._to_internal_entity(self.fully_qualified_namespace, kwargs) + to_update = subscription._to_internal_entity( + self.fully_qualified_namespace, kwargs + ) create_entity_body = CreateSubscriptionBody( content=CreateSubscriptionBodyContent( @@ -1044,8 +1101,18 @@ def get_rule(self, topic_name, subscription_name, rule_name, **kwargs): ) # to remove after #3535 is released. return rule_description - def create_rule(self, topic_name, subscription_name, rule_name, **kwargs): - # type: (str, str, str, Any) -> RuleProperties + def create_rule( + self, + topic_name: str, + subscription_name: str, + rule_name: str, + *, + filter: Union[ # pylint: disable=redefined-builtin + CorrelationRuleFilter, SqlRuleFilter + ]=TrueRuleFilter(), + action: Optional[SqlRuleAction] = None, + **kwargs: Any + ) -> RuleProperties: """Create a rule for a topic subscription. :param str topic_name: The topic that will own the @@ -1065,8 +1132,8 @@ def create_rule(self, topic_name, subscription_name, rule_name, **kwargs): rule = RuleProperties( rule_name, - filter=kwargs.pop("filter", TrueRuleFilter()), - action=kwargs.pop("action", None), + filter=filter, + action=action, created_at_utc=None, ) to_create = rule._to_internal_entity() diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/management/_utils.py b/sdk/servicebus/azure-servicebus/azure/servicebus/management/_utils.py index fbf5e8fbd755..983465978ef8 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/management/_utils.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/management/_utils.py @@ -3,7 +3,7 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- from datetime import datetime, timedelta -from typing import TYPE_CHECKING, cast, Union, Mapping, Type, Any +from typing import TYPE_CHECKING, cast, Union, Mapping, Type, Any, Optional from xml.etree.ElementTree import ElementTree, SubElement, QName import isodate import six @@ -335,7 +335,7 @@ def _validate_topic_subscription_and_rule_types( def _normalize_entity_path_to_full_path_if_needed( entity_path, fully_qualified_namespace ): - # type: (str, str) -> str + # type: (Optional[str], str) -> Optional[str] if not entity_path: return entity_path parsed = urlparse.urlparse(entity_path)