Skip to content

Commit

Permalink
Rename receive() and peek() (send/schedule as well) as `receive_m…
Browse files Browse the repository at this point in the history
…essages()` and `peek_messages()` respectively to align with other service bus SDKs. (#12222)

* Rename `receive()` and `peek()` as `receive_messages()` and `peek_messages()` respectively to align with other service bus SDKs.
* Adjusts readmes, samples, tests, and migration guide accordingly.
* Rename send and schedule to send_messages and schedule_messages
* convert test_sb_client send/receive to _messages syntax
* Update sdk/servicebus/azure-servicebus/CHANGELOG.md
* Add breaking changes section with function renames within.
  • Loading branch information
KieranBrantnerMagee authored Jul 1, 2020
1 parent b0d481c commit 054a0f3
Show file tree
Hide file tree
Showing 41 changed files with 311 additions and 306 deletions.
4 changes: 4 additions & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
* Fixed bug where sync AutoLockRenew does not shutdown itself timely.
* Fixed bug where async AutoLockRenew does not support context manager.

**Breaking Changes**

* Renamed `receive()`, `peek()` `schedule()` and `send()` to `receive_messages()`, `peek_messages()`, `schedule_messages()` and `send_messages()` to align with other service bus SDKs.

## 7.0.0b3 (2020-06-08)

**New Features**
Expand Down
24 changes: 12 additions & 12 deletions sdk/servicebus/azure-servicebus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,19 +146,19 @@ with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_sender(queue_name) as sender:
# Sending a single message
single_message = Message("Single message")
sender.send(single_message)
sender.send_messages(single_message)

# Sending a list of messages
messages = [Message("First message"), Message("Second message")]
sender.send(messages)
sender.send_messages(messages)
```

> **NOTE:** A message may be scheduled for delayed delivery using the `ServiceBusSender.schedule()` method, or by specifying `Message.scheduled_enqueue_time_utc` before calling `ServiceBusSender.send()`
> **NOTE:** A message may be scheduled for delayed delivery using the `ServiceBusSender.schedule_messages()` method, or by specifying `Message.scheduled_enqueue_time_utc` before calling `ServiceBusSender.send_messages()`
> For more detail on scheduling and schedule cancellation please see a sample [here](./samples/sync_samples/schedule_messages_and_cancellation.py).
### Receive messages from a queue

To receive from a queue, you can either perform an ad-hoc receive via "receiver.receive()" or receive persistently through the receiver itself.
To receive from a queue, you can either perform an ad-hoc receive via "receiver.receive_messages()" or receive persistently through the receiver itself.

#### Receive messages from a queue through iterating over ServiceBusReceiver

Expand All @@ -183,9 +183,9 @@ with ServiceBusClient.from_connection_string(connstr) as client:
> See [AutoLockRenewer](#autolockrenew) for a helper to perform this in the background automatically.
> Lock duration is set in Azure on the queue or topic itself.
#### [Receive messages from a queue through `ServiceBusReceiver.receive()`][receive_reference]
#### [Receive messages from a queue through `ServiceBusReceiver.receive_messages()`][receive_reference]

> **NOTE:** `ServiceBusReceiver.receive()` receives a single or constrained list of messages through an ad-hoc method call, as opposed to receiving perpetually from the generator. It always returns a list.
> **NOTE:** `ServiceBusReceiver.receive_messages()` receives a single or constrained list of messages through an ad-hoc method call, as opposed to receiving perpetually from the generator. It always returns a list.
```Python
from azure.servicebus import ServiceBusClient
Expand All @@ -196,19 +196,19 @@ queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_receiver(queue_name) as receiver:
received_message_array = receiver.receive(max_wait_time=10) # try to receive a single message within 10 seconds
received_message_array = receiver.receive_messages(max_wait_time=10) # try to receive a single message within 10 seconds
if received_message_array:
print(str(received_message_array[0]))

with client.get_queue_receiver(queue_name, prefetch=5) as receiver:
received_message_array = receiver.receive(max_batch_size=5, max_wait_time=10) # try to receive maximum 5 messages in a batch within 10 seconds
received_message_array = receiver.receive_messages(max_batch_size=5, max_wait_time=10) # try to receive maximum 5 messages in a batch within 10 seconds
for message in received_message_array:
print(str(message))
```

In this example, max_batch_size (and prefetch, as required by max_batch_size) declares the maximum number of messages to attempt receiving before hitting a max_wait_time as specified in seconds.

> **NOTE:** It should also be noted that `ServiceBusReceiver.peek()` is subtly different than receiving, as it does not lock the messages being peeked, and thus they cannot be settled.
> **NOTE:** It should also be noted that `ServiceBusReceiver.peek_messages()` is subtly different than receiving, as it does not lock the messages being peeked, and thus they cannot be settled.

### [Sending][session_send_reference] and [receiving][session_receive_reference] a message from a session enabled queue
Expand All @@ -225,7 +225,7 @@ session_id = os.environ['SERVICE_BUS_SESSION_ID']

with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_sender(queue_name) as sender:
sender.send(Message("Session Enabled Message", session_id=session_id))
sender.send_messages(Message("Session Enabled Message", session_id=session_id))

# If session_id is null here, will receive from the first available session.
with client.get_queue_session_receiver(queue_name, session_id) as receiver:
Expand All @@ -252,7 +252,7 @@ subscription_name = os.environ['SERVICE_BUS_SUBSCRIPTION_NAME']

with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_topic_sender(topic_name) as sender:
sender.send(Message("Data"))
sender.send_messages(Message("Data"))

# If session_id is null here, will receive from the first available session.
with client.get_subscription_receiver(topic_name, subscription_name) as receiver:
Expand Down Expand Up @@ -386,7 +386,7 @@ renewer = AutoLockRenew()
with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_session_receiver(queue_name, session_id=session_id) as receiver:
renewer.register(receiver.session, timeout=300) # Timeout for how long to maintain the lock for, in seconds.
for msg in receiver.receive():
for msg in receiver.receive_messages():
renewer.register(msg, timeout=60)
# Do your application logic here
msg.complete()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ def from_connection_string(
raise ValueError("Subscription name is missing for the topic. Please specify subscription_name.")
return cls(**constructor_args)

def receive(self, max_batch_size=None, max_wait_time=None):
def receive_messages(self, max_batch_size=None, max_wait_time=None):
# type: (int, float) -> List[ReceivedMessage]
"""Receive a batch of messages at once.
Expand Down Expand Up @@ -372,7 +372,7 @@ def receive_deferred_messages(self, sequence_numbers):
m._receiver = self # pylint: disable=protected-access
return messages

def peek(self, message_count=1, sequence_number=None):
def peek_messages(self, message_count=1, sequence_number=None):
# type: (int, Optional[int]) -> List[PeekMessage]
"""Browse messages currently pending in the queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def _send(self, message, timeout=None, last_exception=None):
self._set_msg_timeout(timeout, last_exception)
self._handler.send_message(message.message)

def schedule(self, messages, schedule_time_utc):
def schedule_messages(self, messages, schedule_time_utc):
# type: (Union[Message, List[Message]], datetime.datetime) -> List[int]
"""Send Message or multiple Messages to be enqueued at a specific time.
Returns a list of the sequence numbers of the enqueued messages.
Expand Down Expand Up @@ -295,7 +295,7 @@ def from_connection_string(
)
return cls(**constructor_args)

def send(self, message):
def send_messages(self, message):
# type: (Union[Message, BatchMessage, List[Message]]) -> None
"""Sends message and blocks until acknowledgement is received or operation times out.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ def from_connection_string(
raise ValueError("Subscription name is missing for the topic. Please specify subscription_name.")
return cls(**constructor_args)

async def receive(self, max_batch_size=None, max_wait_time=None):
async def receive_messages(self, max_batch_size=None, max_wait_time=None):
# type: (int, float) -> List[ReceivedMessage]
"""Receive a batch of messages at once.
Expand Down Expand Up @@ -360,7 +360,7 @@ async def receive_deferred_messages(self, sequence_numbers):
m._receiver = self # pylint: disable=protected-access
return messages

async def peek(self, message_count=1, sequence_number=0):
async def peek_messages(self, message_count=1, sequence_number=0):
"""Browse messages currently pending in the queue.
Peeked messages are not removed from queue, nor are they locked. They cannot be completed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ async def _send(self, message, timeout=None, last_exception=None):
self._set_msg_timeout(timeout, last_exception)
await self._handler.send_message_async(message.message)

async def schedule(self, messages, schedule_time_utc):
async def schedule_messages(self, messages, schedule_time_utc):
# type: (Union[Message, List[Message]], datetime.datetime) -> List[int]
"""Send Message or multiple Messages to be enqueued at a specific time by the service.
Returns a list of the sequence numbers of the enqueued messages.
Expand Down Expand Up @@ -236,7 +236,7 @@ def from_connection_string(
)
return cls(**constructor_args)

async def send(self, message):
async def send_messages(self, message):
# type: (Union[Message, BatchMessage, List[Message]]) -> None
"""Sends message and blocks until acknowledgement is received or operation times out.
Expand Down
25 changes: 13 additions & 12 deletions sdk/servicebus/azure-servicebus/migration_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ As a user this will be largely transparent to you, as initialization will still
the primary difference will be that rather than creating a queue_client, for instance, and then a sender off of that, you would simply
create a servicebus queue sender off of your ServiceBusClient instance via the "get_queue_sender" method.

It should also be noted that many of the helper methods that previously existed on the intermediary client (e.g. QueueClient and Peek) now
exist on the receiver (in the case of peek) or sender itself. This is to better consolidate functionality and align messaging link lifetime
It should also be noted that many of the helper methods that previously existed on the intermediary client (e.g. QueueClient and `peek()`) now
exist on the receiver (in the case of `peek()`) or sender itself. This is to better consolidate functionality and align messaging link lifetime
semantics with the sender or receiver lifetime.

### Client constructors
Expand All @@ -40,27 +40,28 @@ semantics with the sender or receiver lifetime.

| In v0.50 | Equivalent in v7 | Sample |
|---|---|---|
| `QueueClient.from_connection_string().get_receiver().fetch_next() and ServiceBusClient.from_connection_string().get_queue().get_receiver().fetch_next()`| `ServiceBusClient.from_connection_string().get_queue_receiver().receive()`| [Get a receiver and receive a single batch of messages](./samples/sync_samples/receive_queue.py) |
| `QueueClient.from_connection_string().get_receiver().fetch_next() and ServiceBusClient.from_connection_string().get_queue().get_receiver().fetch_next()`| `ServiceBusClient.from_connection_string().get_queue_receiver().receive_messages()`| [Get a receiver and receive a single batch of messages](./samples/sync_samples/receive_queue.py) |
| `QueueClient.from_connection_string().get_receiver().peek() and ServiceBusClient.from_connection_string().get_queue().get_receiver().peek()`| `ServiceBusClient.from_connection_string().get_queue_receiver().peek_messages()`| [Get a receiver and receive a single batch of messages](./samples/sync_samples/receive_queue.py) |

### Sending messages

| In v0.50 | Equivalent in v7 | Sample |
|---|---|---|
| `QueueClient.from_connection_string().send() and ServiceBusClient.from_connection_string().get_queue().get_sender().send()`| `ServiceBusClient.from_connection_string().get_queue_sender().send()`| [Get a sender and send a message](./samples/sync_samples/send_queue.py) |
| `queue_client.send(BatchMessage(["data 1", "data 2", ...]))`| `batch = queue_sender.create_batch() batch.add(Message("data 1")) queue_sender.send(batch)`| [Create and send a batch of messages](./samples/sync_samples/send_queue.py) |
| `QueueClient.from_connection_string().send() and ServiceBusClient.from_connection_string().get_queue().get_sender().send()`| `ServiceBusClient.from_connection_string().get_queue_sender().send_messages()`| [Get a sender and send a message](./samples/sync_samples/send_queue.py) |
| `queue_client.send(BatchMessage(["data 1", "data 2", ...]))`| `batch = queue_sender.create_batch() batch.add(Message("data 1")) queue_sender.send_messages(batch)`| [Create and send a batch of messages](./samples/sync_samples/send_queue.py) |

### Scheduling messages and cancelling scheduled messages

| In v0.50 | Equivalent in v7 | Sample |
|---|---|---|
| `queue_client.get_sender().schedule(schedule_time_utc, message1, message2)` | `sb_client.get_queue_sender().schedule([message1, message2], schedule_time_utc)` | [Schedule messages](./samples/sync_samples/schedule_messages_and_cancellation.py) |
| `queue_client.get_sender().schedule(schedule_time_utc, message1, message2)` | `sb_client.get_queue_sender().schedule_messages([message1, message2], schedule_time_utc)` | [Schedule messages](./samples/sync_samples/schedule_messages_and_cancellation.py) |
| `queue_client.get_sender().cancel_scheduled_messages(sequence_number1, sequence_number2)`| `sb_client.get_queue_sender().cancel_scheduled_messages([sequence_number1, sequence_number2])` | [Cancel scheduled messages](./samples/sync_samples/schedule_messages_and_cancellation.py)|

### Working with sessions

| In v0.50 | Equivalent in v7 | Sample |
|---|---|---|
| `queue_client.send(message, session='foo') and queue_client.get_sender(session='foo').send(message)`| `sb_client.get_queue_sender().send(Message('body', session_id='foo'))`| [Send a message to a session](./samples/sync_samples/session_send_receive.py) |
| `queue_client.send(message, session='foo') and queue_client.get_sender(session='foo').send(message)`| `sb_client.get_queue_sender().send_messages(Message('body', session_id='foo'))`| [Send a message to a session](./samples/sync_samples/session_send_receive.py) |
| `AutoLockRenew().register(queue_client.get_receiver(session='foo'))`| `AutoLockRenew().register(sb_client.get_queue_session_receiver(session_id='foo').session)`| [Access a session and ensure its lock is auto-renewed](./samples/sync_samples/session_send_receive.py) |
| `receiver.get_session_state()` | `receiver.session.get_session_state()` | [Perform session specific operations on a receiver](./samples/sync_samples/session_send_receive.py)

Expand Down Expand Up @@ -118,7 +119,7 @@ Becomes this in v7:
with ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR) as client:

with client.get_queue_receiver(queue_name=QUEUE_NAME) as receiver:
batch = receiver.receive(max_batch_size=10, max_wait_time=5)
batch = receiver.receive_messages(max_batch_size=10, max_wait_time=5)
for message in batch:
print("Message: {}".format(message))
message.complete()
Expand Down Expand Up @@ -148,12 +149,12 @@ with queue_client.get_sender() as sender:
# Send one at a time.
for i in range(100):
message = Message("Sample message no. {}".format(i))
sender.send(message)
sender.schedule_messages(message)

# Send as a batch.
messages_to_batch = [Message("Batch message no. {}".format(i)) for i in range(10)]
batch = BatchMessage(messages_to_batch)
sender.send(batch)
sender.schedule_messages(batch)
```

In v7:
Expand All @@ -164,11 +165,11 @@ with ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR) as client:
# Sending one at a time.
for i in range(100):
message = Message("Sample message no. {}".format(i))
sender.send(message)
sender.schedule_messages(message)

# Send as a batch
batch = new BatchMessage()
for i in range(10):
batch.add(Message("Batch message no. {}".format(i)))
sender.send(batch)
sender.schedule_messages(batch)
```
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

async def send_single_message(sender):
message = Message("DATA" * 64)
await sender.send(message)
await sender.send_messages(message)


async def main():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async def main():
async with servicebus_client:
receiver = servicebus_client.get_queue_deadletter_receiver(queue_name=QUEUE_NAME, prefetch=10)
async with receiver:
received_msgs = await receiver.receive(max_batch_size=10, max_wait_time=5)
received_msgs = await receiver.receive_messages(max_batch_size=10, max_wait_time=5)
for msg in received_msgs:
print(str(msg))
await msg.complete()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async def main():
receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, prefetch=10)

async with receiver:
received_msgs = await receiver.receive(max_batch_size=10, max_wait_time=5)
received_msgs = await receiver.receive_messages(max_batch_size=10, max_wait_time=5)
deferred_sequenced_numbers = []
for msg in received_msgs:
print("Deferring msg: {}".format(str(msg)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async def main():
async with servicebus_client:
receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME)
async with receiver:
received_msgs = await receiver.peek(message_count=2)
received_msgs = await receiver.peek_messages(message_count=2)
for msg in received_msgs:
print(str(msg))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async def main():
async with servicebus_client:
receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, prefetch=10)
async with receiver:
received_msgs = await receiver.receive(max_batch_size=10, max_wait_time=5)
received_msgs = await receiver.receive_messages(max_batch_size=10, max_wait_time=5)
for msg in received_msgs:
print(str(msg))
await msg.complete()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async def main():
prefetch=10
)
async with receiver:
received_msgs = await receiver.receive(max_batch_size=10, max_wait_time=5)
received_msgs = await receiver.receive_messages(max_batch_size=10, max_wait_time=5)
for msg in received_msgs:
print(str(msg))
await msg.complete()
Expand Down
Loading

0 comments on commit 054a0f3

Please sign in to comment.