-
Notifications
You must be signed in to change notification settings - Fork 1
/
example_resp.py
155 lines (125 loc) · 4.51 KB
/
example_resp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
from typing import Any, Generator, List, Optional, Tuple
import pytest
from asynkit import Monitor, await_sync
"""
Example of a stateful parser, implemented via recursive async calls,
and suspended via Monitor.
Can be used both in async and sync code.
An example of a `generator` based parser is also included, for comparison.
"""
pytestmark = pytest.mark.anyio
@pytest.fixture
def anyio_backend() -> str:
return "asyncio"
CRNL = b"\r\n"
def parse_resp_gen(
unparsed: bytes,
) -> Generator[Optional[Tuple[Any, bytes]], Optional[bytes], None]:
"""
This is a generator function that parses a Redis RESP string.
we only support ints and arrays of ints for simplicity
the generator approach maintains state between calls
Invoking a sub-generator requires some standard boilerplate.
"""
# wait until we have the first line
parts = unparsed.split(CRNL, 1)
while len(parts) == 1:
incoming = yield None
assert incoming is not None
unparsed += incoming
parts = unparsed.split(CRNL, 1)
line, unparsed = parts
cmd, value = line[:1], line[1:]
if cmd == b":":
yield int(value), unparsed
elif cmd == b"*":
count = int(value)
result_array: List[Any] = []
for _ in range(count):
# recursively parse each sub-object
# boilerplate required.
parser = parse_resp_gen(unparsed)
parsed = parser.send(None)
while parsed is None:
incoming = yield None
assert incoming is not None
parsed = parser.send(incoming)
item, unparsed = parsed
result_array.append(item)
yield result_array, unparsed
def test_generator() -> None:
"""
Test our generator based parser for Redis RESP strings.
"""
# construct a resp string consisting of an array of arrays of ints
resp = b"*2\r\n*3\r\n:1\r\n:2\r\n:3\r\n*2\r\n:4\r\n:5\r\n"
# split it up into chunks of two bytes
chunks = [resp[i : i + 2] for i in range(0, len(resp), 2)]
# create the parser
parser = parse_resp_gen(b"")
parsed = parser.send(None)
assert parsed is None
# send the chunks to the parser
while True:
parsed = parser.send(chunks.pop(0))
if parsed is not None:
break
assert parsed == ([[1, 2, 3], [4, 5]], b"")
async def parse_resp_mon(
monitor: Monitor[Tuple[Any, bytes]], unparsed: bytes
) -> Tuple[Any, bytes]:
"""
A Monitor based parser for Redis RESP strings.
"""
# get first line
parts = unparsed.split(CRNL, 1)
while len(parts) == 1:
# request more data
unparsed += await monitor.oob()
parts = unparsed.split(CRNL, 1)
line, unparsed = parts
cmd, value = line[:1], line[1:]
if cmd == b":":
return int(value), unparsed
elif cmd == b"*":
count = int(value)
result_array: List[Any] = []
# recursively parse each sub-object
# no boilerplate required.
for _ in range(count):
item, unparsed = await parse_resp_mon(monitor, unparsed)
result_array.append(item)
return result_array, unparsed
raise ValueError(f"unknown command '{cmd.decode()}'")
async def test_monitor() -> None:
"""Test our monitor based parser for Redis RESP strings."""
# construct a resp string consisting of an array of arrays of ints
resp = b"*2\r\n*3\r\n:1\r\n:2\r\n:3\r\n*2\r\n:4\r\n:5\r\n"
# split it up into chunks of two bytes
chunks = [resp[i : i + 2] for i in range(0, len(resp), 2)]
# create the parser
m: Monitor[Tuple[Any, bytes]] = Monitor()
parser = parse_resp_mon(m, b"")
await m.start(parser)
while True:
parsed = await m.try_await(parser, chunks.pop(0))
if parsed is not None:
break
assert parsed == ([[1, 2, 3], [4, 5]], b"")
def test_monitor_sync() -> None:
"""Test our monitor based parser for Redis RESP strings.
from synchronous code.
"""
# construct a resp string consisting of an array of arrays of ints
resp = b"*2\r\n*3\r\n:1\r\n:2\r\n:3\r\n*2\r\n:4\r\n:5\r\n"
# split it up into chunks of two bytes
chunks = [resp[i : i + 2] for i in range(0, len(resp), 2)]
# create the parser
m: Monitor[Tuple[Any, bytes]] = Monitor()
parser = parse_resp_mon(m, b"")
await_sync(m.start(parser))
while True:
parsed = await_sync(m.try_await(parser, chunks.pop(0)))
if parsed is not None:
break
assert parsed == ([[1, 2, 3], [4, 5]], b"")