Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for Async Generators in TableClient.submit_transaction method #21119

Merged
merged 9 commits into from
Oct 21, 2021
6 changes: 5 additions & 1 deletion sdk/tables/azure-data-tables/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
# Release History

## 12.1.1 (2021-09-08)
## 12.1.1 (Unreleased)

### Bugs Fixed

- Resolved bug where strings couldn't be used instead of enum value for entity Update Mode (#20247).
- Resolved bug where single quote characters in Partition and Row keys were not escaped correctly (#20301).

### Features Added

- Added support for async iterators in `aio.TableClient.submit_transaction (#21083, thank you yashbhutoria).

### Other Changes

- Bumped dependency on `msrest` to `>=0.6.21`
Expand Down
21 changes: 18 additions & 3 deletions sdk/tables/azure-data-tables/azure/data/tables/_table_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@
Dict,
Mapping,
Optional,
List
List,
Tuple
)

from azure.core import MatchConditions

from ._common_conversion import _transform_patch_to_cosmos_post
from ._models import UpdateMode
from ._models import UpdateMode, TransactionOperation
from ._serialize import _get_match_headers, _add_entity_properties, _prepare_key
from ._entity import TableEntity

Expand All @@ -26,8 +27,10 @@
from ._generated import models, AzureTable
from ._generated._configuration import AzureTableConfiguration

EntityType = Union[TableEntity, Mapping[str, Any]]

EntityType = Union[TableEntity, Mapping[str, Any]]
OperationType = Union[TransactionOperation, str]
TransactionOperationType = Union[Tuple[OperationType, EntityType], Tuple[OperationType, EntityType, Mapping[str, Any]]]


class TableBatchOperations(object):
Expand Down Expand Up @@ -91,6 +94,18 @@ def _verify_partition_key(
elif entity["PartitionKey"] != self._partition_key:
raise ValueError("Partition Keys must all be the same")

def add_operation(self, operation):
# type: (TransactionOperationType) -> None
"""Add a single operation to a batch."""
try:
operation_kwargs = operation[2] # type: ignore
except IndexError:
operation_kwargs = {}
try:
getattr(self, operation[0].lower())(operation[1], **operation_kwargs)
except AttributeError:
raise ValueError("Unrecognized operation: {}".format(operation))

def create(
self,
entity, # type: EntityType
Expand Down
36 changes: 17 additions & 19 deletions sdk/tables/azure-data-tables/azure/data/tables/_table_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# --------------------------------------------------------------------------

import functools
from typing import Optional, Any, TYPE_CHECKING, Union, List, Tuple, Dict, Mapping, Iterable, overload, cast
from typing import Optional, Any, TYPE_CHECKING, Union, List, Dict, Mapping, Iterable, overload, cast
try:
from urllib.parse import urlparse, unquote
except ImportError:
Expand Down Expand Up @@ -34,19 +34,14 @@
from ._base_client import parse_connection_str, TablesBaseClient
from ._serialize import serialize_iso, _parameter_filter_substitution
from ._deserialize import deserialize_iso, _return_headers_and_deserialized
from ._table_batch import TableBatchOperations
from ._table_batch import TableBatchOperations, EntityType, TransactionOperationType
from ._models import (
TableEntityPropertiesPaged,
UpdateMode,
TableAccessPolicy,
TransactionOperation,
TableItem
)

EntityType = Union[TableEntity, Mapping[str, Any]]
OperationType = Union[TransactionOperation, str]
TransactionOperationType = Union[Tuple[OperationType, EntityType], Tuple[OperationType, EntityType, Mapping[str, Any]]]

if TYPE_CHECKING:
from azure.core.credentials import AzureNamedKeyCredential, AzureSasCredential

Expand Down Expand Up @@ -691,10 +686,14 @@ def submit_transaction(

If any one of these operations fails, the entire transaction will be rejected.

:param operations: The list of operations to commit in a transaction. This should be a list of
:param operations: The list of operations to commit in a transaction. This should be an iterable of
tuples containing an operation name, the entity on which to operate, and optionally, a dict of additional
kwargs for that operation.
:type operations: Iterable[Tuple[str, EntityType]]
kwargs for that operation. For example::

- ('upsert', {'PartitionKey': 'A', 'RowKey': 'B'})
- ('upsert', {'PartitionKey': 'A', 'RowKey': 'B'}, {'mode': UpdateMode.REPLACE})

:type operations: Iterable[TransactionOperationType]
:return: A list of mappings with response metadata for each operation in the transaction.
:rtype: List[Mapping[str, Any]]
:raises: :class:`~azure.data.tables.TableTransactionError`
Expand All @@ -717,13 +716,12 @@ def submit_transaction(
is_cosmos_endpoint=self._cosmos_endpoint,
**kwargs
)
for operation in operations:
try:
operation_kwargs = operation[2] # type: ignore
except IndexError:
operation_kwargs = {}
try:
getattr(batched_requests, operation[0].lower())(operation[1], **operation_kwargs)
except AttributeError:
raise ValueError("Unrecognized operation: {}".format(operation[0]))
try:
for operation in operations:
batched_requests.add_operation(operation)
except TypeError:
raise TypeError(
"The value of 'operations' must be an iterator "
"of Tuples. Please check documentation for correct Tuple format."
)
return self._batch_send(*batched_requests.requests, **kwargs) # type: ignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from .._common_conversion import _transform_patch_to_cosmos_post
from .._models import UpdateMode
from .._entity import TableEntity
from .._table_batch import EntityType
from .._table_batch import EntityType, TransactionOperationType
from .._serialize import (
_prepare_key,
_get_match_headers,
Expand Down Expand Up @@ -68,6 +68,17 @@ def _verify_partition_key(
elif entity["PartitionKey"] != self._partition_key:
raise ValueError("Partition Keys must all be the same")

def add_operation(self, operation: TransactionOperationType) -> None:
"""Add a single operation to a batch."""
try:
operation_kwargs = operation[2] # type: ignore
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if there is a way to make this more readable?

try:
    operation_type, entity, kwargs = operation
except ValueError:
    operation_type, entity, kwargs = *operation, {}
try:
    getattr(self, operation_type.lower())(entity, **kwargs)

Copy link
Member

@annatisch annatisch Oct 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's really elegant! Love it! Updating now...

except IndexError:
operation_kwargs = {}
try:
getattr(self, operation[0].lower())(operation[1], **operation_kwargs)
except AttributeError:
raise ValueError("Unrecognized operation: {}".format(operation))

def create(
self,
entity: EntityType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# license information.
# --------------------------------------------------------------------------
import functools
from typing import List, Union, Any, Optional, Mapping, Iterable, Dict, overload, cast, TYPE_CHECKING
from typing import AsyncIterable, List, Union, Any, Optional, Mapping, Iterable, Dict, overload, cast, TYPE_CHECKING
try:
from urllib.parse import urlparse, unquote
except ImportError:
Expand Down Expand Up @@ -664,17 +664,23 @@ async def upsert_entity(
@distributed_trace_async
async def submit_transaction(
self,
operations: Iterable[TransactionOperationType],
operations: Union[
Iterable[TransactionOperationType], AsyncIterable[TransactionOperationType]
],
**kwargs
) -> List[Mapping[str, Any]]:
"""Commit a list of operations as a single transaction.

If any one of these operations fails, the entire transaction will be rejected.

:param operations: The list of operations to commit in a transaction. This should be a list of
tuples containing an operation name, the entity on which to operate, and optionally, a dict of additional
kwargs for that operation.
:type operations: Iterable[Tuple[str, EntityType]]
:param operations: The list of operations to commit in a transaction. This should be an iterable
(or async iterable) of tuples containing an operation name, the entity on which to operate,
and optionally, a dict of additional kwargs for that operation. For example::

- ('upsert', {'PartitionKey': 'A', 'RowKey': 'B'})
- ('upsert', {'PartitionKey': 'A', 'RowKey': 'B'}, {'mode': UpdateMode.REPLACE})

:type operations: Union[Iterable[TransactionOperationType],AsyncIterable[TransactionOperationType]]
:return: A list of mappings with response metadata for each operation in the transaction.
:rtype: List[Mapping[str, Any]]
:raises ~azure.data.tables.TableTransactionError:
Expand All @@ -697,13 +703,17 @@ async def submit_transaction(
is_cosmos_endpoint=self._cosmos_endpoint,
**kwargs
)
for operation in operations:
try:
operation_kwargs = operation[2] # type: ignore
except IndexError:
operation_kwargs = {}
try:
for operation in operations: # type: ignore
batched_requests.add_operation(operation)
except TypeError:
try:
getattr(batched_requests, operation[0].lower())(operation[1], **operation_kwargs)
except AttributeError:
raise ValueError("Unrecognized operation: {}".format(operation))
async for operation in operations: # type: ignore
batched_requests.add_operation(operation)
except TypeError:
raise TypeError(
"The value of 'operations' must be an iterator or async iterator "
"of Tuples. Please check documentation for correct Tuple format."
)

return await self._batch_send(*batched_requests.requests, **kwargs)
Loading