diff --git a/sdk/eventhub/azure-eventhub-checkpointstoretable/azure/eventhub/extensions/checkpointstoretable/_tablestoragecs.py b/sdk/eventhub/azure-eventhub-checkpointstoretable/azure/eventhub/extensions/checkpointstoretable/_tablestoragecs.py index d88cb4b417ca..168927875bb3 100644 --- a/sdk/eventhub/azure-eventhub-checkpointstoretable/azure/eventhub/extensions/checkpointstoretable/_tablestoragecs.py +++ b/sdk/eventhub/azure-eventhub-checkpointstoretable/azure/eventhub/extensions/checkpointstoretable/_tablestoragecs.py @@ -7,7 +7,6 @@ import time import logging import calendar -import dateutil.parser from azure.core import MatchConditions from azure.eventhub import CheckpointStore # type: ignore # pylint: disable=no-name-in-module from azure.eventhub.exceptions import OwnershipLostError # type: ignore @@ -18,6 +17,8 @@ ) from ._vendor.data.tables import TableClient, UpdateMode from ._vendor.data.tables._base_client import parse_connection_str +from ._vendor.data.tables._deserialize import clean_up_dotnet_timestamps +from ._vendor.data.tables._common_conversion import TZ_UTC logger = logging.getLogger(__name__) @@ -39,6 +40,19 @@ def _to_timestamp(date): timestamp += date.microsecond / 1e6 return timestamp +def _timestamp_to_datetime(value): + # Cosmos returns this with a decimal point that throws an error on deserialization + cleaned_value = clean_up_dotnet_timestamps(value) + try: + dt_obj = datetime.datetime.strptime(cleaned_value, "%Y-%m-%dT%H:%M:%S.%fZ").replace( + tzinfo=TZ_UTC + ) + except ValueError: + dt_obj = datetime.datetime.strptime(cleaned_value, "%Y-%m-%dT%H:%M:%SZ").replace( + tzinfo=TZ_UTC + ) + return dt_obj + class TableCheckpointStore(CheckpointStore): """A CheckpointStore that uses Azure Table Storage to store the partition ownership and checkpoint data. @@ -113,13 +127,13 @@ def _create_ownership_entity(cls, ownership): Create a dictionary with the `ownership` attributes. """ ownership_entity = { - "PartitionKey": "{} {} {} Ownership".format( + "PartitionKey": u"{} {} {} Ownership".format( ownership["fully_qualified_namespace"], ownership["eventhub_name"], ownership["consumer_group"], ), - "RowKey": ownership["partition_id"], - "ownerid": ownership["owner_id"], + "RowKey": u"{}".format(ownership["partition_id"]), + "ownerid": u"{}".format(ownership["owner_id"]), } return ownership_entity @@ -129,21 +143,21 @@ def _create_checkpoint_entity(cls, checkpoint): Create a dictionary with `checkpoint` attributes. """ checkpoint_entity = { - "PartitionKey": "{} {} {} Checkpoint".format( + "PartitionKey": u"{} {} {} Checkpoint".format( checkpoint["fully_qualified_namespace"], checkpoint["eventhub_name"], checkpoint["consumer_group"], ), - "RowKey": checkpoint["partition_id"], - "offset": checkpoint["offset"], - "sequencenumber": checkpoint["sequence_number"], + "RowKey": u"{}".format(checkpoint["partition_id"]), + "offset": u"{}".format(checkpoint["offset"]), + "sequencenumber": u"{}".format(checkpoint["sequence_number"]), } return checkpoint_entity def _update_ownership(self, ownership, **kwargs): """_update_ownership mutates the passed in ownership.""" + ownership_entity = TableCheckpointStore._create_ownership_entity(ownership) try: - ownership_entity = TableCheckpointStore._create_ownership_entity(ownership) metadata = self._table_client.update_entity( mode=UpdateMode.REPLACE, entity=ownership_entity, @@ -166,7 +180,7 @@ def _update_ownership(self, ownership, **kwargs): ) ownership["etag"] = metadata["etag"] ownership["last_modified_time"] = _to_timestamp( - dateutil.parser.isoparse(metadata["content"]["Timestamp"]) + _timestamp_to_datetime(metadata["content"]["Timestamp"]) ) def _claim_one_partition(self, ownership, **kwargs): @@ -289,7 +303,7 @@ def list_checkpoints( "eventhub_name": eventhub_name, "consumer_group": consumer_group, "partition_id": entity[u"RowKey"], - "sequence_number": entity[u"sequencenumber"], + "sequence_number": int(entity[u"sequencenumber"]), "offset": str(entity[u"offset"]), } checkpoints_list.append(checkpoint) diff --git a/sdk/eventhub/azure-eventhub-checkpointstoretable/tests/test_storage_table_partition_manager.py b/sdk/eventhub/azure-eventhub-checkpointstoretable/tests/test_storage_table_partition_manager.py index 33e60c134464..c19b0cb6ffc4 100644 --- a/sdk/eventhub/azure-eventhub-checkpointstoretable/tests/test_storage_table_partition_manager.py +++ b/sdk/eventhub/azure-eventhub-checkpointstoretable/tests/test_storage_table_partition_manager.py @@ -13,14 +13,15 @@ from azure.eventhub.extensions.checkpointstoretable import TableCheckpointStore from azure.eventhub.exceptions import OwnershipLostError -STORAGE_CONN_STR = [ - #os.environ.get("AZURE_STORAGE_CONN_STR", "Azure Storage Connection String"), - os.environ.get("AZURE_COSMOS_CONN_STR", "Azure Storage Connection String"), +STORAGE_ENV_KEYS = [ + "AZURE_TABLES_CONN_STR", + "AZURE_COSMOS_CONN_STR" ] -def get_live_storage_table_client(storage_connection_str): +def get_live_storage_table_client(conn_str_env_key): try: + storage_connection_str = os.environ[conn_str_env_key] table_name = "table{}".format(uuid.uuid4().hex) table_service_client = TableServiceClient.from_connection_string( storage_connection_str @@ -176,11 +177,11 @@ def _update_and_list_checkpoint(storage_connection_str, table_name): assert checkpoint_list[0]["offset"] == "30" -@pytest.mark.parametrize("storage_connection_str", STORAGE_CONN_STR) -@pytest.mark.skip("update after adding conn str env var") -def test_claim_ownership_exception(storage_connection_str): +@pytest.mark.parametrize("conn_str_env_key", STORAGE_ENV_KEYS) +@pytest.mark.liveTest +def test_claim_ownership_exception(conn_str_env_key): storage_connection_str, table_name = get_live_storage_table_client( - storage_connection_str + conn_str_env_key ) try: _claim_ownership_exception_test(storage_connection_str, table_name) @@ -188,11 +189,11 @@ def test_claim_ownership_exception(storage_connection_str): remove_live_storage_table_client(storage_connection_str, table_name) -@pytest.mark.parametrize("storage_connection_str", STORAGE_CONN_STR) -@pytest.mark.skip("update after adding conn str env var") -def test_claim_and_list_ownership(storage_connection_str): +@pytest.mark.parametrize("conn_str_env_key", STORAGE_ENV_KEYS) +@pytest.mark.liveTest +def test_claim_and_list_ownership(conn_str_env_key): storage_connection_str, table_name = get_live_storage_table_client( - storage_connection_str + conn_str_env_key ) try: _claim_and_list_ownership(storage_connection_str, table_name) @@ -200,11 +201,11 @@ def test_claim_and_list_ownership(storage_connection_str): remove_live_storage_table_client(storage_connection_str, table_name) -@pytest.mark.parametrize("storage_connection_str", STORAGE_CONN_STR) -@pytest.mark.skip("update after adding conn str env var") -def test_update_checkpoint(storage_connection_str): +@pytest.mark.parametrize("conn_str_env_key", STORAGE_ENV_KEYS) +@pytest.mark.liveTest +def test_update_checkpoint(conn_str_env_key): storage_connection_str, table_name = get_live_storage_table_client( - storage_connection_str + conn_str_env_key ) try: _update_and_list_checkpoint(storage_connection_str, table_name) diff --git a/sdk/eventhub/test-resources.json b/sdk/eventhub/test-resources.json index 3566eb9745f9..f66ea2ac3756 100644 --- a/sdk/eventhub/test-resources.json +++ b/sdk/eventhub/test-resources.json @@ -70,11 +70,14 @@ "eventHubsNamespace": "[concat('eh-', parameters('baseName'))]", "eventHubName": "[concat('eh-', parameters('baseName'), '-hub')]", "eventHubAuthRuleName": "[concat('eh-', parameters('baseName'), '-hub-auth-rule')]", - "storageAccount": "[concat('blb', parameters('baseName'))]", + "storageAccount": "[concat('storage', parameters('baseName'))]", "containerName": "your-blob-container-name", "defaultSASKeyName": "RootManageSharedAccessKey", "eventHubsAuthRuleResourceId": "[resourceId('Microsoft.EventHub/namespaces/authorizationRules', variables('eventHubsNamespace'), variables('defaultSASKeyName'))]", "storageAccountId": "[resourceId('Microsoft.Storage/storageAccounts', variables('storageAccount'))]", + "tablesMgmtApiVersion": "2019-04-01", + "tablesAuthorizationApiVersion": "2018-09-01-preview", + "tableDataContributorRoleId": "0a9a7e1f-b9d0-4cc4-a60d-0319b160aaa3" }, "resources": [ { @@ -140,6 +143,48 @@ } ] }, + { + "type": "Microsoft.DocumentDB/databaseAccounts", + "apiVersion": "2020-04-01", + "name": "[variables('storageAccount')]", + "location": "[parameters('location')]", + "tags": { + "defaultExperience": "Azure Table", + "hidden-cosmos-mmspecial": "", + "CosmosAccountType": "Non-Production" + }, + "kind": "GlobalDocumentDB", + "properties": { + "publicNetworkAccess": "Enabled", + "enableAutomaticFailover": false, + "enableMultipleWriteLocations": false, + "isVirtualNetworkFilterEnabled": false, + "virtualNetworkRules": [], + "disableKeyBasedMetadataWriteAccess": false, + "enableFreeTier": false, + "enableAnalyticalStorage": false, + "databaseAccountOfferType": "Standard", + "consistencyPolicy": { + "defaultConsistencyLevel": "BoundedStaleness", + "maxIntervalInSeconds": 86400, + "maxStalenessPrefix": 1000000 + }, + "locations": [ + { + "locationName": "[parameters('location')]", + "provisioningState": "Succeeded", + "failoverPriority": 0, + "isZoneRedundant": false + } + ], + "capabilities": [ + { + "name": "EnableTable" + } + ], + "ipRules": [] + } + }, { "type": "Microsoft.Authorization/roleAssignments", "apiVersion": "2019-04-01-preview", @@ -159,6 +204,15 @@ "principalId": "[parameters('testApplicationOid')]", "scope": "[resourceGroup().id]" } + }, + { + "type": "Microsoft.Authorization/roleAssignments", + "apiVersion": "[variables('tablesAuthorizationApiVersion')]", + "name": "[guid(concat('tableDataContributorRoleId', resourceGroup().id))]", + "properties": { + "roleDefinitionId": "[resourceId('Microsoft.Authorization/roleDefinitions', variables('tableDataContributorRoleId'))]", + "principalId": "[parameters('testApplicationOid')]" + } } ], "outputs": { @@ -197,6 +251,14 @@ "AZURE_STORAGE_ACCESS_KEY":{ "type": "string", "value": "[listKeys(variables('storageAccountId'), providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value]" + }, + "AZURE_TABLES_CONN_STR": { + "type": "string", + "value": "[concat('DefaultEndpointsProtocol=https;AccountName=', variables('storageAccount'), ';AccountKey=', listKeys(variables('storageAccountId'), providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value, ';EndpointSuffix=', parameters('storageEndpointSuffix'))]" + }, + "AZURE_COSMOS_CONN_STR": { + "type": "string", + "value": "[concat('DefaultEndpointsProtocol=https;AccountName=', variables('storageAccount'), ';AccountKey=', listKeys(resourceId('Microsoft.DocumentDB/databaseAccounts', variables('storageAccount')), '2020-04-01').primaryMasterKey, ';TableEndpoint=https://', variables('storageAccount'), '.table.cosmos.azure.com:443/')]" } } }