Skip to content

Commit

Permalink
Eventhubs blobstorage checkpointstore merge to preview3 (Azure#7109)
Browse files Browse the repository at this point in the history
  • Loading branch information
YijunXieMS authored Sep 7, 2019
1 parent 8e7e1c1 commit 13a8fe7
Show file tree
Hide file tree
Showing 15 changed files with 295 additions and 0 deletions.
Empty file.
21 changes: 21 additions & 0 deletions sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) Microsoft Corporation. All rights reserved.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
include *.md
include azure/__init__.py
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
# Extend this package's search path via pkgutil so that it behaves as a namespace package
# whose sub-packages may be provided by separately installed distributions.
__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
# Extend this package's search path via pkgutil so that it behaves as a namespace package
# whose sub-packages may be provided by separately installed distributions.
__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
# Extend this package's search path via pkgutil so that it behaves as a namespace package
# whose sub-packages may be provided by separately installed distributions.
__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

# Package version. Keep it a plain string literal on one line.
# NOTE(review): setup.py appears to extract this value with a regular expression - confirm before reformatting.
__version__ = "1.0.0b1"

# Re-export the partition manager so it can be imported directly from this package.
from .blobstoragepm import BlobPartitionManager

# Public API of this package.
__all__ = [
    "BlobPartitionManager",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Iterable, Dict, Any
import logging
from collections import defaultdict
import asyncio
from azure.eventhub.aio.eventprocessor import PartitionManager, OwnershipLostError # type: ignore
from azure.core.exceptions import ResourceModifiedError, ResourceExistsError # type: ignore
from azure.storage.blob.aio import ContainerClient # type: ignore

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Body uploaded for every partition blob. It is empty because the ownership and checkpoint
# values are passed to the upload call as blob metadata rather than as blob content.
UPLOAD_DATA = ""


class BlobPartitionManager(PartitionManager):
    """A PartitionManager that uses Azure Blob Storage to store the partition ownership and checkpoint data.

    This class implements the methods list_ownership, claim_ownership, and update_checkpoint that are defined in
    class azure.eventhub.aio.eventprocessor.PartitionManager of package azure-eventhub.

    Each partition is represented by one blob named after the partition id. The blob body is empty
    (``UPLOAD_DATA``); the owner id, offset and sequence number are passed as blob metadata, and the blob's etag
    is used as the precondition for every write so that competing EventProcessor instances cannot silently
    overwrite each other.
    """

    def __init__(self, container_client: ContainerClient):
        """
        :param container_client: The Azure Blob Storage Container client.
        """
        self._container_client = container_client
        # Last known ownership record per partition id, including the etag needed for the next conditional write.
        self._cached_ownership_dict = defaultdict(dict)  # type: Dict[str, Dict[str, Any]]
        # One lock per partition so that the cached etag doesn't get out of sync when list_ownership,
        # claim_ownership and update_checkpoint are running concurrently.
        self._cached_ownership_locks = defaultdict(asyncio.Lock)  # type: Dict[str, asyncio.Lock]

    async def _upload_blob(self, ownership, metadata):
        """Conditionally write the blob of ``ownership["partition_id"]`` and refresh the record's etag.

        :param ownership: Ownership record. Its "etag" (if any) is used as the write precondition; on success
         its "etag" and "last_modified_time" entries are updated in place.
        :param metadata: Metadata to store on the blob.

        Exceptions from the storage client are propagated. The callers in this class treat
        ResourceModifiedError and ResourceExistsError as "another instance owns the partition".
        """
        etag = ownership.get("etag")
        if etag:
            # The blob is already known: only overwrite it if nobody modified it since we last saw it.
            etag_match = {"if_match": etag}
        else:
            # No etag known: only create the blob if it does not exist yet.
            etag_match = {"if_none_match": '*'}
        partition_id = ownership["partition_id"]
        blob_client = await self._container_client.upload_blob(
            name=partition_id, data=UPLOAD_DATA, overwrite=True, metadata=metadata, **etag_match
        )
        # NOTE(review): the properties are fetched with a second request, so a write by another instance between
        # the two calls would be recorded here as our etag - confirm whether upload_blob can return the etag.
        uploaded_blob_properties = await blob_client.get_blob_properties()
        ownership["etag"] = uploaded_blob_properties.etag
        ownership["last_modified_time"] = uploaded_blob_properties.last_modified.timestamp()

    async def list_ownership(self, eventhub_name: str, consumer_group_name: str) -> Iterable[Dict[str, Any]]:
        """List the blobs of the container and return the ownership records known for all partitions.

        A cached record is replaced when the listed blob is at least as recent as the cached one (or when
        either timestamp is unknown). Blobs without an "owner_id" metadata entry are ignored.

        :param eventhub_name: Name of the event hub, copied into every refreshed record.
        :param consumer_group_name: Name of the consumer group, copied into every refreshed record.
        :return: A snapshot (list) of the cached ownership records.
        :raises: Any exception raised while listing the blobs, after logging it.
        """
        try:
            blobs = self._container_client.list_blobs(include=['metadata'])
            # The iteration is inside the try block as well, so that failures which only surface while
            # the listing is being consumed are logged too.
            async for b in blobs:
                async with self._cached_ownership_locks[b.name]:
                    metadata = b.metadata or {}
                    if "owner_id" not in metadata:
                        # Not an ownership record; skip it instead of failing the whole listing.
                        logger.debug("Blob %r has no owner_id metadata and is ignored by list_ownership", b.name)
                        continue
                    modified_time = b.last_modified.timestamp() if b.last_modified else None
                    # .get() avoids creating an empty entry in the defaultdict as a side effect.
                    cached_time = self._cached_ownership_dict.get(b.name, {}).get("last_modified_time")
                    if cached_time is not None and modified_time is not None and modified_time < cached_time:
                        # The cached record is newer than what the listing reports; keep it.
                        continue
                    ownership = {
                        "eventhub_name": eventhub_name,
                        "consumer_group_name": consumer_group_name,
                        "partition_id": b.name,
                        "owner_id": metadata["owner_id"],
                        "etag": b.etag,
                        "last_modified_time": modified_time
                    }
                    # Copies every metadata entry stored on the blob (e.g. offset, sequence_number).
                    ownership.update(metadata)
                    self._cached_ownership_dict[b.name] = ownership
        except Exception as err:  # pylint:disable=broad-except
            logger.warning("An exception occurred during list_ownership for eventhub %r consumer group %r. "
                           "Exception is %r", eventhub_name, consumer_group_name, err)
            raise
        # Return a snapshot: a live dict view could change while the caller iterates over it.
        return list(self._cached_ownership_dict.values())

    async def claim_ownership(self, ownership_list: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
        """Try to claim every ownership in ``ownership_list`` and return the ones that were claimed.

        A claim that fails - because another instance modified the blob or because of any other error - is
        logged and left out of the result; no exception is raised for it.

        :param ownership_list: Ownership records with at least "partition_id", "eventhub_name",
         "consumer_group_name" and "owner_id"; "offset" and "sequence_number" are stored too when present.
        :return: The list of successfully claimed ownership records (updated with the new etag).
        """
        result = []
        for ownership in ownership_list:
            partition_id = ownership["partition_id"]
            eventhub_name = ownership["eventhub_name"]
            consumer_group_name = ownership["consumer_group_name"]
            owner_id = ownership["owner_id"]

            async with self._cached_ownership_locks[partition_id]:
                metadata = {"owner_id": owner_id}
                if "offset" in ownership:
                    metadata["offset"] = ownership["offset"]
                if "sequence_number" in ownership:
                    metadata["sequence_number"] = ownership["sequence_number"]
                try:
                    await self._upload_blob(ownership, metadata)
                    self._cached_ownership_dict[partition_id] = ownership
                    result.append(ownership)
                except (ResourceModifiedError, ResourceExistsError):
                    logger.info(
                        "EventProcessor instance %r of eventhub %r consumer group %r lost ownership to partition %r",
                        owner_id, eventhub_name, consumer_group_name, partition_id)
                except Exception as err:  # pylint:disable=broad-except
                    # Deliberately best-effort: one failing partition must not prevent claiming the others.
                    logger.warning("An exception occurred when EventProcessor instance %r claim_ownership for "
                                   "eventhub %r consumer group %r partition %r. The ownership is now lost. Exception "
                                   "is %r", owner_id, eventhub_name, consumer_group_name, partition_id, err)
        return result

    async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_id, owner_id,
                                offset, sequence_number) -> None:
        """Store a checkpoint (offset and sequence number) in the blob of ``partition_id``.

        :param eventhub_name: Name of the event hub (used for logging).
        :param consumer_group_name: Name of the consumer group (used for logging).
        :param partition_id: Id of the partition the checkpoint belongs to.
        :param owner_id: Id of the EventProcessor instance writing the checkpoint.
        :param offset: Offset to store.
        :param sequence_number: Sequence number to store; it is converted to a string.
        :raises OwnershipLostError: If no ownership of the partition is cached or the blob was modified by
         another instance.
        :raises: Any other exception raised by the upload, after logging it.
        """
        metadata = {
            "owner_id": owner_id,
            "offset": offset,
            "sequence_number": str(sequence_number)
        }
        async with self._cached_ownership_locks[partition_id]:
            # Look the record up while holding the lock so that it cannot be replaced between the lookup and
            # the upload. .get() keeps the defaultdict from inserting an empty record for an unknown
            # partition, which would later break the timestamp comparison in list_ownership.
            cached_ownership = self._cached_ownership_dict.get(partition_id)
            if not cached_ownership:
                logger.info(
                    "EventProcessor instance %r of eventhub %r consumer group %r couldn't update_checkpoint to "
                    "partition %r because no ownership of the partition is cached",
                    owner_id, eventhub_name, consumer_group_name, partition_id)
                raise OwnershipLostError()
            try:
                await self._upload_blob(cached_ownership, metadata)
            except (ResourceModifiedError, ResourceExistsError) as err:
                logger.info(
                    "EventProcessor instance %r of eventhub %r consumer group %r couldn't update_checkpoint to "
                    "partition %r because the ownership has been stolen",
                    owner_id, eventhub_name, consumer_group_name, partition_id)
                raise OwnershipLostError() from err
            except Exception as err:  # pylint:disable=broad-except
                logger.warning(
                    "EventProcessor instance %r of eventhub %r consumer group %r couldn't update_checkpoint to "
                    "partition %r because of unexpected error. Exception is %r",
                    owner_id, eventhub_name, consumer_group_name, partition_id, err)
                raise  # EventProcessor will catch the exception and handle it
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-e ../../eventhub/azure-eventhubs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import asyncio
import logging
import os
from azure.eventhub.aio import EventHubClient
from azure.eventhub.aio.eventprocessor import EventProcessor, PartitionProcessor
from azure.eventhub.extensions.checkpointstoreblobaio import BlobPartitionManager
from azure.storage.blob.aio import ContainerClient

RECEIVE_TIMEOUT = 5 # timeout in seconds for a receiving operation. 0 or None means no timeout
RETRY_TOTAL = 3 # max number of retries for receive operations within the receive timeout. Actual number of retries could be less if RECEIVE_TIMEOUT is too small
# Both connection strings are read from the environment; a missing variable raises KeyError at import time.
CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"]

logging.basicConfig(level=logging.INFO)


async def do_operation(event):
    """Process a single received event; this sample only prints it."""
    # do some sync or async operations. If the operation is i/o intensive, async will have better performance
    print(event)


class MyPartitionProcessor(PartitionProcessor):
    """Sample partition processor that handles each batch of events and then checkpoints it."""

    async def process_events(self, events, partition_context):
        """Run do_operation on every event concurrently, then checkpoint at the last event of the batch.

        :param events: The batch of received events; may be empty.
        :param partition_context: Context of the partition, used to report the partition id and to checkpoint.
        """
        if not events:
            # Nothing to process and nothing to checkpoint.
            print("empty events received", "partition:", partition_context.partition_id)
            return

        pending = [do_operation(item) for item in events]
        await asyncio.gather(*pending)

        # Checkpoint only after the whole batch has been processed.
        last_event = events[-1]
        await partition_context.update_checkpoint(last_event.offset, last_event.sequence_number)


if __name__ == '__main__':
    # Wire the sample together: an Event Hubs client, a blob container client for the checkpoint store,
    # and an EventProcessor that creates MyPartitionProcessor instances for the "$default" consumer group.
    loop = asyncio.get_event_loop()
    client = EventHubClient.from_connection_string(CONNECTION_STR, receive_timeout=RECEIVE_TIMEOUT, retry_total=RETRY_TOTAL)
    container_client = ContainerClient.from_connection_string(STORAGE_CONNECTION_STR, container="eventprocessor")
    partition_manager = BlobPartitionManager(container_client=container_client)
    event_processor = EventProcessor(client, "$default", MyPartitionProcessor, partition_manager, polling_interval=10)
    try:
        # Runs until start() returns or the user interrupts the program.
        loop.run_until_complete(event_processor.start())
    except KeyboardInterrupt:
        # Ctrl+C: give the processor a chance to shut down before exiting.
        loop.run_until_complete(event_processor.stop())
    finally:
        # NOTE(review): the loop is no longer running at this point, so stop() presumably has no
        # effect here - confirm whether loop.close() was intended.
        loop.stop()
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[packaging]
auto_update = false
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[bdist_wheel]
universal=1
75 changes: 75 additions & 0 deletions sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#!/usr/bin/env python

#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------

import re
import os.path
import sys
from io import open
from setuptools import find_packages, setup

# The package relies on asyncio features, so refuse to install on interpreters older than 3.5.3.
if sys.version_info < (3, 5, 3):
    raise RuntimeError('Only python 3.5.3 or above is supported')

# Change the PACKAGE_NAME only to change folder and different name
PACKAGE_NAME = "azure-eventhub-checkpointstoreblob-aio"
PACKAGE_PPRINT_NAME = "Event Hubs checkpointer implementation with Blob Storage"

package_folder_path = "azure/eventhub/extensions/checkpointstoreblobaio"
namespace_name = "azure.eventhub.extensions.checkpointstoreblobaio"

# Version extraction inspired from 'requests'
with open(os.path.join(package_folder_path, '__init__.py'), 'r') as fd:
    version_match = re.search(r'^__version__\s*=\s*[\'"]([^\'"]*)[\'"]',
                              fd.read(), re.MULTILINE)

# re.search returns None when there is no __version__ line, so check the match before calling group();
# otherwise the intended RuntimeError below would be pre-empted by an AttributeError.
version = version_match.group(1) if version_match else None

if not version:
    raise RuntimeError('Cannot find version information')

# Read the markdown files as UTF-8 explicitly so the result does not depend on the platform's default encoding.
with open('README.md', encoding='utf-8') as f:
    readme = f.read()
with open('HISTORY.md', encoding='utf-8') as f:
    history = f.read()

exclude_packages = [
    'tests',
    'examples',
    # Exclude packages that will be covered by PEP420 or nspkg
    'azure',
    'azure.eventhub',
    'azure.eventhub.extensions',
]

setup(
    name=PACKAGE_NAME,
    version=version,
    description='Microsoft Azure {} Client Library for Python'.format(PACKAGE_PPRINT_NAME),
    long_description=readme + '\n\n' + history,
    long_description_content_type='text/markdown',
    license='MIT License',
    author='Microsoft Corporation',
    author_email='azpysdkhelp@microsoft.com',
    # Points at the directory that contains this setup.py.
    url='https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio',
    classifiers=[
        'Development Status :: 3 - Alpha',
        'Programming Language :: Python',
        'Programming Language :: Python :: 3',
        'Programming Language :: Python :: 3.5',
        'Programming Language :: Python :: 3.6',
        'Programming Language :: Python :: 3.7',
        'License :: OSI Approved :: MIT License',
    ],
    zip_safe=False,
    packages=find_packages(exclude=exclude_packages),
    install_requires=[
        'azure-storage-blob<13.0.0,>=12.0.0b2',
        'azure-eventhub<6.0.0,>=5.0.0b2',
    ],
    extras_require={

    }
)

0 comments on commit 13a8fe7

Please sign in to comment.