Skip to content

Commit 5578f47

Browse files
committed
gather
1 parent beb3cf9 commit 5578f47

File tree

3 files changed

+45
-10
lines changed

3 files changed

+45
-10
lines changed

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ official python [selectors](https://docs.python.org/3/library/selectors.html) mo
4545
- [x] get_event_loop
4646
- [x] selectors block
4747
- [x] async sock
48-
- [ ] gather
48+
- [x] gather
49+
- [ ] creat server
50+
- [ ] cancel
4951

5052
## quote
5153

io_test.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,26 @@
33
import taio
44

55

6-
async def main():
6+
async def client(message):
77
sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
88
sock.setblocking(False)
99
loop = taio.get_event_loop()
1010
await loop.sock_connect(sock, ('127.0.0.1', 6666))
11-
sock.send(b'hey server, Im taio')
11+
sock.send(message)
1212
data = await loop.sock_recv(sock, 1024)
1313
print(data)
1414
sock.close()
15+
return data
16+
17+
18+
async def main():
19+
cors = []
20+
for i in range(5):
21+
cors.append(client(f'{i} taio client'.encode('utf8')))
22+
datas = await taio.gather(*cors)
23+
print('All: ', datas)
1524

1625

1726
if __name__ == '__main__':
1827
loop = taio.get_event_loop()
19-
loop.create_task(main())
20-
loop.create_task(main())
21-
loop.create_task(main())
22-
loop.create_task(main())
23-
loop.create_task(main())
24-
loop.call_later(1.5, loop.stop)
25-
loop.run_forever()
28+
loop.run_until_complete(main())

taio/tasks.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,36 @@ def __wakeup(self, future):
3232
self.__step()
3333

3434

35+
def gather(*coros_or_futures):
36+
loop = events.get_event_loop()
37+
if not coros_or_futures:
38+
outer = loop.create_future()
39+
outer.set_result([])
40+
return outer
41+
42+
def _done_callback(fut):
43+
nonlocal nfinished
44+
nfinished += 1
45+
if nfinished == nfuts:
46+
results = []
47+
for c in children:
48+
results.append(c.result())
49+
outer.set_result(results)
50+
51+
arg_to_fut = {}
52+
nfuts = 0
53+
nfinished = 0
54+
children = []
55+
for arg in coros_or_futures:
56+
fut = loop.create_task(arg) # ensure future
57+
nfuts += 1
58+
arg_to_fut[arg] = fut
59+
fut.add_done_callback(_done_callback)
60+
children.append(fut)
61+
outer = loop.create_future()
62+
return outer
63+
64+
3565
async def sleep(delay):
3666
future = futures.Future()
3767
events.get_event_loop().call_later(delay, future.set_result, None)

0 commit comments

Comments
 (0)