Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename receive() and peek() (send/schedule as well) as receive_messages() and peek_messages() respectively to align with other service bus SDKs. #12222

1 change: 1 addition & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

* Fixed bug where sync AutoLockRenew does not shutdown itself timely.
* Fixed bug where async AutoLockRenew does not support context manager.
* Renamed `receive()`, `peek()` `schedule()` and `send()` to `receive_messages()`, `peek_messages()`, `schedule_messages()` and `send_messages()` to align with other service bus SDKs.
KieranBrantnerMagee marked this conversation as resolved.
Show resolved Hide resolved

## 7.0.0b3 (2020-06-08)

Expand Down
8 changes: 4 additions & 4 deletions sdk/servicebus/azure-servicebus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,12 @@ with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_sender(queue_name) as sender:

message = Message("Single message")
sender.send(message)
sender.send_messages(message)
```

### Receive a message from a queue

To receive from a queue, you can either perform a one-off receive via "receiver.receive()" or receive persistently as follows:
To receive from a queue, you can either perform a one-off receive via "receiver.receive_messages()" or receive persistently as follows:

```Python
from azure.servicebus import ServiceBusClient
Expand Down Expand Up @@ -181,7 +181,7 @@ session_id = os.environ.get('SERVICE_BUS_SESSION_ID')

with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_topic_sender(topic_name) as sender:
sender.send(Message("Session Enabled Message", session_id=session_id))
sender.send_messages(Message("Session Enabled Message", session_id=session_id))

# If session_id is null here, will receive from the first available session.
with client.get_subscription_session_receiver(topic_name, subscription_name, session_id) as receiver:
Expand Down Expand Up @@ -231,7 +231,7 @@ be transparent to a user, but if you notice a reconnect occuring after such a du
link will extend this timeout.
- idle_timeout: Provided on creation of a receiver, the time after which the underlying UAMQP link will be closed after no traffic. This primarily dictates the length
a generator-style receive will run for before exiting if there are no messages. Passing None (default) will wait forever, up until the 10 minute threshold if no other action is taken.
- max_wait_time: Provided when calling receive() to fetch a batch of messages. Dictates how long the receive() will wait for more messages before returning, similarly up to the aformentioned limits.
- max_wait_time: Provided when calling `receive_messages()` to fetch a batch of messages. Dictates how long the `receive_messages()` will wait for more messages before returning, similarly up to the aformentioned limits.

### AutoLockRenew

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ def from_connection_string(
raise ValueError("Subscription name is missing for the topic. Please specify subscription_name.")
return cls(**constructor_args)

def receive(self, max_batch_size=None, max_wait_time=None):
def receive_messages(self, max_batch_size=None, max_wait_time=None):
# type: (int, float) -> List[ReceivedMessage]
"""Receive a batch of messages at once.

Expand Down Expand Up @@ -365,7 +365,7 @@ def receive_deferred_messages(self, sequence_numbers):
m._receiver = self # pylint: disable=protected-access
return messages

def peek(self, message_count=1, sequence_number=None):
def peek_messages(self, message_count=1, sequence_number=None):
# type: (int, Optional[int]) -> List[PeekMessage]
"""Browse messages currently pending in the queue.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def _send(self, message, timeout=None, last_exception=None):
self._set_msg_timeout(timeout, last_exception)
self._handler.send_message(message.message)

def schedule(self, messages, schedule_time_utc):
def schedule_messages(self, messages, schedule_time_utc):
# type: (Union[Message, List[Message]], datetime.datetime) -> List[int]
"""Send Message or multiple Messages to be enqueued at a specific time.
Returns a list of the sequence numbers of the enqueued messages.
Expand Down Expand Up @@ -295,7 +295,7 @@ def from_connection_string(
)
return cls(**constructor_args)

def send(self, message):
def send_messages(self, message):
# type: (Union[Message, BatchMessage, List[Message]]) -> None
"""Sends message and blocks until acknowledgement is received or operation times out.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def from_connection_string(
raise ValueError("Subscription name is missing for the topic. Please specify subscription_name.")
return cls(**constructor_args)

async def receive(self, max_batch_size=None, max_wait_time=None):
async def receive_messages(self, max_batch_size=None, max_wait_time=None):
# type: (int, float) -> List[ReceivedMessage]
"""Receive a batch of messages at once.

Expand Down Expand Up @@ -353,7 +353,7 @@ async def receive_deferred_messages(self, sequence_numbers):
m._receiver = self # pylint: disable=protected-access
return messages

async def peek(self, message_count=1, sequence_number=0):
async def peek_messages(self, message_count=1, sequence_number=0):
"""Browse messages currently pending in the queue.

Peeked messages are not removed from queue, nor are they locked. They cannot be completed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ async def _send(self, message, timeout=None, last_exception=None):
self._set_msg_timeout(timeout, last_exception)
await self._handler.send_message_async(message.message)

async def schedule(self, messages, schedule_time_utc):
async def schedule_messages(self, messages, schedule_time_utc):
# type: (Union[Message, List[Message]], datetime.datetime) -> List[int]
"""Send Message or multiple Messages to be enqueued at a specific time.
Returns a list of the sequence numbers of the enqueued messages.
Expand Down Expand Up @@ -236,7 +236,7 @@ def from_connection_string(
)
return cls(**constructor_args)

async def send(self, message):
async def send_messages(self, message):
# type: (Union[Message, BatchMessage, List[Message]]) -> None
"""Sends message and blocks until acknowledgement is received or operation times out.

Expand Down
25 changes: 13 additions & 12 deletions sdk/servicebus/azure-servicebus/migration_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ As a user this will be largely transparent to you, as initialization will still
the primary difference will be that rather than creating a queue_client, for instance, and then a sender off of that, you would simply
create a servicebus queue sender off of your ServiceBusClient instance via the "get_queue_sender" method.

It should also be noted that many of the helper methods that previously existed on the intermediary client (e.g. QueueClient and Peek) now
exist on the receiver (in the case of peek) or sender itself. This is to better consolidate functionality and align messaging link lifetime
It should also be noted that many of the helper methods that previously existed on the intermediary client (e.g. QueueClient and `peek()`) now
exist on the receiver (in the case of `peek()`) or sender itself. This is to better consolidate functionality and align messaging link lifetime
semantics with the sender or receiver lifetime.

### Client constructors
Expand All @@ -40,27 +40,28 @@ semantics with the sender or receiver lifetime.

| In v0.50 | Equivalent in v7 | Sample |
|---|---|---|
| `QueueClient.from_connection_string().get_receiver().fetch_next() and ServiceBusClient.from_connection_string().get_queue().get_receiver().fetch_next()`| `ServiceBusClient.from_connection_string().get_queue_receiver().receive()`| [Get a receiver and receive a single batch of messages](./samples/sync_samples/receive_queue.py) |
| `QueueClient.from_connection_string().get_receiver().fetch_next() and ServiceBusClient.from_connection_string().get_queue().get_receiver().fetch_next()`| `ServiceBusClient.from_connection_string().get_queue_receiver().receive_messages()`| [Get a receiver and receive a single batch of messages](./samples/sync_samples/receive_queue.py) |
| `QueueClient.from_connection_string().get_receiver().peek() and ServiceBusClient.from_connection_string().get_queue().get_receiver().peek()`| `ServiceBusClient.from_connection_string().get_queue_receiver().peek_messages()`| [Get a receiver and receive a single batch of messages](./samples/sync_samples/receive_queue.py) |

### Sending messages

| In v0.50 | Equivalent in v7 | Sample |
|---|---|---|
| `QueueClient.from_connection_string().send() and ServiceBusClient.from_connection_string().get_queue().get_sender().send()`| `ServiceBusClient.from_connection_string().get_queue_sender().send()`| [Get a sender and send a message](./samples/sync_samples/send_queue.py) |
| `queue_client.send(BatchMessage(["data 1", "data 2", ...]))`| `batch = queue_sender.create_batch() batch.add(Message("data 1")) queue_sender.send(batch)`| [Create and send a batch of messages](./samples/sync_samples/send_queue.py) |
| `QueueClient.from_connection_string().send() and ServiceBusClient.from_connection_string().get_queue().get_sender().send()`| `ServiceBusClient.from_connection_string().get_queue_sender().send_messages()`| [Get a sender and send a message](./samples/sync_samples/send_queue.py) |
| `queue_client.send(BatchMessage(["data 1", "data 2", ...]))`| `batch = queue_sender.create_batch() batch.add(Message("data 1")) queue_sender.send_messages(batch)`| [Create and send a batch of messages](./samples/sync_samples/send_queue.py) |

### Scheduling messages and cancelling scheduled messages

| In v0.50 | Equivalent in v7 | Sample |
|---|---|---|
| `queue_client.get_sender().schedule(schedule_time_utc, message1, message2)` | `sb_client.get_queue_sender().schedule([message1, message2], schedule_time_utc)` | [Schedule messages](./samples/sync_samples/schedule_messages_and_cancellation.py) |
| `queue_client.get_sender().schedule(schedule_time_utc, message1, message2)` | `sb_client.get_queue_sender().schedule_messages([message1, message2], schedule_time_utc)` | [Schedule messages](./samples/sync_samples/schedule_messages_and_cancellation.py) |
| `queue_client.get_sender().cancel_scheduled_messages(sequence_number1, sequence_number2)`| `sb_client.get_queue_sender().cancel_scheduled_messages([sequence_number1, sequence_number2])` | [Cancel scheduled messages](./samples/sync_samples/schedule_messages_and_cancellation.py)|

### Working with sessions

| In v0.50 | Equivalent in v7 | Sample |
|---|---|---|
| `queue_client.send(message, session='foo') and queue_client.get_sender(session='foo').send(message)`| `sb_client.get_queue_sender().send(Message('body', session_id='foo'))`| [Send a message to a session](./samples/sync_samples/session_send_receive.py) |
| `queue_client.send(message, session='foo') and queue_client.get_sender(session='foo').send(message)`| `sb_client.get_queue_sender().send_messages(Message('body', session_id='foo'))`| [Send a message to a session](./samples/sync_samples/session_send_receive.py) |
| `AutoLockRenew().register(queue_client.get_receiver(session='foo'))`| `AutoLockRenew().register(sb_client.get_queue_session_receiver(session_id='foo').session)`| [Access a session and ensure its lock is auto-renewed](./samples/sync_samples/session_send_receive.py) |
| `receiver.get_session_state()` | `receiver.session.get_session_state()` | [Perform session specific operations on a receiver](./samples/sync_samples/session_send_receive.py)

Expand Down Expand Up @@ -118,7 +119,7 @@ Becomes this in v7:
with ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR) as client:

with client.get_queue_receiver(queue_name=QUEUE_NAME) as receiver:
batch = receiver.receive(max_batch_size=10, max_wait_time=5)
batch = receiver.receive_messages(max_batch_size=10, max_wait_time=5)
for message in batch:
print("Message: {}".format(message))
message.complete()
Expand Down Expand Up @@ -148,12 +149,12 @@ with queue_client.get_sender() as sender:
# Send one at a time.
for i in range(100):
message = Message("Sample message no. {}".format(i))
sender.send(message)
sender.schedule_messages(message)

# Send as a batch.
messages_to_batch = [Message("Batch message no. {}".format(i)) for i in range(10)]
batch = BatchMessage(messages_to_batch)
sender.send(batch)
sender.schedule_messages(batch)
```

In v7:
Expand All @@ -164,11 +165,11 @@ with ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR) as client:
# Sending one at a time.
for i in range(100):
message = Message("Sample message no. {}".format(i))
sender.send(message)
sender.schedule_messages(message)

# Send as a batch
batch = new BatchMessage()
for i in range(10):
batch.add(Message("Batch message no. {}".format(i)))
sender.send(batch)
sender.schedule_messages(batch)
```
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

async def send_single_message(sender):
message = Message("DATA" * 64)
await sender.send(message)
await sender.send_messages(message)


async def main():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async def main():
async with servicebus_client:
receiver = servicebus_client.get_queue_deadletter_receiver(queue_name=QUEUE_NAME, prefetch=10)
async with receiver:
received_msgs = await receiver.receive(max_batch_size=10, max_wait_time=5)
received_msgs = await receiver.receive_messages(max_batch_size=10, max_wait_time=5)
for msg in received_msgs:
print(str(msg))
await msg.complete()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async def main():
receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, prefetch=10)

async with receiver:
received_msgs = await receiver.receive(max_batch_size=10, max_wait_time=5)
received_msgs = await receiver.receive_messages(max_batch_size=10, max_wait_time=5)
deferred_sequenced_numbers = []
for msg in received_msgs:
print("Deferring msg: {}".format(str(msg)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async def main():
async with servicebus_client:
receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME)
async with receiver:
received_msgs = await receiver.peek(message_count=2)
received_msgs = await receiver.peek_messages(message_count=2)
for msg in received_msgs:
print(str(msg))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async def main():
async with servicebus_client:
receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, prefetch=10)
async with receiver:
received_msgs = await receiver.receive(max_batch_size=10, max_wait_time=5)
received_msgs = await receiver.receive_messages(max_batch_size=10, max_wait_time=5)
for msg in received_msgs:
print(str(msg))
await msg.complete()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async def main():
prefetch=10
)
async with receiver:
received_msgs = await receiver.receive(max_batch_size=10, max_wait_time=5)
received_msgs = await receiver.receive_messages(max_batch_size=10, max_wait_time=5)
for msg in received_msgs:
print(str(msg))
await msg.complete()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ async def example_send_and_receive_async():
# [START send_async]
async with servicebus_sender:
message = Message("Hello World")
await servicebus_sender.send(message)
await servicebus_sender.send_messages(message)
# [END send_async]

# [START create_batch_async]
Expand All @@ -204,14 +204,14 @@ async def example_send_and_receive_async():

# [START peek_messages_async]
async with servicebus_receiver:
messages = await servicebus_receiver.peek()
messages = await servicebus_receiver.peek_messages()
for message in messages:
print(message)
# [END peek_messages_async]

# [START receive_async]
async with servicebus_receiver:
messages = await servicebus_receiver.receive(max_wait_time=5)
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
print(message)
await message.complete()
Expand All @@ -233,11 +233,11 @@ async def example_receive_deferred_async():
servicebus_sender = await example_create_servicebus_sender_async()
servicebus_receiver = await example_create_servicebus_receiver_async()
async with servicebus_sender:
await servicebus_sender.send(Message("Hello World"))
await servicebus_sender.send_messages(Message("Hello World"))
# [START receive_defer_async]
async with servicebus_receiver:
deferred_sequenced_numbers = []
messages = await servicebus_receiver.receive(max_wait_time=5)
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
deferred_sequenced_numbers.append(message.sequence_number)
print(message)
Expand Down Expand Up @@ -302,7 +302,7 @@ async def example_schedule_ops_async():
async with servicebus_sender:
scheduled_time_utc = datetime.datetime.utcnow() + datetime.timedelta(seconds=30)
scheduled_messages = [Message("Scheduled message") for _ in range(10)]
sequence_nums = await servicebus_sender.schedule(scheduled_messages, scheduled_time_utc)
sequence_nums = await servicebus_sender.schedule_messages(scheduled_messages, scheduled_time_utc)
# [END scheduling_messages_async]

# [START cancel_scheduled_messages_async]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
async def schedule_single_message(sender):
message = Message("Message to be scheduled")
scheduled_time_utc = datetime.datetime.utcnow() + datetime.timedelta(seconds=30)
sequence_number = await sender.schedule(message, scheduled_time_utc)
sequence_number = await sender.schedule_messages(message, scheduled_time_utc)
return sequence_number


Expand All @@ -34,7 +34,7 @@ async def schedule_multiple_messages(sender):
messages_to_schedule.append(Message("Message to be scheduled"))

scheduled_time_utc = datetime.datetime.utcnow() + datetime.timedelta(seconds=30)
sequence_numbers = await sender.schedule(messages_to_schedule, scheduled_time_utc)
sequence_numbers = await sender.schedule_messages(messages_to_schedule, scheduled_time_utc)
return sequence_numbers


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

async def send_single_message(sender):
message = Message("DATA" * 64)
await sender.send(message)
await sender.send_messages(message)


async def send_batch_message(sender):
Expand All @@ -34,7 +34,7 @@ async def send_batch_message(sender):
# BatchMessage object reaches max_size.
# New BatchMessage object can be created here to send more data.
break
await sender.send(batch_message)
await sender.send_messages(batch_message)


async def main():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

async def send_single_message(sender):
message = Message("DATA" * 64)
await sender.send(message)
await sender.send_messages(message)


async def send_batch_message(sender):
Expand All @@ -34,7 +34,7 @@ async def send_batch_message(sender):
# BatchMessage object reaches max_size.
# New BatchMessage object can be created here to send more data.
break
await sender.send(batch_message)
await sender.send_messages(batch_message)


async def main():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ async def sample_session_send_receive_with_pool_async(connection_string, queue_n

for session_id in sessions:
async with client.get_queue_sender(queue_name) as sender:
await asyncio.gather(*[sender.send(Message("Sample message no. {}".format(i), session_id=session_id)) for i in range(20)])
await sender.send(Message("shutdown", session_id=session_id))
await asyncio.gather(*[sender.send_messages(Message("Sample message no. {}".format(i), session_id=session_id)) for i in range(20)])
await sender.send_messages(Message("shutdown", session_id=session_id))

receive_sessions = [message_processing(client, queue_name) for _ in range(concurrent_receivers)]
await asyncio.gather(*receive_sessions)
Expand Down
Loading