@@ -28,8 +28,10 @@ type 'a event =
28
28
29
29
(* Communication channels *)
30
30
type 'a channel =
31
- { writes_pending : 'a communication Queue .t ; (* All offers to write on it *)
32
- reads_pending : 'a communication Queue .t } (* All offers to read from it *)
31
+ { mutable writes_pending : 'a communication Queue .t ;
32
+ (* All offers to write on it *)
33
+ mutable reads_pending : 'a communication Queue .t }
34
+ (* All offers to read from it *)
33
35
34
36
(* Communication offered *)
35
37
and 'a communication =
@@ -125,6 +127,13 @@ let basic_poll genev =
125
127
let poll ev =
126
128
basic_poll(scramble_array(Array. of_list(flatten_event ev [] )))
127
129
130
+ (* Remove all communication opportunities already synchronized *)
131
+
132
+ let cleanup_queue q =
133
+ let q' = Queue. create() in
134
+ Queue. iter (fun c -> if ! (c.performed) = - 1 then Queue. add c q') q;
135
+ q'
136
+
128
137
(* Event construction *)
129
138
130
139
let always data =
@@ -156,7 +165,9 @@ let send channel data =
156
165
true
157
166
with Queue. Empty ->
158
167
false );
159
- suspend = (fun () -> Queue. add wcomm channel.writes_pending);
168
+ suspend = (fun () ->
169
+ channel.writes_pending < - cleanup_queue channel.writes_pending;
170
+ Queue. add wcomm channel.writes_pending);
160
171
result = (fun () -> () ) })
161
172
162
173
let receive channel =
@@ -183,6 +194,7 @@ let receive channel =
183
194
with Queue. Empty ->
184
195
false );
185
196
suspend = (fun () ->
197
+ channel.reads_pending < - cleanup_queue channel.reads_pending;
186
198
Queue. add rcomm channel.reads_pending);
187
199
result = (fun () ->
188
200
match rcomm.data with
0 commit comments