[Schema Registry + Avro Serializer] 1.0.0b1 #13124

Merged: 36 commits, Sep 4, 2020
56bf5eb
init commit
yunhaoling Aug 14, 2020
d1c96f9
avro serializer structure
yunhaoling Aug 18, 2020
6311e92
adding avro serializer
yunhaoling Aug 20, 2020
0096ef2
tweak api version and fix a typo
yunhaoling Aug 20, 2020
2e95001
test template
yunhaoling Aug 21, 2020
2e8bcc7
avro serializer sync draft
yunhaoling Aug 22, 2020
6248ff1
major azure sr client work done
yunhaoling Aug 25, 2020
f97478e
add sample docstring for sr
yunhaoling Aug 25, 2020
3cf6459
avro serializer async impl
yunhaoling Aug 26, 2020
58fb59f
close the writer
yunhaoling Aug 26, 2020
02c60ec
update avro se/de impl
yunhaoling Aug 27, 2020
85d6766
update avro serializer impl
yunhaoling Aug 27, 2020
5fa4b43
fix apireview reported error in sr
yunhaoling Aug 27, 2020
b910027
srav namespace, setup update
yunhaoling Aug 27, 2020
6b6c8b2
doc update
yunhaoling Aug 28, 2020
c465be1
update doc and api
yunhaoling Aug 30, 2020
63a278c
impl, doc update
yunhaoling Aug 31, 2020
dc363f4
partial update according to laruent's feedback
yunhaoling Sep 1, 2020
740de0e
be consistent with eh extension structure
yunhaoling Sep 1, 2020
7734c42
more update code according to feedback
yunhaoling Sep 1, 2020
92cd385
update credential config
yunhaoling Sep 1, 2020
1c60676
rename package name to azure-schemaregistry-avroserializer
yunhaoling Sep 1, 2020
f20bba0
fix pylint
yunhaoling Sep 1, 2020
c81f16b
try ci fix
yunhaoling Sep 2, 2020
41ee64b
fix test for py27 as avro only accept unicode
yunhaoling Sep 3, 2020
2675331
first round of review feedback
yunhaoling Sep 3, 2020
fb0e6f9
remove temp ci experiment
yunhaoling Sep 3, 2020
0260ea3
init add conftest.py to pass py2.7 test
yunhaoling Sep 3, 2020
bb687cb
laurent feedback update
yunhaoling Sep 3, 2020
d8e0986
remove dictmixin for b1, update comment in sample
yunhaoling Sep 3, 2020
b91fb4f
update api in avroserializer and update test and readme
yunhaoling Sep 4, 2020
929ee68
update test, docs and links
yunhaoling Sep 4, 2020
8fbed90
add share requirement
yunhaoling Sep 4, 2020
01a39a7
update avro dependency
yunhaoling Sep 4, 2020
bde3c24
pr feedback and livetest update
yunhaoling Sep 4, 2020
a2903b6
Merge remote-tracking branch 'central/master' into sr-dev
yunhaoling Sep 4, 2020
impl, doc update
yunhaoling committed Aug 31, 2020
commit 63a278c5b522c945e3b8dee50d1991cd974273d7
@@ -1,3 +1,5 @@
# Release History

## 1.0.0b1 (Unreleased)

Version 1.0.0b1 is the first preview of our efforts to create a user-friendly and Pythonic client library for Azure Schema Registry Avro Serializer.
83 changes: 80 additions & 3 deletions sdk/schemaregistry/azure-schemaregistry-avro-serializer/README.md
@@ -1,18 +1,92 @@
# Azure Schema Registry Avro Serializer library for Python

Azure Schema Registry Avro Serializer is a ...
<<This library is under development>>
Azure Schema Registry Avro Serializer serializes and deserializes data according
to a given Avro schema. It automatically registers, retrieves, and caches the schema.

## Getting started

### Install the package

Install the Azure Schema Registry Avro Serializer client library for Python with [pip][pip]:

```Bash
pip install azure-schemaregistry-avro-serializer
```

### Prerequisites:
To use this package, you must have:
* Azure subscription - [Create a free account][azure_sub]
* Azure Schema Registry
* Python 2.7, 3.5 or later - [Install Python][python]

## Key concepts

- Avro: Apache Avro™ is a data serialization system.
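
Schema caching means that repeated calls with the same schema do not need to contact the service again. The following is a minimal sketch of a two-way in-memory cache between schema strings and schema IDs, not the library's implementation. The `SchemaCache` class and the `register` callable are hypothetical names used only for illustration; `register` stands in for whatever call obtains a schema ID from the registry.

```python
class SchemaCache(object):
    """Two-way in-memory cache between schema IDs and schema strings."""

    def __init__(self, register):
        # `register` is a hypothetical callable: schema string -> schema ID.
        self._register = register
        self._id_to_schema = {}
        self._schema_to_id = {}

    def get_schema_id(self, schema_str):
        """Return the cached ID, calling `register` only on a cache miss."""
        try:
            return self._schema_to_id[schema_str]
        except KeyError:
            schema_id = self._register(schema_str)
            self._schema_to_id[schema_str] = schema_id
            self._id_to_schema[schema_id] = schema_str
            return schema_id

    def get_schema(self, schema_id):
        """Return the cached schema string; raises KeyError if not cached."""
        return self._id_to_schema[schema_id]
```

Keeping both directions in one place lets a lookup by ID after a registration reuse the same entry instead of triggering a second service call.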

## Examples

The following sections provide several code snippets covering some of the most common Schema Registry tasks, including:

- [Serialization](#serialization)
- [Deserialization](#deserialization)

### Serialization

```python
import os
from azure.schemaregistry.serializer.avro_serializer import SchemaRegistryAvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
endpoint = os.environ['SCHEMA_REGISTRY_ENDPOINT']
schema_group = "<your-group-name>"

serializer = SchemaRegistryAvroSerializer(endpoint, token_credential, schema_group)

schema_string = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""

with serializer:
    dict_data = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    encoded_bytes = serializer.serialize(dict_data, schema_string)
```

### Deserialization

```python
import os
from azure.schemaregistry.serializer.avro_serializer import SchemaRegistryAvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
endpoint = os.environ['SCHEMA_REGISTRY_ENDPOINT']
schema_group = "<your-group-name>"

serializer = SchemaRegistryAvroSerializer(endpoint, token_credential, schema_group)

with serializer:
    encoded_bytes = b'<data_encoded_by_azure_schema_registry_avro_serializer>'
    decoded_data = serializer.deserialize(encoded_bytes)
```
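
In this preview, the bytes handled by `serialize` and `deserialize` appear to be framed as a 4-byte record format identifier (currently `\x00\x00\x00\x00`), followed by the schema ID encoded as UTF-8 (read back as bytes 4 to 36), followed by the Avro binary body. The following is a minimal sketch of that framing using only the standard library. `frame_payload` and `split_payload` are hypothetical helper names for illustration, not part of the library, and the fixed 32-byte ID length is an assumption taken from the slice used in this commit.

```python
RECORD_FORMAT_IDENTIFIER = b"\0\0\0\0"  # 4 reserved bytes, as used in this commit
SCHEMA_ID_LENGTH = 32  # assumption: schema IDs encode to exactly 32 bytes


def frame_payload(schema_id, avro_body):
    """Prefix an Avro-encoded body with the format identifier and schema ID."""
    encoded_id = schema_id.encode("utf-8")
    if len(encoded_id) != SCHEMA_ID_LENGTH:
        raise ValueError("schema_id must encode to exactly 32 bytes")
    return RECORD_FORMAT_IDENTIFIER + encoded_id + avro_body


def split_payload(payload):
    """Return (schema_id, avro_body) from a framed payload."""
    header_length = len(RECORD_FORMAT_IDENTIFIER) + SCHEMA_ID_LENGTH
    if len(payload) < header_length:
        raise ValueError("payload is shorter than the 36-byte header")
    schema_id = payload[4:header_length].decode("utf-8")
    return schema_id, payload[header_length:]
```

Because the schema ID travels with the data, a consumer can look up the writer's schema from the payload alone, which is why `deserialize` takes no schema argument.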

## Troubleshooting

Azure Schema Registry Avro Serializer raises exceptions defined in [Azure Core][azure_core].

## Next steps

### More sample code

Please find further examples in the [samples](./samples) directory demonstrating common Azure Schema Registry Avro Serializer scenarios.

## Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a
@@ -27,4 +101,7 @@ This project has adopted the [Microsoft Open Source Code of Conduct](https://ope
For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or
contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments.

<!-- LINKS -->
[pip]: https://pypi.org/project/pip/
[python]: https://www.python.org/downloads/
[azure_core]: https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/core/azure-core/README.md
[azure_sub]: https://azure.microsoft.com/free/
@@ -24,8 +24,9 @@
#
# --------------------------------------------------------------------------
import abc
from typing import BinaryIO, Union, Type, TypeVar, Optional, Any
from typing import BinaryIO, Union, Type, TypeVar, Optional, Any, Dict
from io import BytesIO
import avro
from avro.io import DatumWriter, DatumReader, BinaryDecoder, BinaryEncoder

try:
@@ -42,37 +43,41 @@ def __init__(self, codec=None):
"""An Avro serializer using the avro library from Apache.
:param str codec: The writer codec. If None, let the avro library decide.
"""
try:
import avro # pylint: disable=unused-import
except ImportError:
raise ImportError("In order to create a AvroObjectSerializer you need to install the 'avro' library")

self._writer_codec = codec
self._schema_writer_cache = {} # type: Dict[str, DatumWriter]
self._schema_reader_cache = {} # type: Dict[str, DatumReader]

def serialize(
self,
stream, # type: BinaryIO
value, # type: ObjectType
data, # type: ObjectType
schema, # type: Optional[Any]
):
# type: (...) -> None
# type: (...) -> bytes
"""Convert the provided value to its binary representation and write it to the stream.
Schema must be an Avro RecordSchema:
https://avro.apache.org/docs/1.10.0/gettingstartedpython.html#Defining+a+schema
:param stream: A stream of bytes or bytes directly
:type stream: BinaryIO
:param value: An object to serialize
:param data: An object to serialize
:param schema: A Avro RecordSchema
"""
if not schema:
raise ValueError("Schema is required in Avro serializer.")

kwargs = {}
if self._writer_codec:
kwargs['codec'] = self._writer_codec
if not isinstance(schema, avro.schema.Schema):
schema = avro.schema.parse(schema)

try:
writer = self._schema_writer_cache[str(schema)]
except KeyError:
writer = DatumWriter(schema)
self._schema_writer_cache[str(schema)] = writer

stream = BytesIO()

writer = DatumWriter(schema) # TODO: cache it
writer.write(value, BinaryEncoder(stream))
writer.write(data, BinaryEncoder(stream))
encoded_data = stream.getvalue()

stream.close()
return encoded_data

def deserialize(
self,
@@ -87,15 +92,23 @@ def deserialize(
:type data: BinaryIO or bytes
:param schema: A Avro RecordSchema
:param return_type: Return type is not supported in the Avro serializer.
:returns: An instanciated object
:returns: An instantiated object
:rtype: ObjectType
"""
if not hasattr(data, 'read'):
data = BytesIO(data)

avro_reader = DatumReader(writers_schema=schema) # TODO: cache it
if not isinstance(schema, avro.schema.Schema):
schema = avro.schema.parse(schema)

try:
reader = self._schema_reader_cache[str(schema)]
except KeyError:
reader = DatumReader(writers_schema=schema)
self._schema_reader_cache[str(schema)] = reader

bin_decoder = BinaryDecoder(data)
decoded_data = avro_reader.read(bin_decoder)
decoded_data = reader.read(bin_decoder)
data.close()

return decoded_data
@@ -34,14 +34,16 @@

class SchemaRegistryAvroSerializer(object):
"""
SchemaRegistryAvroSerializer provides the ability to serialize and deserialize data according
to the given avro schema. It would automatically register, get and cache the schema.

:param str endpoint: The Schema Registry service endpoint, for example my-namespace.servicebus.windows.net.
:param credential: To authenticate to manage the entities of the SchemaRegistry namespace.
:type credential: TokenCredential
:param str schema_group
:param str schema_group: Schema group under which schema should be registered.

"""
def __init__(self, credential, endpoint, schema_group, **kwargs):
def __init__(self, endpoint, credential, schema_group, **kwargs):
self._schema_group = schema_group
self._avro_serializer = AvroObjectSerializer()
self._schema_registry_client = SchemaRegistryClient(credential=credential, endpoint=endpoint)
@@ -101,28 +103,26 @@ def _get_schema(self, schema_id):
return schema_str

def serialize(self, data, schema):
# type: (Dict[str, Any], Union[str, bytes, avro.schema.Schema]) -> bytes
# type: (Dict[str, Any], Union[str, bytes]) -> bytes
"""
Encode dict data with the given schema.

:param data: The dict data to be encoded.
:param schema: The schema used to encode the data. # TODO: support schema object/str/bytes?
:type schema: Union[str, bytes, avro.schema.Schema])
:type schema: Union[str, bytes]
:return:
"""
if not isinstance(schema, avro.schema.Schema):
schema = avro.schema.parse(schema)
# TODO: schema_id to datumwrtier cache

schema_id = self._get_schema_id(schema.fullname, str(schema))
stream = BytesIO()
self._avro_serializer.serialize(stream, data, schema)
data_bytes = self._avro_serializer.serialize(data, schema)
# TODO: Arthur: We are adding 4 bytes to the beginning of each SR payload.
# This is intended to become a record format identifier in the future.
# Right now, you can just put \x00\x00\x00\x00.
record_format_identifier = b'\0\0\0\0'
res = record_format_identifier + schema_id.encode('utf-8') + stream.getvalue() # TODO: should we use struck.pack and unpack for interoperability, could be a cross-language problem
stream.close()
return res
payload = record_format_identifier + schema_id.encode('utf-8') + data_bytes # TODO: should we use struck.pack and unpack for interoperability, could be a cross-language problem
return payload

def deserialize(self, data):
# type: (bytes) -> Dict[str, Any]
@@ -139,7 +139,5 @@ def deserialize(self, data):
schema_id = data[4:36].decode('utf-8')
schema_content = self._get_schema(schema_id)

# TODO: schema_id to datumreader cache
schema = avro.schema.parse(schema_content)
dict_data = self._avro_serializer.deserialize(data[36:], schema)
dict_data = self._avro_serializer.deserialize(data[36:], schema_content)
return dict_data
@@ -35,19 +35,19 @@

class SchemaRegistryAvroSerializer:
"""
SchemaRegistryClient is as a central schema repository for enterprise-level data infrastructure,
complete with support for versioning and management.
SchemaRegistryAvroSerializer provides the ability to serialize and deserialize data according
to the given avro schema. It would automatically register, get and cache the schema.

:param str endpoint: The Schema Registry service endpoint, for example my-namespace.servicebus.windows.net.
:param credential: To authenticate to manage the entities of the SchemaRegistry namespace.
:type credential: TokenCredential
:param str schema_group

"""
def __init__(self, credential, endpoint, schema_group, **kwargs):
def __init__(self, endpoint, credential, schema_group, **kwargs):
self._schema_group = schema_group
self._avro_serializer = AvroObjectSerializer()
self._schema_registry_client = SchemaRegistryClient(credential=credential, endpoint=endpoint)
self._schema_registry_client = SchemaRegistryClient(endpoint=endpoint, credential=credential)
self._id_to_schema = {}
self._schema_to_id = {}

@@ -98,28 +98,26 @@ async def _get_schema(self, schema_id: str) -> str:
self._schema_to_id[schema_str] = schema_id
return schema_str

async def serialize(self, data: Dict[str, Any], schema: Union[str, bytes, avro.schema.Schema]) -> bytes:
async def serialize(self, data: Dict[str, Any], schema: Union[str, bytes]) -> bytes:
"""
Encode dict data with the given schema.

:param data: The dict data to be encoded.
:param schema: The schema used to encode the data. # TODO: support schema object/str/bytes?
:type schema: Union[str, bytes, avro.schema.Schema])
:type schema: Union[str, bytes]
:return:
"""
if not isinstance(schema, avro.schema.Schema):
schema = avro.schema.parse(schema)

schema_id = await self._get_schema_id(schema.fullname, str(schema))
stream = BytesIO()
self._avro_serializer.serialize(stream, data, schema)
data_bytes = self._avro_serializer.serialize(data, schema)
# TODO: Arthur: We are adding 4 bytes to the beginning of each SR payload.
# This is intended to become a record format identifier in the future.
# Right now, you can just put \x00\x00\x00\x00.
record_format_identifier = b'\0\0\0\0'
res = record_format_identifier + schema_id.encode('utf-8') + stream.getvalue()
stream.close()
return res
payload = record_format_identifier + schema_id.encode('utf-8') + data_bytes
return payload

async def deserialize(self, data: bytes) -> Dict[str, Any]:
"""
@@ -135,7 +133,6 @@ async def deserialize(self, data: bytes) -> Dict[str, Any]:
schema_id = data[4:36].decode('utf-8')
schema_content = await self._get_schema(schema_id)

# TODO: schema_id to datumreader cache
schema = avro.schema.parse(schema_content)
dict_data = self._avro_serializer.deserialize(data[36:], schema)
return dict_data
@@ -1,2 +1,4 @@
-e ../../identity/azure-identity
-e ../../../tools/azure-devtools
-e ../../../tools/azure-sdk-tools
../azure-schemaregistry
@@ -0,0 +1,48 @@
---
page_type: sample
languages:
- python
products:
- azure
- azure-schema-registry-avro-serializer
urlFragment: schemaregistry-avro-serializer-samples
---

# Azure Schema Registry Avro Serializer library for Python Samples

These are code samples that show common scenario operations with the Schema Registry Avro Serializer library.
The async versions of the samples (the python sample files appended with `_async`) show asynchronous operations,
and require Python 3.5 or later.

Several Schema Registry Avro Serializer Python SDK samples are available to you in the SDK's GitHub repository. These samples provide example code for additional scenarios commonly encountered while working with Schema Registry Avro Serializer:

* [avro_serializer.py](./sync_samples/avro_serializer.py) ([async version](./async_samples/avro_serializer_async.py)) - Examples for common Schema Registry Avro Serializer tasks:
    * Serialize data according to the given schema
    * Deserialize data

## Prerequisites
- Python 2.7, 3.5 or later.
- **Microsoft Azure Subscription:** To use Azure services, including Azure Schema Registry, you'll need a subscription.
If you do not have an existing Azure account, you may sign up for a free trial or use your MSDN subscriber benefits when you [create an account](https://account.windowsazure.com/Home/Index).

## Setup

1. Install the Azure Schema Registry Avro Serializer client library for Python with [pip](https://pypi.org/project/pip/):

```bash
pip install azure-schemaregistry-avro-serializer
```

2. Clone or download this sample repository.
3. Open the sample folder in Visual Studio Code or your IDE of choice.

## Running the samples

1. Open a terminal window and `cd` to the directory that the samples are saved in.
2. Set the environment variables specified in the sample file you wish to run.
3. Follow the usage described in the file, e.g. `python avro_serializer.py`

## Next steps

Check out the [API reference documentation](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-schemaregistry-avro-serializer/latest/index.html) to learn more about
what you can do with the Azure Schema Registry Avro Serializer library.