Skip to content

Commit

Permalink
doc: redis gears example added (#75)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosschroh authored Aug 28, 2020
1 parent 08ede81 commit 7c05d67
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 28 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ User.deserialize(avro_json_binary, serialization_type="avro-json", create_instan

Under [examples](https://github.com/marcosschroh/dataclasses-avroschema/tree/master/examples) folder you can find 3 differents kafka examples, one with [aiokafka](https://github.com/aio-libs/aiokafka) (`async`) showing the simplest use case when a `AvroModel` instance is serialized and sent it thorught kafka, and the event is consumed.
The other two examples are `sync` using the [kafka-python](https://github.com/dpkp/kafka-python) driver, where the `avro-json` serialization and `schema evolution` (`FULL` compatibility) is shown.
Also, there is one `redis` examples using `redis streams` with [walrus](https://github.com/coleifer/walrus).
Also, there are two `redis` examples using `redis streams` with [walrus](https://github.com/coleifer/walrus) and [redisgears-py](https://github.com/RedisGears/redisgears-py)

## Features

Expand All @@ -200,7 +200,8 @@ Also, there is one `redis` examples using `redis streams` with [walrus](https://
* [X] Instance serialization correspondent to `avro schema` generated
* [X] Data deserialization. Return python dict or class instance
* [X] Generate json from python class instance
* [X] Examples of integration with [aiokafka](https://github.com/aio-libs/aiokafka), [kafka-python](https://github.com/dpkp/kafka-python) and [walrus](https://github.com/coleifer/walrus) (`redis`)
* [X] Examples of integration with `kafka` drivers: [aiokafka](https://github.com/aio-libs/aiokafka), [kafka-python](https://github.com/dpkp/kafka-python)
* [X] Example of integration with `redis` drivers: [walrus](https://github.com/coleifer/walrus) and [redisgears-py](https://github.com/RedisGears/redisgears-py)

## Development

Expand Down
8 changes: 6 additions & 2 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Examples of dataclasses-avroschema

You will find a series of example about how to use [dataclasses-avroschema](https://github.com/marcosschroh/dataclasses-avroschema) and the integration with python drivers like [aiokafka](https://github.com/aio-libs/aiokafka), [kafka-python](https://github.com/dpkp/kafka-python) in the case of `kafka` and [walrus](https://github.com/coleifer/walrus) in the case of `redis`.
You will find a series of example about how to use [dataclasses-avroschema](https://github.com/marcosschroh/dataclasses-avroschema) and the integration with python drivers like [aiokafka](https://github.com/aio-libs/aiokafka), [kafka-python](https://github.com/dpkp/kafka-python) in the case of `kafka` and [walrus](https://github.com/coleifer/walrus) and [redisgears-py](https://github.com/RedisGears/redisgears-py) in the case of `redis`.

## Requirements

Expand Down Expand Up @@ -63,4 +63,8 @@ In the file [kafka-examples/schema_evolution_example.py](https://github.com/marc

### dataclasses-avroschema and redis streams with walrus

In the file [redis-examples/redis_stream_example.py](https://github.com/marcosschroh/dataclasses-avroschema/blob/master/examples/redis-examples/redis_stream_example.py) you will find a the simplest use case of `redis streams` using the driver `walrus`. We create a consumer group and messages are read one by one.
In the file [redis-examples/redis_stream_example.py](https://github.com/marcosschroh/dataclasses-avroschema/blob/master/examples/redis-examples/redis_stream_example.py) you will find a the simplest use case of `redis streams` using the driver `walrus`. We create a consumer group and messages are read one by one. Run it with `make redis-stream-example`

### dataclasses-avroschema and RedisGears

In the file [redis-examples/redis_gears_example.py](https://github.com/marcosschroh/dataclasses-avroschema/blob/master/examples/redis-examples/redis_stream_example.py) you will find an example of using `streaming` with `RedisGears`. As use case we produce records with random values and the consumer filters them by the `age` property. Run it with `make redis-gears-example`.
40 changes: 16 additions & 24 deletions examples/redis-examples/redis_gears_example.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from gearsclient import GearsRemoteBuilder as GearsBuilder

# import redis

from dataclasses import dataclass
import json
import random
from time import sleep

Expand All @@ -26,27 +24,24 @@ class Meta:
aliases = ["user-v1", "super user"]


def consume(conn):
result = GearsBuilder('StreamReader', r=conn).\
map(lambda x:execute('hget', x, 'genres')).\
filter(lambda x:x != '\\N').\
flatmap(lambda x: x.split(',')).\
map(lambda x: x.strip()).\
countby().\
run()

print(result)
def consume(conn, stream_name):
"""
Consume messages from Stream and filter the events by age >= 25
"""
return GearsBuilder("StreamReader", r=conn).\
map(lambda x: (json.loads(x["value"]["message"]))).\
filter(lambda x: x["age"] >= 25).\
run(stream_name)


def produce(consumer_group):
def produce(stream):
for i in range(10):
# create an instance of User v1
user = UserModel(
name=random.choice(["Juan", "Peter", "Michael", "Moby", "Kim",]),
age=random.randint(1, 50)
)

msgid = consumer_group.my_stream.add({"message": user.serialize()})
msgid = stream.add({"message": user.serialize(serialization_type="avro-json")})
print(f"Producing message {msgid}")

print("Producer finished....")
Expand All @@ -56,13 +51,10 @@ def produce(consumer_group):

if __name__ == "__main__":
db = Database()
stream_name = 'my-stream'
db.Stream(stream_name) # Create a new stream instance
stream_name = "my_stream"
stream = db.Stream(stream_name) # Create a new stream instance

# create the consumer group
consumer_group = db.consumer_group('my-consumer-group-1', [stream_name])
consumer_group.create() # Create the consumer group.
consumer_group.set_id('$')
produce(stream)
results = consume(db, stream_name)

produce(consumer_group)
consume(db)
print(results)

0 comments on commit 7c05d67

Please sign in to comment.