diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/HISTORY.md b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/HISTORY.md new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/LICENSE b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/LICENSE new file mode 100644 index 000000000000..21071075c245 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/LICENSE @@ -0,0 +1,21 @@ + MIT License + + Copyright (c) Microsoft Corporation. All rights reserved. + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/MANIFEST.in b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/MANIFEST.in new file mode 100644 index 000000000000..50c61fef797b --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/MANIFEST.in @@ -0,0 +1,2 @@ +include *.md +include azure/__init__.py \ No newline at end of file diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/README.md b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/README.md new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/__init__.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/__init__.py new file mode 100644 index 000000000000..62351a0ab30b --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/__init__.py @@ -0,0 +1,5 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- +__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/__init__.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/__init__.py new file mode 100644 index 000000000000..62351a0ab30b --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/__init__.py @@ -0,0 +1,5 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- +__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/__init__.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/__init__.py new file mode 100644 index 000000000000..62351a0ab30b --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/__init__.py @@ -0,0 +1,5 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- +__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/__init__.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/__init__.py new file mode 100644 index 000000000000..9e0e473c9a8b --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/__init__.py @@ -0,0 +1,12 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +__version__ = "1.0.0b1" + +from .blobstoragepm import BlobPartitionManager + +__all__ = [ + "BlobPartitionManager", +] diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/blobstoragepm.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/blobstoragepm.py new file mode 100644 index 000000000000..85fcc1ca2eb5 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/blobstoragepm.py @@ -0,0 +1,123 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- +from typing import Iterable, Dict, Any +import logging +from collections import defaultdict +import asyncio +from azure.eventhub.aio.eventprocessor import PartitionManager, OwnershipLostError # type: ignore +from azure.core.exceptions import ResourceModifiedError, ResourceExistsError # type: ignore +from azure.storage.blob.aio import ContainerClient # type: ignore + +logger = logging.getLogger(__name__) +UPLOAD_DATA = "" + + +class BlobPartitionManager(PartitionManager): + """An PartitionManager that uses Azure Blob Storage to store the partition ownership and checkpoint data. + + This class implements methods list_ownership, claim_ownership, and update_checkpoint that are defined in class + azure.eventhub.eventprocessor.PartitionManager of package azure-eventhub. + + """ + def __init__(self, container_client: ContainerClient): + """ + + :param container_client: The Azure Blob Storage Container client. + """ + self._container_client = container_client + self._cached_ownership_dict = defaultdict(dict) # type: Dict[str, Dict[str, Any]] + # lock each partition for list_ownership, claim_ownership and update_checkpoint etag doesn't get out of sync + # when the three methods are running concurrently + self._cached_ownership_locks = defaultdict(asyncio.Lock) # type:Dict[str, asyncio.Lock] + + async def _upload_blob(self, ownership, metadata): + etag = ownership.get("etag") + if etag: + etag_match = {"if_match": etag} + else: + etag_match = {"if_none_match": '*'} + partition_id = ownership["partition_id"] + blob_client = await self._container_client.upload_blob( + name=partition_id, data=UPLOAD_DATA, overwrite=True, metadata=metadata, **etag_match + ) + uploaded_blob_properties = await blob_client.get_blob_properties() + ownership["etag"] = uploaded_blob_properties.etag + ownership["last_modified_time"] = uploaded_blob_properties.last_modified.timestamp() + + async def list_ownership(self, eventhub_name: str, consumer_group_name: str) -> Iterable[Dict[str, Any]]: + try: + blobs = self._container_client.list_blobs(include=['metadata']) + except Exception as err: # pylint:disable=broad-except + logger.warning("An exception occurred during list_ownership for eventhub %r consumer group %r. " + "Exception is %r", eventhub_name, consumer_group_name, err) + raise + async for b in blobs: + async with self._cached_ownership_locks[b.name]: + if b.name not in self._cached_ownership_dict \ + or b.last_modified.timestamp() >= self._cached_ownership_dict[b.name].get("last_modified_time"): + metadata = b.metadata + ownership = { + "eventhub_name": eventhub_name, + "consumer_group_name": consumer_group_name, + "partition_id": b.name, + "owner_id": metadata["owner_id"], + "etag": b.etag, + "last_modified_time": b.last_modified.timestamp() if b.last_modified else None + } + ownership.update(metadata) + self._cached_ownership_dict[b.name] = ownership + return self._cached_ownership_dict.values() + + async def claim_ownership(self, ownership_list: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]: + result = [] + for ownership in ownership_list: + partition_id = ownership["partition_id"] + eventhub_name = ownership["eventhub_name"] + consumer_group_name = ownership["consumer_group_name"] + owner_id = ownership["owner_id"] + + async with self._cached_ownership_locks[partition_id]: + metadata = {"owner_id": ownership["owner_id"]} + if "offset" in ownership: + metadata["offset"] = ownership["offset"] + if "sequence_number" in ownership: + metadata["sequence_number"] = ownership["sequence_number"] + try: + await self._upload_blob(ownership, metadata) + self._cached_ownership_dict[partition_id] = ownership + result.append(ownership) + except (ResourceModifiedError, ResourceExistsError): + logger.info( + "EventProcessor instance %r of eventhub %r consumer group %r lost ownership to partition %r", + owner_id, eventhub_name, consumer_group_name, partition_id) + except Exception as err: # pylint:disable=broad-except + logger.warning("An exception occurred when EventProcessor instance %r claim_ownership for " + "eventhub %r consumer group %r partition %r. The ownership is now lost. Exception " + "is %r", owner_id, eventhub_name, consumer_group_name, partition_id, err) + return result + + async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_id, owner_id, + offset, sequence_number) -> None: + metadata = { + "owner_id": owner_id, + "offset": offset, + "sequence_number": str(sequence_number) + } + cached_ownership = self._cached_ownership_dict[partition_id] + async with self._cached_ownership_locks[partition_id]: + try: + await self._upload_blob(cached_ownership, metadata) + except (ResourceModifiedError, ResourceExistsError): + logger.info( + "EventProcessor instance %r of eventhub %r consumer group %r couldn't update_checkpoint to " + "partition %r because the ownership has been stolen", + owner_id, eventhub_name, consumer_group_name, partition_id) + raise OwnershipLostError() + except Exception as err: + logger.warning( + "EventProcessor instance %r of eventhub %r consumer group %r couldn't update_checkpoint to " + "partition %r because of unexpected error. Exception is %r", + owner_id, eventhub_name, consumer_group_name, partition_id, err) + raise # EventProcessor will catch the exception and handle it diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/conftest.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/conftest.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt new file mode 100644 index 000000000000..092dbcdb7de7 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt @@ -0,0 +1 @@ +-e ../../eventhub/azure-eventhubs \ No newline at end of file diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/examples/event_processor_blob_storage_example.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/examples/event_processor_blob_storage_example.py new file mode 100644 index 000000000000..e7edc047831a --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/examples/event_processor_blob_storage_example.py @@ -0,0 +1,42 @@ +import asyncio +import logging +import os +from azure.eventhub.aio import EventHubClient +from azure.eventhub.aio.eventprocessor import EventProcessor, PartitionProcessor +from azure.eventhub.extensions.checkpointstoreblobaio import BlobPartitionManager +from azure.storage.blob.aio import ContainerClient + +RECEIVE_TIMEOUT = 5 # timeout in seconds for a receiving operation. 0 or None means no timeout +RETRY_TOTAL = 3 # max number of retries for receive operations within the receive timeout. Actual number of retries clould be less if RECEIVE_TIMEOUT is too small +CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"] +STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"] + +logging.basicConfig(level=logging.INFO) + + +async def do_operation(event): + # do some sync or async operations. If the operation is i/o intensive, async will have better performance + print(event) + + +class MyPartitionProcessor(PartitionProcessor): + async def process_events(self, events, partition_context): + if events: + await asyncio.gather(*[do_operation(event) for event in events]) + await partition_context.update_checkpoint(events[-1].offset, events[-1].sequence_number) + else: + print("empty events received", "partition:", partition_context.partition_id) + + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + client = EventHubClient.from_connection_string(CONNECTION_STR, receive_timeout=RECEIVE_TIMEOUT, retry_total=RETRY_TOTAL) + container_client = ContainerClient.from_connection_string(STORAGE_CONNECTION_STR, container="eventprocessor") + partition_manager = BlobPartitionManager(container_client=container_client) + event_processor = EventProcessor(client, "$default", MyPartitionProcessor, partition_manager, polling_interval=10) + try: + loop.run_until_complete(event_processor.start()) + except KeyboardInterrupt: + loop.run_until_complete(event_processor.stop()) + finally: + loop.stop() diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/sdk_packaging.toml b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/sdk_packaging.toml new file mode 100644 index 000000000000..e7687fdae93b --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/sdk_packaging.toml @@ -0,0 +1,2 @@ +[packaging] +auto_update = false \ No newline at end of file diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.cfg b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.cfg new file mode 100644 index 000000000000..3480374bc2f2 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.cfg @@ -0,0 +1,2 @@ +[bdist_wheel] +universal=1 \ No newline at end of file diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.py new file mode 100644 index 000000000000..257854d88cb0 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python + +#------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +#-------------------------------------------------------------------------- + +import re +import os.path +import sys +from io import open +from setuptools import find_packages, setup + +if sys.version_info < (3, 5, 3): + raise RuntimeError('Only python 3.5.3 or above is supported') + +# Change the PACKAGE_NAME only to change folder and different name +PACKAGE_NAME = "azure-eventhub-checkpointstoreblob-aio" +PACKAGE_PPRINT_NAME = "Event Hubs checkpointer implementation with Blob Storage" + +package_folder_path = "azure/eventhub/extensions/checkpointstoreblobaio" +namespace_name = "azure.eventhub.extensions.checkpointstoreblobaio" + +# Version extraction inspired from 'requests' +with open(os.path.join(package_folder_path, '__init__.py'), 'r') as fd: + version = re.search(r'^__version__\s*=\s*[\'"]([^\'"]*)[\'"]', + fd.read(), re.MULTILINE).group(1) + +if not version: + raise RuntimeError('Cannot find version information') + +with open('README.md') as f: + readme = f.read() +with open('HISTORY.md') as f: + history = f.read() + +exclude_packages = [ + 'tests', + 'examples', + # Exclude packages that will be covered by PEP420 or nspkg + 'azure', + 'azure.eventhub', + 'azure.eventhub.extensions', + ] + +setup( + name=PACKAGE_NAME, + version=version, + description='Microsoft Azure {} Client Library for Python'.format(PACKAGE_PPRINT_NAME), + long_description=readme + '\n\n' + history, + long_description_content_type='text/markdown', + license='MIT License', + author='Microsoft Corporation', + author_email='azpysdkhelp@microsoft.com', + url='https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointerblob-aio', + classifiers=[ + 'Development Status :: 3 - Alpha', + 'Programming Language :: Python', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.5', + 'Programming Language :: Python :: 3.6', + 'Programming Language :: Python :: 3.7', + 'License :: OSI Approved :: MIT License', + ], + zip_safe=False, + packages=find_packages(exclude=exclude_packages), + install_requires=[ + 'azure-storage-blob<13.0.0,>=12.0.0b2', + 'azure-eventhub<6.0.0,>=5.0.0b2', + ], + extras_require={ + + } +)