From 4b7af13ecb1f646518e047205a9ba36991b46bcc Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Wed, 14 Oct 2020 14:09:07 -0700 Subject: [PATCH] update async type hint to use the native py3 pattern --- .../aio/_async_auto_lock_renewer.py | 10 ++-- .../azure/servicebus/aio/_async_message.py | 8 +-- .../servicebus/aio/_base_handler_async.py | 14 ++--- .../aio/_servicebus_client_async.py | 56 ++++++++++--------- .../aio/_servicebus_receiver_async.py | 21 +++---- .../aio/_servicebus_sender_async.py | 17 +++--- .../aio/_servicebus_session_async.py | 9 +-- .../aio/_servicebus_session_receiver_async.py | 3 +- 8 files changed, 69 insertions(+), 69 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py index 8827dd375f85..c8cba8e94652 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py @@ -113,10 +113,12 @@ async def _auto_lock_renew(self, if on_lock_renew_failure and not clean_shutdown: await on_lock_renew_failure(renewable, error) - def register(self, - renewable: Union[ReceivedMessage, ServiceBusSession], - timeout: float = 300, - on_lock_renew_failure: Optional[AsyncLockRenewFailureCallback] = None) -> None: + def register( + self, + renewable: Union[ReceivedMessage, ServiceBusSession], + timeout: float = 300, + on_lock_renew_failure: Optional[AsyncLockRenewFailureCallback] = None + ) -> None: """Register a renewable entity for automatic lock renewal. :param renewable: A locked entity that needs to be renewed. diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py index afaa53275277..c56d0711f803 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py @@ -75,8 +75,8 @@ async def complete(self) -> None: # type: ignore self._settled = True async def dead_letter( # type: ignore - self, reason: Optional[str] = None, - error_description: Optional[str] = None + self, reason: Optional[str] = None, + error_description: Optional[str] = None ) -> None: # pylint: disable=unused-argument """Move the message to the Dead Letter queue. @@ -84,8 +84,8 @@ async def dead_letter( # type: ignore used to store messages that failed to process correctly, or otherwise require further inspection or processing. The queue can also be configured to send expired messages to the Dead Letter queue. - :param str reason: The reason for dead-lettering the message. - :param str error_description: The detailed error description for dead-lettering the message. + :param Optional[str] reason: The reason for dead-lettering the message. + :param Optional[str] error_description: The detailed error description for dead-lettering the message. :rtype: None :raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled. :raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired. diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py index c9ac9923c81f..1bea0950562e 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py @@ -76,12 +76,11 @@ async def get_token(self, *scopes: str, **kwargs: Any) -> _AccessToken: # pylin class BaseHandler: def __init__( self, - fully_qualified_namespace, - entity_name, - credential, - **kwargs - ): - # type: (str, str, TokenCredential, Any) -> None + fully_qualified_namespace: str, + entity_name: str, + credential: TokenCredential, + **kwargs: Any + ) -> None: self.fully_qualified_namespace = fully_qualified_namespace self._entity_name = entity_name @@ -263,8 +262,7 @@ async def _close_handler(self): self._handler = None self._running = False - async def close(self): - # type: () -> None + async def close(self) -> None: """Close down the handler connection. If the handler has already closed, this operation will do nothing. An optional exception can be passed in to diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py index fee1dde97a3e..76f3059794f9 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py @@ -2,7 +2,7 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- -from typing import Any, List, TYPE_CHECKING +from typing import Any, List, TYPE_CHECKING, Optional import logging import uamqp @@ -63,11 +63,10 @@ class ServiceBusClient(object): """ def __init__( self, - fully_qualified_namespace, - credential, - **kwargs - ): - # type: (str, TokenCredential, Any) -> None + fully_qualified_namespace: str, + credential: TokenCredential, + **kwargs: Any + ) -> None: self.fully_qualified_namespace = fully_qualified_namespace self._credential = credential self._config = Configuration(**kwargs) @@ -100,10 +99,9 @@ async def _create_uamqp_connection(self): @classmethod def from_connection_string( cls, - conn_str, - **kwargs - ): - # type: (str, Any) -> ServiceBusClient + conn_str: str, + **kwargs: Any + ) -> "ServiceBusClient": """ Create a ServiceBusClient from a connection string. @@ -145,8 +143,7 @@ def from_connection_string( **kwargs ) - async def close(self): - # type: () -> None + async def close(self) -> None: """ Close down the ServiceBus client. All spawned senders, receivers and underlying connection will be shutdown. @@ -167,8 +164,7 @@ async def close(self): if self._connection_sharing and self._connection: await self._connection.destroy_async() - def get_queue_sender(self, queue_name, **kwargs): - # type: (str, Any) -> ServiceBusSender + def get_queue_sender(self, queue_name: str, **kwargs: Any) -> ServiceBusSender: """Get ServiceBusSender for the specific queue. :param str queue_name: The path of specific Service Bus Queue the client connects to. @@ -202,8 +198,7 @@ def get_queue_sender(self, queue_name, **kwargs): self._handlers.append(handler) return handler - def get_queue_receiver(self, queue_name, **kwargs): - # type: (str, Any) -> ServiceBusReceiver + def get_queue_receiver(self, queue_name: str, **kwargs: Any) -> ServiceBusReceiver: """Get ServiceBusReceiver for the specific queue. :param str queue_name: The path of specific Service Bus Queue the client connects to. @@ -261,8 +256,7 @@ def get_queue_receiver(self, queue_name, **kwargs): self._handlers.append(handler) return handler - def get_topic_sender(self, topic_name, **kwargs): - # type: (str, Any) -> ServiceBusSender + def get_topic_sender(self, topic_name: str, **kwargs: Any) -> ServiceBusSender: """Get ServiceBusSender for the specific topic. :param str topic_name: The path of specific Service Bus Topic the client connects to. @@ -295,8 +289,7 @@ def get_topic_sender(self, topic_name, **kwargs): self._handlers.append(handler) return handler - def get_subscription_receiver(self, topic_name, subscription_name, **kwargs): - # type: (str, str, Any) -> ServiceBusReceiver + def get_subscription_receiver(self, topic_name: str, subscription_name: str, **kwargs: Any) -> ServiceBusReceiver: """Get ServiceBusReceiver for the specific subscription under the topic. :param str topic_name: The name of specific Service Bus Topic the client connects to. @@ -374,14 +367,19 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs): self._handlers.append(handler) return handler - def get_subscription_session_receiver(self, topic_name, subscription_name, session_id=None, **kwargs): - # type: (str, str, str, Any) -> ServiceBusSessionReceiver + def get_subscription_session_receiver( + self, + topic_name: str, + subscription_name: str, + session_id: Optional[str] = None, + **kwargs: Any + ) -> ServiceBusSessionReceiver: """Get ServiceBusReceiver for the specific subscription under the topic. :param str topic_name: The name of specific Service Bus Topic the client connects to. :param str subscription_name: The name of specific Service Bus Subscription under the given Service Bus Topic. - :param str session_id: A specific session from which to receive. This must be specified for a + :param Optional[str] session_id: A specific session from which to receive. This must be specified for a sessionful entity, otherwise it must be None. In order to receive messages from the next available session, set this to None. The default is None. :keyword receive_mode: The mode with which messages will be retrieved from the entity. The two options @@ -432,14 +430,18 @@ def get_subscription_session_receiver(self, topic_name, subscription_name, sessi self._handlers.append(handler) return handler - def get_queue_session_receiver(self, queue_name, session_id=None, **kwargs): - # type: (str, str, Any) -> ServiceBusSessionReceiver + def get_queue_session_receiver( + self, + queue_name: str, + session_id: Optional[str] = None, + **kwargs: Any + ) -> ServiceBusSessionReceiver: """Get ServiceBusSessionReceiver for the specific queue. :param str queue_name: The path of specific Service Bus Queue the client connects to. - :param str session_id: A specific session from which to receive. This must be specified for a + :param Optional[str] session_id: A specific session from which to receive. This must be specified for a sessionful entity, otherwise it must be None. In order to receive messages from the next available - session, set this to None. The default is None. + session, set this to None. The default is None. :keyword receive_mode: The mode with which messages will be retrieved from the entity. The two options are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given lock period before they will be removed from the queue. Messages received with ReceiveAndDelete diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py index 7d9bdf557070..ae1ba3fefb04 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py @@ -278,16 +278,15 @@ async def _renew_locks(self, *lock_tokens, timeout=None): timeout=timeout ) - async def close(self): - # type: () -> None + async def close(self) -> None: await super(ServiceBusReceiver, self).close() self._message_iter = None - def get_streaming_message_iter(self, max_wait_time: float = None) -> AsyncIterator[ReceivedMessage]: + def get_streaming_message_iter(self, max_wait_time: Optional[float] = None) -> AsyncIterator[ReceivedMessage]: """Receive messages from an iterator indefinitely, or if a max_wait_time is specified, until such a timeout occurs. - :param float max_wait_time: Maximum time to wait in seconds for the next message to arrive. + :param Optional[float] max_wait_time: Maximum time to wait in seconds for the next message to arrive. If no messages arrive, and no timeout is specified, this call will not return until the connection is closed. If specified, and no messages arrive for the timeout period, the iterator will stop. @@ -369,8 +368,11 @@ def from_connection_string( raise ValueError("Subscription name is missing for the topic. Please specify subscription_name.") return cls(**constructor_args) - async def receive_messages(self, max_message_count=None, max_wait_time=None): - # type: (int, float) -> List[ReceivedMessage] + async def receive_messages( + self, + max_message_count: Optional[int] = None, + max_wait_time: Optional[float] = None + ) -> List[ReceivedMessage]: """Receive a batch of messages at once. This approach is optimal if you wish to process multiple messages simultaneously, or @@ -384,9 +386,9 @@ async def receive_messages(self, max_message_count=None, max_wait_time=None): return as soon as at least one message is received and there is a gap in incoming messages regardless of the specified batch size. - :param int max_message_count: Maximum number of messages in the batch. Actual number + :param Optional[int] max_message_count: Maximum number of messages in the batch. Actual number returned will depend on prefetch_count size and incoming stream rate. - :param float max_wait_time: Maximum time to wait in seconds for the first message to arrive. + :param Optional[float] max_wait_time: Maximum time to wait in seconds for the first message to arrive. If no messages arrive, and no timeout is specified, this call will not return until the connection is closed. If specified, and no messages arrive within the timeout period, an empty list will be returned. @@ -468,8 +470,7 @@ async def receive_deferred_messages( ) return messages - async def peek_messages(self, max_message_count=1, **kwargs): - # type: (int, Optional[float]) -> List[PeekedMessage] + async def peek_messages(self, max_message_count: int = 1, **kwargs: Any) -> List[PeekedMessage]: """Browse messages currently pending in the queue. Peeked messages are not removed from queue, nor are they locked. They cannot be completed, diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py index 483de912fcac..e33076478105 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py @@ -138,8 +138,12 @@ async def _send(self, message, timeout=None, last_exception=None): finally: # reset the timeout of the handler back to the default value self._set_msg_timeout(default_timeout, None) - async def schedule_messages(self, messages, schedule_time_utc, **kwargs): - # type: (Union[Message, List[Message]], datetime.datetime, Any) -> List[int] + async def schedule_messages( + self, + messages: Union[Message, List[Message]], + schedule_time_utc: datetime.datetime, + **kwargs: Any + ) -> List[int]: """Send Message or multiple Messages to be enqueued at a specific time by the service. Returns a list of the sequence numbers of the enqueued messages. :param messages: The message or list of messages to schedule. @@ -175,8 +179,7 @@ async def schedule_messages(self, messages, schedule_time_utc, **kwargs): timeout=timeout ) - async def cancel_scheduled_messages(self, sequence_numbers, **kwargs): - # type: (Union[int, List[int]], Any) -> None + async def cancel_scheduled_messages(self, sequence_numbers: Union[int, List[int]], **kwargs: Any) -> None: """ Cancel one or more messages that have previously been scheduled and are still pending. @@ -253,8 +256,7 @@ def from_connection_string( ) return cls(**constructor_args) - async def send_messages(self, message, **kwargs): - # type: (Union[Message, BatchMessage, List[Message]], Any) -> None + async def send_messages(self, message: Union[Message, BatchMessage, List[Message]], **kwargs: Any) -> None: """Sends message and blocks until acknowledgement is received or operation times out. If a list of messages was provided, attempts to send them as a single batch, throwing a @@ -307,8 +309,7 @@ async def send_messages(self, message, **kwargs): require_last_exception=True ) - async def create_batch(self, max_size_in_bytes=None): - # type: (int) -> BatchMessage + async def create_batch(self, max_size_in_bytes: int = None) -> BatchMessage: """Create a BatchMessage object with the max size of all content being constrained by max_size_in_bytes. The max_size should be no greater than the max allowed message size defined by the service. diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_async.py index deec3792fa3a..7bdb1da53082 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_async.py @@ -40,8 +40,7 @@ class ServiceBusSession(BaseSession): :caption: Get session from a receiver """ - async def get_state(self, **kwargs): - # type: (Any) -> str + async def get_state(self, **kwargs: Any) -> str: """Get the session state. Returns None if no state has been set. @@ -74,8 +73,7 @@ async def get_state(self, **kwargs): session_state = session_state.decode('UTF-8') return session_state - async def set_state(self, state, **kwargs): - # type: (Union[str, bytes, bytearray], Any) -> None + async def set_state(self, state: Union[str, bytes, bytearray], **kwargs: Any) -> None: """Set the session state. :param state: The state value. @@ -105,8 +103,7 @@ async def set_state(self, state, **kwargs): timeout=timeout ) - async def renew_lock(self, **kwargs): - # type: (Any) -> datetime.datetime + async def renew_lock(self, **kwargs: Any) -> datetime.datetime: """Renew the session lock. This operation must be performed periodically in order to retain a lock on the diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_receiver_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_receiver_async.py index 8685fcc8664e..e63024d22164 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_receiver_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_receiver_async.py @@ -149,8 +149,7 @@ def from_connection_string( return super(ServiceBusSessionReceiver, cls).from_connection_string(conn_str, **kwargs) # type: ignore @property - def session(self): - # type: ()->ServiceBusSession + def session(self) -> ServiceBusSession: """ Get the ServiceBusSession object linked with the receiver. Session is only available to session-enabled entities.