Skip to content

Commit

Permalink
update async type hint to use the native py3 pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
yunhaoling committed Oct 14, 2020
1 parent edd7eee commit 4b7af13
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,12 @@ async def _auto_lock_renew(self,
if on_lock_renew_failure and not clean_shutdown:
await on_lock_renew_failure(renewable, error)

def register(self,
renewable: Union[ReceivedMessage, ServiceBusSession],
timeout: float = 300,
on_lock_renew_failure: Optional[AsyncLockRenewFailureCallback] = None) -> None:
def register(
self,
renewable: Union[ReceivedMessage, ServiceBusSession],
timeout: float = 300,
on_lock_renew_failure: Optional[AsyncLockRenewFailureCallback] = None
) -> None:
"""Register a renewable entity for automatic lock renewal.
:param renewable: A locked entity that needs to be renewed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,17 @@ async def complete(self) -> None: # type: ignore
self._settled = True

async def dead_letter( # type: ignore
self, reason: Optional[str] = None,
error_description: Optional[str] = None
self, reason: Optional[str] = None,
error_description: Optional[str] = None
) -> None: # pylint: disable=unused-argument
"""Move the message to the Dead Letter queue.
The Dead Letter queue is a sub-queue that can be
used to store messages that failed to process correctly, or otherwise require further inspection
or processing. The queue can also be configured to send expired messages to the Dead Letter queue.
:param str reason: The reason for dead-lettering the message.
:param str error_description: The detailed error description for dead-lettering the message.
:param Optional[str] reason: The reason for dead-lettering the message.
:param Optional[str] error_description: The detailed error description for dead-lettering the message.
:rtype: None
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,11 @@ async def get_token(self, *scopes: str, **kwargs: Any) -> _AccessToken: # pylin
class BaseHandler:
def __init__(
self,
fully_qualified_namespace,
entity_name,
credential,
**kwargs
):
# type: (str, str, TokenCredential, Any) -> None
fully_qualified_namespace: str,
entity_name: str,
credential: TokenCredential,
**kwargs: Any
) -> None:
self.fully_qualified_namespace = fully_qualified_namespace
self._entity_name = entity_name

Expand Down Expand Up @@ -263,8 +262,7 @@ async def _close_handler(self):
self._handler = None
self._running = False

async def close(self):
# type: () -> None
async def close(self) -> None:
"""Close down the handler connection.
If the handler has already closed, this operation will do nothing. An optional exception can be passed in to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Any, List, TYPE_CHECKING
from typing import Any, List, TYPE_CHECKING, Optional
import logging

import uamqp
Expand Down Expand Up @@ -63,11 +63,10 @@ class ServiceBusClient(object):
"""
def __init__(
self,
fully_qualified_namespace,
credential,
**kwargs
):
# type: (str, TokenCredential, Any) -> None
fully_qualified_namespace: str,
credential: TokenCredential,
**kwargs: Any
) -> None:
self.fully_qualified_namespace = fully_qualified_namespace
self._credential = credential
self._config = Configuration(**kwargs)
Expand Down Expand Up @@ -100,10 +99,9 @@ async def _create_uamqp_connection(self):
@classmethod
def from_connection_string(
cls,
conn_str,
**kwargs
):
# type: (str, Any) -> ServiceBusClient
conn_str: str,
**kwargs: Any
) -> "ServiceBusClient":
"""
Create a ServiceBusClient from a connection string.
Expand Down Expand Up @@ -145,8 +143,7 @@ def from_connection_string(
**kwargs
)

async def close(self):
# type: () -> None
async def close(self) -> None:
"""
Close down the ServiceBus client.
All spawned senders, receivers and underlying connection will be shutdown.
Expand All @@ -167,8 +164,7 @@ async def close(self):
if self._connection_sharing and self._connection:
await self._connection.destroy_async()

def get_queue_sender(self, queue_name, **kwargs):
# type: (str, Any) -> ServiceBusSender
def get_queue_sender(self, queue_name: str, **kwargs: Any) -> ServiceBusSender:
"""Get ServiceBusSender for the specific queue.
:param str queue_name: The path of specific Service Bus Queue the client connects to.
Expand Down Expand Up @@ -202,8 +198,7 @@ def get_queue_sender(self, queue_name, **kwargs):
self._handlers.append(handler)
return handler

def get_queue_receiver(self, queue_name, **kwargs):
# type: (str, Any) -> ServiceBusReceiver
def get_queue_receiver(self, queue_name: str, **kwargs: Any) -> ServiceBusReceiver:
"""Get ServiceBusReceiver for the specific queue.
:param str queue_name: The path of specific Service Bus Queue the client connects to.
Expand Down Expand Up @@ -261,8 +256,7 @@ def get_queue_receiver(self, queue_name, **kwargs):
self._handlers.append(handler)
return handler

def get_topic_sender(self, topic_name, **kwargs):
# type: (str, Any) -> ServiceBusSender
def get_topic_sender(self, topic_name: str, **kwargs: Any) -> ServiceBusSender:
"""Get ServiceBusSender for the specific topic.
:param str topic_name: The path of specific Service Bus Topic the client connects to.
Expand Down Expand Up @@ -295,8 +289,7 @@ def get_topic_sender(self, topic_name, **kwargs):
self._handlers.append(handler)
return handler

def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
# type: (str, str, Any) -> ServiceBusReceiver
def get_subscription_receiver(self, topic_name: str, subscription_name: str, **kwargs: Any) -> ServiceBusReceiver:
"""Get ServiceBusReceiver for the specific subscription under the topic.
:param str topic_name: The name of specific Service Bus Topic the client connects to.
Expand Down Expand Up @@ -374,14 +367,19 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
self._handlers.append(handler)
return handler

def get_subscription_session_receiver(self, topic_name, subscription_name, session_id=None, **kwargs):
# type: (str, str, str, Any) -> ServiceBusSessionReceiver
def get_subscription_session_receiver(
self,
topic_name: str,
subscription_name: str,
session_id: Optional[str] = None,
**kwargs: Any
) -> ServiceBusSessionReceiver:
"""Get ServiceBusReceiver for the specific subscription under the topic.
:param str topic_name: The name of specific Service Bus Topic the client connects to.
:param str subscription_name: The name of specific Service Bus Subscription
under the given Service Bus Topic.
:param str session_id: A specific session from which to receive. This must be specified for a
:param Optional[str] session_id: A specific session from which to receive. This must be specified for a
sessionful entity, otherwise it must be None. In order to receive messages from the next available
session, set this to None. The default is None.
:keyword receive_mode: The mode with which messages will be retrieved from the entity. The two options
Expand Down Expand Up @@ -432,14 +430,18 @@ def get_subscription_session_receiver(self, topic_name, subscription_name, sessi
self._handlers.append(handler)
return handler

def get_queue_session_receiver(self, queue_name, session_id=None, **kwargs):
# type: (str, str, Any) -> ServiceBusSessionReceiver
def get_queue_session_receiver(
self,
queue_name: str,
session_id: Optional[str] = None,
**kwargs: Any
) -> ServiceBusSessionReceiver:
"""Get ServiceBusSessionReceiver for the specific queue.
:param str queue_name: The path of specific Service Bus Queue the client connects to.
:param str session_id: A specific session from which to receive. This must be specified for a
:param Optional[str] session_id: A specific session from which to receive. This must be specified for a
sessionful entity, otherwise it must be None. In order to receive messages from the next available
session, set this to None. The default is None.
session, set this to None. The default is None.
:keyword receive_mode: The mode with which messages will be retrieved from the entity. The two options
are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given
lock period before they will be removed from the queue. Messages received with ReceiveAndDelete
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,16 +278,15 @@ async def _renew_locks(self, *lock_tokens, timeout=None):
timeout=timeout
)

async def close(self):
# type: () -> None
async def close(self) -> None:
await super(ServiceBusReceiver, self).close()
self._message_iter = None

def get_streaming_message_iter(self, max_wait_time: float = None) -> AsyncIterator[ReceivedMessage]:
def get_streaming_message_iter(self, max_wait_time: Optional[float] = None) -> AsyncIterator[ReceivedMessage]:
"""Receive messages from an iterator indefinitely, or if a max_wait_time is specified, until
such a timeout occurs.
:param float max_wait_time: Maximum time to wait in seconds for the next message to arrive.
:param Optional[float] max_wait_time: Maximum time to wait in seconds for the next message to arrive.
If no messages arrive, and no timeout is specified, this call will not return
until the connection is closed. If specified, and no messages arrive for the
timeout period, the iterator will stop.
Expand Down Expand Up @@ -369,8 +368,11 @@ def from_connection_string(
raise ValueError("Subscription name is missing for the topic. Please specify subscription_name.")
return cls(**constructor_args)

async def receive_messages(self, max_message_count=None, max_wait_time=None):
# type: (int, float) -> List[ReceivedMessage]
async def receive_messages(
self,
max_message_count: Optional[int] = None,
max_wait_time: Optional[float] = None
) -> List[ReceivedMessage]:
"""Receive a batch of messages at once.
This approach is optimal if you wish to process multiple messages simultaneously, or
Expand All @@ -384,9 +386,9 @@ async def receive_messages(self, max_message_count=None, max_wait_time=None):
return as soon as at least one message is received and there is a gap in incoming messages regardless
of the specified batch size.
:param int max_message_count: Maximum number of messages in the batch. Actual number
:param Optional[int] max_message_count: Maximum number of messages in the batch. Actual number
returned will depend on prefetch_count size and incoming stream rate.
:param float max_wait_time: Maximum time to wait in seconds for the first message to arrive.
:param Optional[float] max_wait_time: Maximum time to wait in seconds for the first message to arrive.
If no messages arrive, and no timeout is specified, this call will not return
until the connection is closed. If specified, and no messages arrive within the
timeout period, an empty list will be returned.
Expand Down Expand Up @@ -468,8 +470,7 @@ async def receive_deferred_messages(
)
return messages

async def peek_messages(self, max_message_count=1, **kwargs):
# type: (int, Optional[float]) -> List[PeekedMessage]
async def peek_messages(self, max_message_count: int = 1, **kwargs: Any) -> List[PeekedMessage]:
"""Browse messages currently pending in the queue.
Peeked messages are not removed from queue, nor are they locked. They cannot be completed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,12 @@ async def _send(self, message, timeout=None, last_exception=None):
finally: # reset the timeout of the handler back to the default value
self._set_msg_timeout(default_timeout, None)

async def schedule_messages(self, messages, schedule_time_utc, **kwargs):
# type: (Union[Message, List[Message]], datetime.datetime, Any) -> List[int]
async def schedule_messages(
self,
messages: Union[Message, List[Message]],
schedule_time_utc: datetime.datetime,
**kwargs: Any
) -> List[int]:
"""Send Message or multiple Messages to be enqueued at a specific time by the service.
Returns a list of the sequence numbers of the enqueued messages.
:param messages: The message or list of messages to schedule.
Expand Down Expand Up @@ -175,8 +179,7 @@ async def schedule_messages(self, messages, schedule_time_utc, **kwargs):
timeout=timeout
)

async def cancel_scheduled_messages(self, sequence_numbers, **kwargs):
# type: (Union[int, List[int]], Any) -> None
async def cancel_scheduled_messages(self, sequence_numbers: Union[int, List[int]], **kwargs: Any) -> None:
"""
Cancel one or more messages that have previously been scheduled and are still pending.
Expand Down Expand Up @@ -253,8 +256,7 @@ def from_connection_string(
)
return cls(**constructor_args)

async def send_messages(self, message, **kwargs):
# type: (Union[Message, BatchMessage, List[Message]], Any) -> None
async def send_messages(self, message: Union[Message, BatchMessage, List[Message]], **kwargs: Any) -> None:
"""Sends message and blocks until acknowledgement is received or operation times out.
If a list of messages was provided, attempts to send them as a single batch, throwing a
Expand Down Expand Up @@ -307,8 +309,7 @@ async def send_messages(self, message, **kwargs):
require_last_exception=True
)

async def create_batch(self, max_size_in_bytes=None):
# type: (int) -> BatchMessage
async def create_batch(self, max_size_in_bytes: int = None) -> BatchMessage:
"""Create a BatchMessage object with the max size of all content being constrained by max_size_in_bytes.
The max_size should be no greater than the max allowed message size defined by the service.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ class ServiceBusSession(BaseSession):
:caption: Get session from a receiver
"""

async def get_state(self, **kwargs):
# type: (Any) -> str
async def get_state(self, **kwargs: Any) -> str:
"""Get the session state.
Returns None if no state has been set.
Expand Down Expand Up @@ -74,8 +73,7 @@ async def get_state(self, **kwargs):
session_state = session_state.decode('UTF-8')
return session_state

async def set_state(self, state, **kwargs):
# type: (Union[str, bytes, bytearray], Any) -> None
async def set_state(self, state: Union[str, bytes, bytearray], **kwargs: Any) -> None:
"""Set the session state.
:param state: The state value.
Expand Down Expand Up @@ -105,8 +103,7 @@ async def set_state(self, state, **kwargs):
timeout=timeout
)

async def renew_lock(self, **kwargs):
# type: (Any) -> datetime.datetime
async def renew_lock(self, **kwargs: Any) -> datetime.datetime:
"""Renew the session lock.
This operation must be performed periodically in order to retain a lock on the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,7 @@ def from_connection_string(
return super(ServiceBusSessionReceiver, cls).from_connection_string(conn_str, **kwargs) # type: ignore

@property
def session(self):
# type: ()->ServiceBusSession
def session(self) -> ServiceBusSession:
"""
Get the ServiceBusSession object linked with the receiver. Session is only available to session-enabled
entities.
Expand Down

0 comments on commit 4b7af13

Please sign in to comment.