Skip to content

Commit ac9f73e

Browse files
committed
Add GalaxySnail's write-up
1 parent 8b30334 commit ac9f73e

19 files changed

+2948
-0
lines changed

players/GalaxySnail/README.md

Lines changed: 1512 additions & 0 deletions
Large diffs are not rendered by default.

players/GalaxySnail/gist/ANSI.md

Lines changed: 346 additions & 0 deletions
Large diffs are not rendered by default.
6.54 KB
Loading
41.9 KB
Loading
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# pylint: disable=used-before-assignment
2+
import typing
3+
from typing import Literal, TypeVar, Protocol
4+
5+
if typing.TYPE_CHECKING:
6+
from _typeshed import SupportsWrite, ReadableBuffer
7+
8+
9+
T = TypeVar("T")
10+
T_co = TypeVar("T_co", covariant=True)
11+
T_contra = TypeVar("T_contra", contravariant=True)
12+
13+
# because of `AnyStr`, `typing.IO` can't save us
14+
class File(SupportsWrite[T_contra], Protocol):
15+
def close(self) -> None: ...
16+
def flush(self) -> None: ...
17+
18+
19+
raise RuntimeError("Never import it at runtime. Use typing.TYPE_CHECKING.")
20+
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
"""a simple TerminatedFrameReceiver by @njsmith.
2+
https://github.com/python-trio/trio/issues/796#issuecomment-471428274
3+
"""
4+
5+
import trio
6+
7+
_RECEIVE_SIZE = 4096 # pretty arbitrary
8+
9+
class TerminatedFrameReceiver:
10+
"""Parse frames out of a Trio stream, where each frame is terminated by a
11+
fixed byte sequence.
12+
13+
For example, you can parse newline-terminated lines by setting the
14+
terminator to b"\\n".
15+
16+
This uses some tricks to protect against denial of service attacks:
17+
18+
- It puts a limit on the maximum frame size, to avoid memory overflow; you
19+
might want to adjust the limit for your situation.
20+
21+
- It uses some algorithmic trickiness to avoid "slow loris" attacks. All
22+
algorithms are amortized O(n) in the length of the input.
23+
24+
"""
25+
def __init__(self, stream, terminator, max_frame_length=16384):
26+
self.stream = stream
27+
self.terminator = terminator
28+
self.max_frame_length = max_frame_length
29+
self._buf = bytearray()
30+
self._next_find_idx = 0
31+
32+
async def receive(self):
33+
while True:
34+
terminator_idx = self._buf.find(
35+
self.terminator, self._next_find_idx
36+
)
37+
if terminator_idx < 0:
38+
# no terminator found
39+
if len(self._buf) > self.max_frame_length:
40+
raise ValueError("frame too long")
41+
# next time, start the search where this one left off
42+
self._next_find_idx = max(0, len(self._buf) - len(self.terminator) + 1)
43+
# add some more data, then loop around
44+
more_data = await self.stream.receive_some(_RECEIVE_SIZE)
45+
if more_data == b"":
46+
if self._buf:
47+
raise ValueError("incomplete frame")
48+
raise trio.EndOfChannel
49+
self._buf += more_data
50+
else:
51+
# terminator found in buf, so extract the frame
52+
frame = self._buf[:terminator_idx]
53+
# Update the buffer in place, to take advantage of bytearray's
54+
# optimized delete-from-beginning feature.
55+
del self._buf[:terminator_idx+len(self.terminator)]
56+
# next time, start the search from the beginning
57+
self._next_find_idx = 0
58+
return frame
59+
60+
def __aiter__(self):
61+
return self
62+
63+
async def __anext__(self):
64+
try:
65+
return await self.receive()
66+
except trio.EndOfChannel:
67+
raise StopAsyncIteration
68+
69+
70+
def example():
71+
from trio.testing import memory_stream_pair
72+
async def main():
73+
sender_stream, receiver_stream = memory_stream_pair()
74+
75+
async def sender():
76+
await sender_stream.send_all(b"hello\r\n\r\n")
77+
await trio.sleep(1)
78+
await sender_stream.send_all(b"split-up ")
79+
await trio.sleep(1)
80+
await sender_stream.send_all(b"message\r\n\r")
81+
await trio.sleep(1)
82+
await sender_stream.send_all(b"\n")
83+
await trio.sleep(1)
84+
await sender_stream.send_all(b"goodbye\r\n\r\n")
85+
await trio.sleep(1)
86+
await sender_stream.aclose()
87+
88+
async def receiver():
89+
chan = TerminatedFrameReceiver(receiver_stream, b"\r\n\r\n")
90+
async for message in chan:
91+
print(f"Got message: {message!r}")
92+
93+
async with trio.open_nursery() as nursery:
94+
nursery.start_soon(sender)
95+
nursery.start_soon(receiver)
96+
97+
trio.run(main)
98+
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
from __future__ import annotations
2+
3+
from dataclasses import dataclass
4+
5+
import compiler
6+
from compiler import (
7+
Point, CodeIndex,
8+
NoArgCode, OneArgCode, TwoArgsCode, CommandCode
9+
)
10+
from utils import assert_never
11+
from code_gen import CodeGenerator
12+
13+
14+
import typing
15+
from typing import Tuple, Iterable
16+
from typing import BinaryIO, Literal, Final
17+
if typing.TYPE_CHECKING:
18+
from _protos import File as FileProto
19+
20+
21+
# 跳转表的写法没法从 mypy 的穷尽性检查中受益,所以放弃
22+
# CodeHandler = Callable[[CodeGenerator, CommandCode], None]
23+
24+
# mypy 的穷尽性检查甚至还不足以检查包含枚举的元组这样稍微复杂的情况,
25+
# 因此只能检查枚举是否穷尽,而无法检查枚举是否正确对应了参数 args 的类型
26+
27+
28+
@dataclass
29+
class Assembler:
30+
code_gen: CodeGenerator
31+
32+
def __enter__(self):
33+
return self
34+
35+
def __exit__(self, exc_type, exc_value, traceback):
36+
self.close()
37+
38+
def close(self):
39+
self.code_gen.close()
40+
41+
@classmethod
42+
def from_file(
43+
cls,
44+
file: FileProto,
45+
use_text_io: bool,
46+
*,
47+
close_when_exit: bool = False
48+
) -> "Assembler":
49+
code_gen = CodeGenerator(file, use_text_io,
50+
close_when_exit=close_when_exit)
51+
return cls(code_gen)
52+
53+
def _gen_code(self, code: CommandCode) -> None:
54+
code_index: CodeIndex
55+
args: tuple
56+
code_index, args = code
57+
g: CodeGenerator = self.code_gen
58+
59+
if code_index is CodeIndex.NOP:
60+
pass
61+
elif code_index is CodeIndex.UP:
62+
g.up(*args)
63+
elif code_index is CodeIndex.DOWN:
64+
g.down(*args)
65+
elif code_index is CodeIndex.LEFT:
66+
g.left(*args)
67+
elif code_index is CodeIndex.RIGHT:
68+
g.right(*args)
69+
elif code_index is CodeIndex.DROP:
70+
g.drop(*args)
71+
elif code_index is CodeIndex.GET:
72+
g.get(*args)
73+
elif code_index is CodeIndex.GOTO:
74+
g.goto(*args)
75+
elif code_index is CodeIndex.DROP_PLATE:
76+
g.drop_plate()
77+
elif code_index is CodeIndex.GET_PLATE:
78+
g.get_plate()
79+
elif code_index is CodeIndex.STAND_STILL:
80+
g.stand_still()
81+
elif code_index is CodeIndex.IF_GE_AND_GOTO:
82+
g.if_ge_and_goto(*args)
83+
else:
84+
# 穷尽性检查
85+
assert_never(code_index)
86+
87+
def __call__(self, codes: Iterable[CommandCode]) -> None:
88+
for code in codes:
89+
self._gen_code(code)
90+
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
from __future__ import annotations
2+
3+
from dataclasses import dataclass, InitVar
4+
5+
from utils import require_positive as _require_positive
6+
from utils import UnreachableError
7+
8+
9+
import typing
10+
from typing import Generic, TypeVar, cast
11+
if typing.TYPE_CHECKING:
12+
from _typeshed import ReadableBuffer
13+
from _protos import File as FileProto
14+
15+
16+
__all__ = ["CodeGenerator"]
17+
18+
T_AnyStr = TypeVar("T_AnyStr", str, "ReadableBuffer")
19+
20+
21+
@dataclass
22+
class CodeGenerator(Generic[T_AnyStr]):
23+
file: FileProto[T_AnyStr]
24+
use_text_io: InitVar[bool] = False
25+
close_when_exit: bool = False # kwonly
26+
27+
def __post_init__(self, use_text_io: bool) -> None:
28+
# use `setattr` instead of assignment to make mypy happy, see:
29+
# https://github.com/python/mypy/issues/2427#issuecomment-384229898
30+
if use_text_io:
31+
setattr(self, "_write", self._write_to_text_io)
32+
else:
33+
setattr(self, "_write", self._write_to_binary_io)
34+
35+
def __enter__(self):
36+
return self
37+
38+
def __exit__(self, exc_type, exc_value, traceback):
39+
self.close()
40+
41+
def close(self):
42+
self.file.flush()
43+
if self.close_when_exit:
44+
self.file.close()
45+
46+
def _write(self, data: str) -> None:
47+
"""此方法在 __post_init__ 方法中初始化"""
48+
raise UnreachableError(CodeGenerator._write.__doc__)
49+
50+
def _write_to_text_io(self, data: str):
51+
# XXX mypy can't check this, we cast it here
52+
file = cast("FileProto[str]", self.file)
53+
length = len(data)
54+
writed = 0
55+
while length != writed:
56+
writed += file.write(data[writed:])
57+
58+
def _write_to_binary_io(self, data: str):
59+
# XXX mypy can't check this, we cast it here
60+
bio = cast("FileProto[ReadableBuffer]", self.file)
61+
b = memoryview(data.encode("utf-8"))
62+
del data
63+
length = len(b)
64+
writed = 0
65+
while length != writed:
66+
writed += bio.write(b[writed:])
67+
68+
def _up(self, steps: int):
69+
self._write(f"向上 {steps}\n")
70+
71+
def up(self, steps: int):
72+
_require_positive(steps)
73+
self._up(steps)
74+
75+
def stand_still(self):
76+
self._up(0)
77+
78+
def down(self, steps: int):
79+
_require_positive(steps)
80+
self._write(f"向下 {steps}\n")
81+
82+
def left(self, steps: int):
83+
_require_positive(steps)
84+
self._write(f"向左 {steps}\n")
85+
86+
def right(self, steps: int):
87+
_require_positive(steps)
88+
self._write(f"向右 {steps}\n")
89+
90+
def drop(self, items: int):
91+
_require_positive(items)
92+
self._write(f"放下 {items} 个物品\n")
93+
94+
def get(self, items: int):
95+
_require_positive(items)
96+
self._write(f"拿起 {items} 个物品\n")
97+
98+
def drop_plate(self):
99+
self._write("放下盘子\n")
100+
101+
def get_plate(self):
102+
self._write("拿起盘子\n")
103+
104+
def _if_ge_and_goto(self, items: int, rel_lineno: int):
105+
if rel_lineno == 0:
106+
raise ValueError("`rel_lineno` requires a non-zero interger.")
107+
108+
self._write(f"如果手上的物品大于等于 {items} "
109+
f"向{'上' if rel_lineno < 0 else '下'}"
110+
f"跳转 {abs(rel_lineno)}\n")
111+
112+
def if_ge_and_goto(self, items: int, rel_lineno: int):
113+
if items <= 0:
114+
raise ValueError(
115+
f"`items` requires a positive integer, but got {items}.")
116+
117+
self._if_ge_and_goto(items, rel_lineno)
118+
119+
def goto(self, rel_lineno: int):
120+
self._if_ge_and_goto(0, rel_lineno)
121+

0 commit comments

Comments
 (0)