# windowed_aggregation.py — forked from faust-streaming/faust
# Tumbling-window aggregation example (originally 76 lines).
from datetime import datetime, timedelta
from time import time
import random
import faust
class RawModel(faust.Record):
    """A single raw event: a float value stamped with its event time."""

    date: datetime  # event time; NOTE(review): produce() sends int(time()) here — confirm intended type
    value: float  # the measurement being aggregated
class AggModel(faust.Record):
    """Aggregate emitted per closed window: event count and mean value."""

    date: datetime  # window start timestamp (see window_processor)
    count: int  # number of events in the window
    mean: float  # arithmetic mean of the events' values
TOPIC = 'raw-event'  # input topic carrying RawModel messages
SINK = 'agg-event'  # output topic carrying AggModel aggregates
TABLE = 'tumbling_table'  # name of the windowed table
KAFKA = 'kafka://localhost:9092'  # broker URL
CLEANUP_INTERVAL = 1.0  # seconds between table cleanup sweeps
WINDOW = 10  # tumbling window size, seconds
WINDOW_EXPIRES = 1  # seconds after which a closed window expires
PARTITIONS = 1  # topic_partitions below is hard-coded to match
# Application and topic wiring; cleanup interval controls how often
# expired windows are swept (and thus how promptly on_window_close fires).
app = faust.App('windowed-agg', broker=KAFKA, version=1, topic_partitions=1)
app.conf.table_cleanup_interval = CLEANUP_INTERVAL
source = app.topic(TOPIC, value_type=RawModel)
sink = app.topic(SINK, value_type=AggModel)
def window_processor(key, events):
    """Aggregate a closed tumbling window and emit the result to ``sink``.

    Args:
        key: the ``(table_key, (window_start, window_end))`` pair that
            faust passes to ``on_window_close`` callbacks.
        events: the list of RawModel records accumulated in the window.
    """
    # The window start timestamp identifies this window in the output.
    timestamp = key[1][0]
    values = [event.value for event in events]
    count = len(values)
    # Guard the division: an empty window yields mean 0.0 instead of
    # raising ZeroDivisionError.
    mean = sum(values) / count if count else 0.0
    # Separate the message fragments with spaces (the original f-strings
    # concatenated with none, producing "window:3 events,mean:...").
    print(
        f'processing window: '
        f'{count} events, '
        f'mean: {mean:.2f}, '
        f'timestamp {timestamp}',
    )
    sink.send_soon(value=AggModel(date=timestamp, count=count, mean=mean))
# Windowed table: accumulates raw events per 10-second tumbling window,
# keyed by each event's own ``date`` field, and hands the full event list
# to window_processor when a window closes.
_base_table = app.Table(
    TABLE,
    default=list,
    partitions=PARTITIONS,
    on_window_close=window_processor,
)
_windowed = _base_table.tumbling(WINDOW, expires=timedelta(seconds=WINDOW_EXPIRES))
tumbling_table = _windowed.relative_to_field(RawModel.date)
@app.agent(source)
async def print_windowed_events(stream):
    """Consume raw events and append each to the current tumbling window.

    All events share the single table key ``'events'``. NOTE(review): the
    read-append-reassign sequence appears deliberate — faust tables track
    changes via assignment, so mutating the list in place without the
    final reassignment would presumably not persist; confirm against the
    faust table documentation before restructuring.
    """
    async for event in stream:
        value_list = tumbling_table['events'].value()
        value_list.append(event)
        tumbling_table['events'] = value_list
@app.timer(0.1)
async def produce():
    """Send a random RawModel to the source topic every 0.1 seconds.

    NOTE(review): ``date`` is annotated as ``datetime`` on RawModel but an
    int epoch timestamp is sent here — confirm which is intended.
    """
    await source.send(value=RawModel(value=random.random(), date=int(time())))
if __name__ == '__main__':
    # Hand control to the faust command-line interface (e.g. `worker`).
    app.main()