Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SchemaRegistry] add async version of avro serializer #21026

Merged
13 commits merged into from
Oct 21, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- `auto_register_schemas` keyword argument has been added to `AvroSerializer`, which will allow for automatically registering schemas passed in to the `serialize`.
- `value` parameter in `serialize` on `AvroSerializer` takes type `Mapping` rather than `Dict`.
- Async version of `SchemaRegistryAvroSerializer` has been added under `azure.schemaregistry.serializer.avroserializer.aio`.

### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# --------------------------------------------------------------------------
#
# Copyright (c) Microsoft Corporation. All rights reserved.
#
# The MIT License (MIT)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the ""Software""), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#
# --------------------------------------------------------------------------
from ._schema_registry_avro_serializer_async import AvroSerializer

__all__ = [
"AvroSerializer"
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
# --------------------------------------------------------------------------
#
# Copyright (c) Microsoft Corporation. All rights reserved.
#
# The MIT License (MIT)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the ""Software""), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#
# --------------------------------------------------------------------------
try:
from functools import lru_cache
except ImportError:
from backports.functools_lru_cache import lru_cache
swathipil marked this conversation as resolved.
Show resolved Hide resolved
from io import BytesIO
from typing import Any, Dict, Mapping
import avro

from .._constants import SCHEMA_ID_START_INDEX, SCHEMA_ID_LENGTH, DATA_START_INDEX
from .._avro_serializer import AvroObjectSerializer


class AvroSerializer(object):
"""
AvroSerializer provides the ability to serialize and deserialize data according
to the given avro schema. It would automatically register, get and cache the schema.

:keyword client: Required. The schema registry client
which is used to register schema and retrieve schema from the service.
:paramtype client: ~azure.schemaregistry.SchemaRegistryClient
swathipil marked this conversation as resolved.
Show resolved Hide resolved
:keyword str group_name: Required. Schema group under which schema should be registered.
:keyword bool auto_register_schemas: When true, register new schemas passed to serialize.
Otherwise, and by default, fail if it has not been pre-registered in the registry.
swathipil marked this conversation as resolved.
Show resolved Hide resolved

"""

def __init__(self, **kwargs):
# type: (Any) -> None
try:
self._schema_group = kwargs.pop("group_name")
self._schema_registry_client = kwargs.pop("client") # type: "SchemaRegistryClient"
except KeyError as e:
raise TypeError("'{}' is a required keyword.".format(e.args[0]))
self._avro_serializer = AvroObjectSerializer(codec=kwargs.get("codec"))
self._auto_register_schemas = kwargs.get("auto_register_schemas", False)
self._auto_register_schema_func = (
self._schema_registry_client.register_schema
if self._auto_register_schemas
else self._schema_registry_client.get_schema_id
)

async def __aenter__(self):
# type: () -> SchemaRegistryAvroSerializer
await self._schema_registry_client.__aenter__()
return self

async def __aexit__(self, *exc_details):
# type: (Any) -> None
await self._schema_registry_client.__exit__(*exc_details)
swathipil marked this conversation as resolved.
Show resolved Hide resolved

async def close(self):
# type: () -> None
"""This method is to close the sockets opened by the client.
It need not be used when using with a context manager.
"""
await self._schema_registry_client.close()

#@lru_cache(maxsize=128)
swathipil marked this conversation as resolved.
Show resolved Hide resolved
async def _get_schema_id(self, schema_name, schema_str, **kwargs):
# type: (str, str, Any) -> str
"""
Get schema id from local cache with the given schema.
If there is no item in the local cache, get schema id from the service and cache it.

:param schema_name: Name of the schema
:type schema_name: str
:param schema: Schema object
:type schema: avro.schema.Schema
swathipil marked this conversation as resolved.
Show resolved Hide resolved
:return: Schema Id
:rtype: str
"""
schema_properties = await self._auto_register_schema_func(
self._schema_group, schema_name, "Avro", schema_str, **kwargs
)
return schema_properties.schema_id

#@lru_cache(maxsize=128)
async def _get_schema(self, schema_id, **kwargs):
# type: (str, Any) -> str
"""
Get schema content from local cache with the given schema id.
If there is no item in the local cache, get schema from the service and cache it.

:param str schema_id: Schema id
:return: Schema content
"""
schema = await self._schema_registry_client.get_schema(
schema_id, **kwargs
)
return schema.schema_content

@classmethod
@lru_cache(maxsize=128)
def _parse_schema(cls, schema):
return avro.schema.parse(schema)
swathipil marked this conversation as resolved.
Show resolved Hide resolved

async def serialize(self, value, **kwargs):
# type: (Mapping[str, Any], Any) -> bytes
"""
Encode data with the given schema. The returns bytes are consisted of: The first 4 bytes
denoting record format identifier. The following 32 bytes denoting schema id returned by schema registry
service. The remaining bytes are the real data payload.

:param value: The data to be encoded.
:type value: Mapping[str, Any]
:keyword schema: Required. The schema used to encode the data.
:paramtype schema: str
:rtype: bytes
"""
try:
raw_input_schema = kwargs.pop("schema")
except KeyError as e:
raise TypeError("'{}' is a required keyword.".format(e.args[0]))

cached_schema = AvroSerializer._parse_schema(raw_input_schema)
record_format_identifier = b"\0\0\0\0"
schema_id = await self._get_schema_id(cached_schema.fullname, str(cached_schema), **kwargs)
data_bytes = self._avro_serializer.serialize(value, cached_schema)

stream = BytesIO()

stream.write(record_format_identifier)
stream.write(schema_id.encode("utf-8"))
stream.write(data_bytes)
stream.flush()

payload = stream.getvalue()
stream.close()
return payload

async def deserialize(self, value, **kwargs):
# type: (bytes, Any) -> Dict[str, Any]
"""
Decode bytes data.

:param bytes value: The bytes data needs to be decoded.
:rtype: Dict[str, Any]
"""
# record_format_identifier = data[0:4] # The first 4 bytes are retained for future record format identifier.
schema_id = value[
SCHEMA_ID_START_INDEX : (SCHEMA_ID_START_INDEX + SCHEMA_ID_LENGTH)
].decode("utf-8")
schema_content = await self._get_schema(schema_id, **kwargs)

dict_value = self._avro_serializer.deserialize(
value[DATA_START_INDEX:], schema_content
)
return dict_value
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# --------------------------------------------------------------------------
#
# Copyright (c) Microsoft Corporation. All rights reserved.
#
# The MIT License (MIT)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the ""Software""), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#
# --------------------------------------------------------------------------
import os
import asyncio

from azure.identity.aio import ClientSecretCredential
from azure.schemaregistry.aio import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer.aio import AvroSerializer

TENANT_ID=os.environ['AZURE_TENANT_ID']
CLIENT_ID=os.environ['AZURE_CLIENT_ID']
CLIENT_SECRET=os.environ['AZURE_CLIENT_SECRET']

SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE=os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
GROUP_NAME=os.environ['SCHEMAREGISTRY_GROUP']
SCHEMA_STRING = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""


token_credential = ClientSecretCredential(
tenant_id=TENANT_ID,
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET
)


async def serialize(serializer):
dict_data_ben = {"name": u"Ben", "favorite_number": 7, "favorite_color": u"red"}
dict_data_alice = {"name": u"Alice", "favorite_number": 15, "favorite_color": u"green"}

# Schema would be automatically registered into Schema Registry and cached locally.
payload_ben = await serializer.serialize(dict_data_ben, schema=SCHEMA_STRING)
# The second call won't trigger a service call.
payload_alice = await serializer.serialize(dict_data_alice, schema=SCHEMA_STRING)

print('Encoded bytes are: ', payload_ben)
print('Encoded bytes are: ', payload_alice)
return [payload_ben, payload_alice]


async def deserialize(serializer, bytes_payload):
# serializer.deserialize would extract the schema id from the payload,
# retrieve schema from Schema Registry and cache the schema locally.
# If the schema id is the local cache, the call won't trigger a service call.
dict_data = await serializer.deserialize(bytes_payload)

print('Deserialized data is: ', dict_data)
return dict_data


async def main():
schema_registry = SchemaRegistryClient(endpoint=SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE, credential=token_credential)
serializer = AvroSerializer(client=schema_registry, group_name=GROUP_NAME, auto_register_schemas=True)
async with schema_registry:
bytes_data_ben, bytes_data_alice = await serialize(serializer)
dict_data_ben = await deserialize(serializer, bytes_data_ben)
dict_data_alice = await deserialize(serializer, bytes_data_alice)
await schema_registry.close()

if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#!/usr/bin/env python

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

"""
Examples to show receiving events from EventHub with AvroSerializer integrated for data deserialization.
"""

# pylint: disable=C0111
import os
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.identity.aio import DefaultAzureCredential
from azure.schemaregistry.aio import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer.aio import AvroSerializer

EVENTHUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR']
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
GROUP_NAME = os.environ['SCHEMAREGISTRY_GROUP']


# create an EventHubConsumerClient instance
eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=EVENTHUB_CONNECTION_STR,
consumer_group='$Default',
eventhub_name=EVENTHUB_NAME,
)
# create a AvroSerializer instance
# TODO: after 'azure-schemaregistry==1.0.0b3' is released, update 'endpoint' to 'fully_qualified_namespace'
avro_serializer = AvroSerializer(
client=SchemaRegistryClient(
endpoint=SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE,
credential=DefaultAzureCredential()
),
group_name=GROUP_NAME,
auto_register_schemas=True
)

async def on_event(partition_context, event):
print("Received event from partition: {}.".format(partition_context.partition_id))

bytes_payload = b"".join(b for b in event.body)
print('The received bytes of the EventData is {}.'.format(bytes_payload))

# Use the deserialize method to convert bytes to dict object.
# The deserialize method would extract the schema id from the payload, and automatically retrieve the Avro Schema
# from the Schema Registry Service. The schema would be cached locally for future usage.
deserialized_data = await avro_serializer.deserialize(bytes_payload)
print('The dict data after deserialization is {}'.format(deserialized_data))


async def main():
try:
async with eventhub_consumer, avro_serializer:
await eventhub_consumer.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
except KeyboardInterrupt:
print('Stopped receiving.')

if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Loading