forked from faust-streaming/faust
-
Notifications
You must be signed in to change notification settings - Fork 0
/
withdrawals.py
executable file
·103 lines (79 loc) · 2.79 KB
/
withdrawals.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#!/usr/bin/env python
"""Withdrawal example.
Quick Start
===========
1) Start worker:
.. sourcecode:: console
$ ./examples/simple.py worker -l info
2) Start sending example data:
$ ./examples/simple.py produce
"""
import asyncio
import random
from datetime import datetime, timezone
from itertools import count
import faust
from faust.cli import option
class Withdrawal(faust.Record, isodates=True, serializer='json'):
user: str
country: str
amount: float
date: datetime = None
app = faust.App(
'faust-withdrawals',
version=5,
broker='aiokafka://',
store='rocksdb://',
origin='examples.withdrawals',
topic_partitions=4,
# processing_guarantee='exactly_once',
)
withdrawals_topic = app.topic('withdrawals', value_type=Withdrawal)
user_to_total = app.Table(
'user_to_total', default=int,
).tumbling(3600).relative_to_stream()
country_to_total = app.Table(
'country_to_total', default=int,
).tumbling(10.0, expires=10.0).relative_to_stream()
@app.agent(withdrawals_topic)
async def track_user_withdrawal(withdrawals):
async for withdrawal in withdrawals:
user_to_total[withdrawal.user] += withdrawal.amount
@app.agent()
async def track_country_withdrawal(withdrawals):
async for withdrawal in withdrawals.group_by(Withdrawal.country):
country_to_total[withdrawal.country] += withdrawal.amount
@app.command(
option('--max-latency',
type=float, default=0.5, envvar='PRODUCE_LATENCY',
help='Add delay of (at most) n seconds between publishing.'),
option('--max-messages',
type=int, default=None,
help='Send at most N messages or 0 for infinity.'),
)
async def produce(self, max_latency: float, max_messages: int):
"""Produce example Withdrawal events."""
for i, withdrawal in enumerate(generate_withdrawals(max_messages)):
await withdrawals_topic.send(key=withdrawal.user, value=withdrawal)
if not i % 10000:
self.say(f'+SEND {i}')
if max_latency:
await asyncio.sleep(random.uniform(0, max_latency))
def generate_withdrawals_dict(n: int = None):
num_countries = 5
countries = [f'country_{i}' for i in range(num_countries)]
country_dist = [0.9] + ([0.10 / num_countries] * (num_countries - 1))
num_users = 500
users = [f'user_{i}' for i in range(num_users)]
for _ in range(n) if n is not None else count():
yield {
'user': random.choice(users),
'amount': random.uniform(0, 25_000),
'country': random.choices(countries, country_dist)[0],
'date': datetime.utcnow().replace(tzinfo=timezone.utc).isoformat(),
}
def generate_withdrawals(n: int = None):
for d in generate_withdrawals_dict(n):
yield Withdrawal(**d)
if __name__ == '__main__':
app.main()