Skip to content

Commit f1df27d

Browse files
dicejbongjunj
authored andcommitted
don't delete sync-lowered subtasks unless they've exited (bytecodealliance#11655)
Previously, we were unconditionally deleting the callee subtask once it returned a value to a sync-lowered call, but that's only appropriate if the subtask has exited. Otherwise, it needs to keep running and only be deleted once it actually exits. Thanks to Luke for the `sync-streams.wast` test that uncovered this, which I've copied from the `component-model` repo. This also makes a couple of debug logging tweaks that proved useful while investigating the above issue. Signed-off-by: Joel Dice <joel.dice@fermyon.com>
1 parent 92b3a19 commit f1df27d

File tree

3 files changed

+199
-3
lines changed

3 files changed

+199
-3
lines changed

crates/wasmtime/src/runtime/component/concurrent.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,7 +1004,7 @@ impl Instance {
10041004
assert!(
10051005
state.table.get_mut().is_empty(),
10061006
"non-empty table: {:?}",
1007-
state.table
1007+
state.table.get_mut()
10081008
);
10091009
assert!(state.high_priority.is_empty());
10101010
assert!(state.low_priority.is_empty());
@@ -2243,12 +2243,15 @@ impl Instance {
22432243
// The caller used a sync-lowered import to call an async-lifted
22442244
// export, in which case the result, if any, has been stashed in
22452245
// `GuestTask::sync_result`.
2246-
if let Some(result) = state.get_mut(guest_task)?.sync_result.take() {
2246+
let task = state.get_mut(guest_task)?;
2247+
if let Some(result) = task.sync_result.take() {
22472248
if let Some(result) = result {
22482249
storage[0] = MaybeUninit::new(result);
22492250
}
22502251

2251-
Waitable::Guest(guest_task).delete_from(state)?;
2252+
if task.exited {
2253+
Waitable::Guest(guest_task).delete_from(state)?;
2254+
}
22522255
} else {
22532256
// This means the callee failed to call either `task.return` or
22542257
// `task.cancel` before exiting.

crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2946,6 +2946,10 @@ impl Instance {
29462946
};
29472947
}
29482948

2949+
log::trace!(
2950+
"guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
2951+
);
2952+
29492953
Ok(result)
29502954
}
29512955

@@ -3178,6 +3182,10 @@ impl Instance {
31783182
};
31793183
}
31803184

3185+
log::trace!(
3186+
"guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
3187+
);
3188+
31813189
Ok(result)
31823190
}
31833191

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
;;! component_model_async = true
2+
;;! reference_types = true
3+
;;! gc_types = true
4+
5+
;; This test calls sync stream.write in $C.get and sync stream.read in $C.set.
6+
;; Both of these calls block because $C is first to the rendezvous. But since
7+
;; they are synchronous, control flow switches to $D.run which will do
8+
;; a complementary read/write that rendezvous, and then control flow will
9+
;; switch back to $C.get/set where the synchronous read/write will return
10+
;; without blocking.
11+
;;
12+
;; (Copied from
13+
;; https://github.com/WebAssembly/component-model/blob/main/test/async/sync-streams.wast)
14+
(component
15+
(component $C
16+
(core module $Memory (memory (export "mem") 1))
17+
(core instance $memory (instantiate $Memory))
18+
(core module $CM
19+
(import "" "mem" (memory 1))
20+
(import "" "task.return0" (func $task.return0))
21+
(import "" "task.return1" (func $task.return1 (param i32)))
22+
(import "" "stream.new" (func $stream.new (result i64)))
23+
(import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32)))
24+
(import "" "stream.write" (func $stream.write (param i32 i32 i32) (result i32)))
25+
(import "" "stream.drop-readable" (func $stream.drop-readable (param i32)))
26+
(import "" "stream.drop-writable" (func $stream.drop-writable (param i32)))
27+
28+
(func (export "get") (result i32)
29+
(local $ret i32) (local $ret64 i64)
30+
(local $tx i32) (local $rx i32)
31+
(local $bufp i32)
32+
33+
;; ($rx, $tx) = stream.new
34+
(local.set $ret64 (call $stream.new))
35+
(local.set $rx (i32.wrap_i64 (local.get $ret64)))
36+
(local.set $tx (i32.wrap_i64 (i64.shr_u (local.get $ret64) (i64.const 32))))
37+
38+
;; return $rx
39+
(call $task.return1 (local.get $rx))
40+
41+
;; (stream.write $tx $bufp 4) will block and, because called
42+
;; synchronously, switch to the caller who will read and rendezvous
43+
(local.set $bufp (i32.const 16))
44+
(i32.store (local.get $bufp) (i32.const 0x01234567))
45+
(local.set $ret (call $stream.write (local.get $tx) (local.get $bufp) (i32.const 4)))
46+
(if (i32.ne (i32.const 0x41 (; DROPPED=1 | (4<<4) ;)) (local.get $ret))
47+
(then unreachable))
48+
49+
(call $stream.drop-writable (local.get $tx))
50+
(return (i32.const 0 (; EXIT ;)))
51+
)
52+
(func (export "get_cb") (param i32 i32 i32) (result i32)
53+
unreachable
54+
)
55+
56+
(func (export "set") (param $rx i32) (result i32)
57+
(local $ret i32) (local $ret64 i64)
58+
(local $bufp i32)
59+
60+
;; return immediately so that the caller can just call synchronously
61+
(call $task.return0)
62+
63+
;; (stream.read $tx $bufp 4) will block and, because called
64+
;; synchronously, switch to the caller who will write and rendezvous
65+
(local.set $bufp (i32.const 16))
66+
(local.set $ret (call $stream.read (local.get $rx) (local.get $bufp) (i32.const 4)))
67+
(if (i32.ne (i32.const 0x41 (; COMPLETED=0 | (4<<4) ;)) (local.get $ret))
68+
(then unreachable))
69+
(if (i32.ne (i32.const 0x89abcdef) (i32.load (local.get $bufp)))
70+
(then unreachable))
71+
72+
(call $stream.drop-readable (local.get $rx))
73+
(return (i32.const 0 (; EXIT ;)))
74+
)
75+
(func (export "set_cb") (param i32 i32 i32) (result i32)
76+
unreachable
77+
)
78+
)
79+
(type $ST (stream u8))
80+
(canon task.return (memory $memory "mem") (core func $task.return0))
81+
(canon task.return (result $ST) (memory $memory "mem") (core func $task.return1))
82+
(canon stream.new $ST (core func $stream.new))
83+
(canon stream.read $ST (memory $memory "mem") (core func $stream.read))
84+
(canon stream.write $ST (memory $memory "mem") (core func $stream.write))
85+
(canon stream.drop-readable $ST (core func $stream.drop-readable))
86+
(canon stream.drop-writable $ST (core func $stream.drop-writable))
87+
(core instance $cm (instantiate $CM (with "" (instance
88+
(export "mem" (memory $memory "mem"))
89+
(export "task.return0" (func $task.return0))
90+
(export "task.return1" (func $task.return1))
91+
(export "stream.new" (func $stream.new))
92+
(export "stream.read" (func $stream.read))
93+
(export "stream.write" (func $stream.write))
94+
(export "stream.drop-readable" (func $stream.drop-readable))
95+
(export "stream.drop-writable" (func $stream.drop-writable))
96+
))))
97+
(func (export "get") (result (stream u8)) (canon lift
98+
(core func $cm "get")
99+
async (memory $memory "mem") (callback (func $cm "get_cb"))
100+
))
101+
(func (export "set") (param "in" (stream u8)) (canon lift
102+
(core func $cm "set")
103+
async (memory $memory "mem") (callback (func $cm "set_cb"))
104+
))
105+
)
106+
(component $D
107+
(import "get" (func $get (result (stream u8))))
108+
(import "set" (func $set (param "in" (stream u8))))
109+
110+
(core module $Memory (memory (export "mem") 1))
111+
(core instance $memory (instantiate $Memory))
112+
(core module $DM
113+
(import "" "mem" (memory 1))
114+
(import "" "stream.new" (func $stream.new (result i64)))
115+
(import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32)))
116+
(import "" "stream.write" (func $stream.write (param i32 i32 i32) (result i32)))
117+
(import "" "stream.drop-readable" (func $stream.drop-readable (param i32)))
118+
(import "" "stream.drop-writable" (func $stream.drop-writable (param i32)))
119+
(import "" "get" (func $get (result i32)))
120+
(import "" "set" (func $set (param i32)))
121+
122+
(func (export "run") (result i32)
123+
(local $ret i32) (local $ret64 i64)
124+
(local $rx i32) (local $tx i32)
125+
(local $bufp i32)
126+
127+
;; $rx = $C.get()
128+
(local.set $rx (call $get))
129+
130+
;; (stream.read $tx $bufp 4) will succeed without blocking
131+
(local.set $bufp (i32.const 20))
132+
(local.set $ret (call $stream.read (local.get $rx) (local.get $bufp) (i32.const 4)))
133+
(if (i32.ne (i32.const 0x40 (; COMPLETED=0 | (4<<4) ;)) (local.get $ret))
134+
(then unreachable))
135+
(if (i32.ne (i32.const 0x01234567) (i32.load (local.get $bufp)))
136+
(then unreachable))
137+
138+
(call $stream.drop-readable (local.get $rx))
139+
140+
;; ($rx, $tx) = stream.new
141+
;; $C.set($rx)
142+
(local.set $ret64 (call $stream.new))
143+
(local.set $rx (i32.wrap_i64 (local.get $ret64)))
144+
(local.set $tx (i32.wrap_i64 (i64.shr_u (local.get $ret64) (i64.const 32))))
145+
(call $set (local.get $rx))
146+
147+
;; (stream.write $tx $bufp 4) will succeed without blocking
148+
(local.set $bufp (i32.const 16))
149+
(local.set $ret (call $stream.write (local.get $tx) (local.get $bufp) (i32.const 4)))
150+
(if (i32.ne (i32.const 0x40 (; COMPLETED=0 | (4<<4) ;)) (local.get $ret))
151+
(then unreachable))
152+
153+
(call $stream.drop-writable (local.get $tx))
154+
(i32.const 42)
155+
)
156+
)
157+
(type $ST (stream u8))
158+
(canon stream.new $ST (core func $stream.new))
159+
(canon stream.read $ST async (memory $memory "mem") (core func $stream.read))
160+
(canon stream.write $ST async (memory $memory "mem") (core func $stream.write))
161+
(canon stream.drop-readable $ST (core func $stream.drop-readable))
162+
(canon stream.drop-writable $ST (core func $stream.drop-writable))
163+
(canon lower (func $get) (core func $get'))
164+
(canon lower (func $set) (core func $set'))
165+
(core instance $dm (instantiate $DM (with "" (instance
166+
(export "mem" (memory $memory "mem"))
167+
(export "stream.new" (func $stream.new))
168+
(export "stream.read" (func $stream.read))
169+
(export "stream.write" (func $stream.write))
170+
(export "stream.drop-readable" (func $stream.drop-readable))
171+
(export "stream.drop-writable" (func $stream.drop-writable))
172+
(export "get" (func $get'))
173+
(export "set" (func $set'))
174+
))))
175+
(func (export "run") (result u32) (canon lift (core func $dm "run")))
176+
)
177+
178+
(instance $c (instantiate $C))
179+
(instance $d (instantiate $D
180+
(with "get" (func $c "get"))
181+
(with "set" (func $c "set"))
182+
))
183+
(func (export "run") (alias export $d "run"))
184+
)
185+
(assert_return (invoke "run") (u32.const 42))

0 commit comments

Comments
 (0)