diff --git a/README.md b/README.md
index 88146de4..23b22e61 100644
--- a/README.md
+++ b/README.md
@@ -187,7 +187,7 @@ User.deserialize(avro_json_binary, serialization_type="avro-json", create_instan
 
 Under [examples](https://github.com/marcosschroh/dataclasses-avroschema/tree/master/examples) folder you can find 3 differents kafka examples, one with [aiokafka](https://github.com/aio-libs/aiokafka) (`async`) showing the simplest use case when a `AvroModel` instance is serialized and sent it thorught kafka, and the event is consumed. The other two examples are `sync` using the [kafka-python](https://github.com/dpkp/kafka-python) driver, where the `avro-json` serialization and `schema evolution` (`FULL` compatibility) is shown.
 
-Also, there is one `redis` examples using `redis streams` with [walrus](https://github.com/coleifer/walrus).
+Also, there are two `redis` examples using `redis streams`: one with [walrus](https://github.com/coleifer/walrus) and one with [redisgears-py](https://github.com/RedisGears/redisgears-py).
 
 ## Features
 
@@ -200,7 +200,8 @@ Also, there is one `redis` examples using `redis streams` with [walrus](https://
 * [X] Instance serialization correspondent to `avro schema` generated
 * [X] Data deserialization. Return python dict or class instance
 * [X] Generate json from python class instance
-* [X] Examples of integration with [aiokafka](https://github.com/aio-libs/aiokafka), [kafka-python](https://github.com/dpkp/kafka-python) and [walrus](https://github.com/coleifer/walrus) (`redis`)
+* [X] Examples of integration with `kafka` drivers: [aiokafka](https://github.com/aio-libs/aiokafka), [kafka-python](https://github.com/dpkp/kafka-python)
+* [X] Examples of integration with `redis` drivers: [walrus](https://github.com/coleifer/walrus) and [redisgears-py](https://github.com/RedisGears/redisgears-py)
 
 ## Development
 
diff --git a/examples/README.md b/examples/README.md
index 5d683222..33b1ae1b 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -1,6 +1,6 @@
 # Examples of dataclasses-avroschema
 
-You will find a series of example about how to use [dataclasses-avroschema](https://github.com/marcosschroh/dataclasses-avroschema) and the integration with python drivers like [aiokafka](https://github.com/aio-libs/aiokafka), [kafka-python](https://github.com/dpkp/kafka-python) in the case of `kafka` and [walrus](https://github.com/coleifer/walrus) in the case of `redis`.
+You will find a series of examples of how to use [dataclasses-avroschema](https://github.com/marcosschroh/dataclasses-avroschema) and its integration with python drivers like [aiokafka](https://github.com/aio-libs/aiokafka) and [kafka-python](https://github.com/dpkp/kafka-python) in the case of `kafka`, and [walrus](https://github.com/coleifer/walrus) and [redisgears-py](https://github.com/RedisGears/redisgears-py) in the case of `redis`.
 
 ## Requirements
 
@@ -63,4 +63,8 @@ In the file [kafka-examples/schema_evolution_example.py](https://github.com/marc
 
 ### dataclasses-avroschema and redis streams with walrus
 
-In the file [redis-examples/redis_stream_example.py](https://github.com/marcosschroh/dataclasses-avroschema/blob/master/examples/redis-examples/redis_stream_example.py) you will find a the simplest use case of `redis streams` using the driver `walrus`. We create a consumer group and messages are read one by one.
+In the file [redis-examples/redis_stream_example.py](https://github.com/marcosschroh/dataclasses-avroschema/blob/master/examples/redis-examples/redis_stream_example.py) you will find the simplest use case of `redis streams` using the driver `walrus`. We create a consumer group and messages are read one by one. Run it with `make redis-stream-example`.
+
+### dataclasses-avroschema and RedisGears
+
+In the file [redis-examples/redis_gears_example.py](https://github.com/marcosschroh/dataclasses-avroschema/blob/master/examples/redis-examples/redis_gears_example.py) you will find an example of using `streaming` with `RedisGears`. As a use case, we produce records with random values and the consumer filters them by the `age` property. Run it with `make redis-gears-example`.
diff --git a/examples/redis-examples/redis_gears_example.py b/examples/redis-examples/redis_gears_example.py
index 0acca83a..28a05dc3 100644
--- a/examples/redis-examples/redis_gears_example.py
+++ b/examples/redis-examples/redis_gears_example.py
@@ -1,8 +1,6 @@
 from gearsclient import GearsRemoteBuilder as GearsBuilder
-
-# import redis
-
 from dataclasses import dataclass
+import json
 import random
 from time import sleep
 
@@ -26,27 +24,24 @@ class Meta:
         aliases = ["user-v1", "super user"]
 
 
-def consume(conn):
-    result = GearsBuilder('StreamReader', r=conn).\
-        map(lambda x:execute('hget', x, 'genres')).\
-        filter(lambda x:x != '\\N').\
-        flatmap(lambda x: x.split(',')).\
-        map(lambda x: x.strip()).\
-        countby().\
-        run()
-
-    print(result)
+def consume(conn, stream_name):
+    """
+    Consume messages from the stream and filter the events by age >= 25
+    """
+    return GearsBuilder("StreamReader", r=conn).\
+        map(lambda x: json.loads(x["value"]["message"])).\
+        filter(lambda x: x["age"] >= 25).\
+        run(stream_name)
 
 
-def produce(consumer_group):
+def produce(stream):
     for i in range(10):
-        # create an instance of User v1
         user = UserModel(
             name=random.choice(["Juan", "Peter", "Michael", "Moby", "Kim",]),
             age=random.randint(1, 50)
         )
 
-        msgid = consumer_group.my_stream.add({"message": user.serialize()})
+        msgid = stream.add({"message": user.serialize(serialization_type="avro-json")})
         print(f"Producing message {msgid}")
 
     print("Producer finished....")
@@ -56,13 +51,10 @@ def produce(consumer_group):
 
 if __name__ == "__main__":
     db = Database()
-    stream_name = 'my-stream'
-    db.Stream(stream_name)  # Create a new stream instance
+    stream_name = "my_stream"
+    stream = db.Stream(stream_name)  # Create a new stream instance
 
-    # create the consumer group
-    consumer_group = db.consumer_group('my-consumer-group-1', [stream_name])
-    consumer_group.create()  # Create the consumer group.
-    consumer_group.set_id('$')
+    produce(stream)
+    results = consume(db, stream_name)
 
-    produce(consumer_group)
-    consume(db)
+    print(results)
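Beyond the patch itself, a minimal sketch of the `avro-json` round trip the new example relies on may help readers. It assumes only that `dataclasses-avroschema` is installed; the `User` class with `name`/`age` fields is illustrative and is not the example's `UserModel`.

```python
from dataclasses import dataclass

from dataclasses_avroschema import AvroModel


@dataclass
class User(AvroModel):
    name: str
    age: int


user = User(name="Juan", age=30)

# "avro-json" produces a JSON-encoded payload, which is why the RedisGears
# consumer in the example can filter events with plain json.loads
avro_json_binary = user.serialize(serialization_type="avro-json")

# round trip back to a class instance
restored = User.deserialize(
    avro_json_binary, serialization_type="avro-json", create_instance=True
)
print(restored)
```

Because `avro-json` payloads are plain JSON, the RedisGears pipeline above can filter them with `json.loads` instead of a full Avro decode.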
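A second hedged sketch, also not part of the diff: reading the produced events back out of the stream and rebuilding model instances. It assumes walrus's `Database` behaves as a `redis-py` client (so `xrange` is available), that stream field names come back as `bytes`, and an illustrative `UserModel` with only `name` and `age`.

```python
from dataclasses import dataclass

from dataclasses_avroschema import AvroModel
from walrus import Database


@dataclass
class UserModel(AvroModel):
    # illustrative fields; the real example's UserModel may define more
    name: str
    age: int


if __name__ == "__main__":
    db = Database()
    stream_name = "my_stream"

    # XRANGE returns (message_id, data) pairs; field names and values are bytes
    for message_id, data in db.xrange(stream_name):
        user = UserModel.deserialize(
            data[b"message"], serialization_type="avro-json", create_instance=True
        )
        print(message_id, user)
```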