Skip to content

Commit ff516c8

Browse files
committed
discard messages for dropped channels
1 parent ad41e93 commit ff516c8

File tree

3 files changed

+61
-22
lines changed

3 files changed

+61
-22
lines changed

communication/src/allocator/process.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ impl Allocate for Process {
142142

143143
let vector =
144144
entry
145-
.downcast_mut::<(Vec<Option<(Vec<(Pusher<Message<T>>, Buzzer)>, Puller<Message<T>>)>>)>()
145+
.downcast_mut::<Vec<Option<(Vec<(Pusher<Message<T>>, Buzzer)>, Puller<Message<T>>)>>>()
146146
.expect("failed to correctly cast channel");
147147

148148
let (sends, recv) =

communication/src/allocator/zero_copy/allocator.rs

+30-9
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! Zero-copy allocator based on TCP.
22
use std::rc::Rc;
33
use std::cell::RefCell;
4-
use std::collections::{VecDeque, HashMap};
4+
use std::collections::{VecDeque, HashMap, hash_map::Entry};
55
use std::sync::mpsc::{Sender, Receiver};
66

77
use bytes::arc::Bytes;
@@ -105,6 +105,7 @@ impl<A: AllocateBuilder> TcpBuilder<A> {
105105
index: self.index,
106106
peers: self.peers,
107107
canaries: Rc::new(RefCell::new(Vec::new())),
108+
channel_id_bound: None,
108109
staged: Vec::new(),
109110
sends,
110111
recvs,
@@ -124,6 +125,8 @@ pub struct TcpAllocator<A: Allocate> {
124125
staged: Vec<Bytes>, // staging area for incoming Bytes
125126
canaries: Rc<RefCell<Vec<usize>>>,
126127

128+
channel_id_bound: Option<usize>,
129+
127130
// sending, receiving, and responding to binary buffers.
128131
sends: Vec<Rc<RefCell<SendEndpoint<MergeQueue>>>>, // sends[x] -> goes to process x.
129132
recvs: Vec<MergeQueue>, // recvs[x] <- from process x.
@@ -135,6 +138,12 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
135138
fn peers(&self) -> usize { self.peers }
136139
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
137140

141+
// Assume and enforce in-order identifier allocation.
142+
if let Some(bound) = self.channel_id_bound {
143+
assert!(bound < identifier);
144+
}
145+
self.channel_id_bound = Some(identifier);
146+
138147
// Result list of boxed pushers.
139148
let mut pushes = Vec::<Box<dyn Push<Message<T>>>>::new();
140149

@@ -186,11 +195,15 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
186195
// Check for channels whose `Puller` has been dropped.
187196
let mut canaries = self.canaries.borrow_mut();
188197
for dropped_channel in canaries.drain(..) {
189-
let dropped =
198+
let _dropped =
190199
self.to_local
191200
.remove(&dropped_channel)
192201
.expect("non-existent channel dropped");
193-
assert!(dropped.borrow().is_empty());
202+
// Borrowed channels may be non-empty, if the dataflow was forcibly
203+
// dropped. The contract is that if a dataflow is dropped, all other
204+
// workers will drop the dataflow too, without blocking indefinitely
205+
// on events from it.
206+
// assert!(dropped.borrow().is_empty());
194207
}
195208
::std::mem::drop(canaries);
196209

@@ -215,15 +228,23 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
215228
let _ = peel.extract_to(40);
216229

217230
// Increment message count for channel.
231+
// Safe to do this even if the channel has been dropped.
218232
events.push_back((header.channel, Event::Pushed(1)));
219233

220234
// Ensure that a queue exists.
221-
// We may receive data before allocating, and shouldn't block.
222-
self.to_local
223-
.entry(header.channel)
224-
.or_insert_with(|| Rc::new(RefCell::new(VecDeque::new())))
225-
.borrow_mut()
226-
.push_back(peel);
235+
match self.to_local.entry(header.channel) {
236+
Entry::Vacant(entry) => {
237+
// We may receive data before allocating, and shouldn't block.
238+
if self.channel_id_bound.map(|b| b < header.channel).unwrap_or(true) {
239+
entry.insert(Rc::new(RefCell::new(VecDeque::new())))
240+
.borrow_mut()
241+
.push_back(peel);
242+
}
243+
}
244+
Entry::Occupied(mut entry) => {
245+
entry.get_mut().borrow_mut().push_back(peel);
246+
}
247+
}
227248
}
228249
else {
229250
println!("failed to read full header!");

communication/src/allocator/zero_copy/allocator_process.rs

+30-12
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
33
use std::rc::Rc;
44
use std::cell::RefCell;
5-
use std::collections::{VecDeque, HashMap};
5+
use std::collections::{VecDeque, HashMap, hash_map::Entry};
66
use std::sync::mpsc::{Sender, Receiver};
77

88
use bytes::arc::Bytes;
@@ -28,7 +28,6 @@ pub struct ProcessBuilder {
2828
peers: usize, // number of peer allocators.
2929
pushers: Vec<Receiver<MergeQueue>>, // for pushing bytes at other workers.
3030
pullers: Vec<Sender<MergeQueue>>, // for pulling bytes from other workers.
31-
// signal: Signal,
3231
}
3332

3433
impl ProcessBuilder {
@@ -80,11 +79,11 @@ impl ProcessBuilder {
8079
peers: self.peers,
8180
events: Rc::new(RefCell::new(VecDeque::new())),
8281
canaries: Rc::new(RefCell::new(Vec::new())),
82+
channel_id_bound: None,
8383
staged: Vec::new(),
8484
sends,
8585
recvs,
8686
to_local: HashMap::new(),
87-
// _signal: self.signal,
8887
}
8988
}
9089
}
@@ -108,7 +107,8 @@ pub struct ProcessAllocator {
108107

109108
canaries: Rc<RefCell<Vec<usize>>>,
110109

111-
// _signal: Signal,
110+
channel_id_bound: Option<usize>,
111+
112112
// sending, receiving, and responding to binary buffers.
113113
staged: Vec<Bytes>,
114114
sends: Vec<Rc<RefCell<SendEndpoint<MergeQueue>>>>, // sends[x] -> goes to thread x.
@@ -121,6 +121,12 @@ impl Allocate for ProcessAllocator {
121121
fn peers(&self) -> usize { self.peers }
122122
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
123123

124+
// Assume and enforce in-order identifier allocation.
125+
if let Some(bound) = self.channel_id_bound {
126+
assert!(bound < identifier);
127+
}
128+
self.channel_id_bound = Some(identifier);
129+
124130
let mut pushes = Vec::<Box<dyn Push<Message<T>>>>::new();
125131

126132
for target_index in 0 .. self.peers() {
@@ -158,11 +164,15 @@ impl Allocate for ProcessAllocator {
158164
// Check for channels whose `Puller` has been dropped.
159165
let mut canaries = self.canaries.borrow_mut();
160166
for dropped_channel in canaries.drain(..) {
161-
let dropped =
167+
let _dropped =
162168
self.to_local
163169
.remove(&dropped_channel)
164170
.expect("non-existent channel dropped");
165-
assert!(dropped.borrow().is_empty());
171+
// Borrowed channels may be non-empty, if the dataflow was forcibly
172+
// dropped. The contract is that if a dataflow is dropped, all other
173+
// workers will drop the dataflow too, without blocking indefinitely
174+
// on events from it.
175+
// assert!(dropped.borrow().is_empty());
166176
}
167177
std::mem::drop(canaries);
168178

@@ -185,15 +195,23 @@ impl Allocate for ProcessAllocator {
185195
let _ = peel.extract_to(40);
186196

187197
// Increment message count for channel.
198+
// Safe to do this even if the channel has been dropped.
188199
events.push_back((header.channel, Event::Pushed(1)));
189200

190201
// Ensure that a queue exists.
191-
// We may receive data before allocating, and shouldn't block.
192-
self.to_local
193-
.entry(header.channel)
194-
.or_insert_with(|| Rc::new(RefCell::new(VecDeque::new())))
195-
.borrow_mut()
196-
.push_back(peel);
202+
match self.to_local.entry(header.channel) {
203+
Entry::Vacant(entry) => {
204+
// We may receive data before allocating, and shouldn't block.
205+
if self.channel_id_bound.map(|b| b < header.channel).unwrap_or(true) {
206+
entry.insert(Rc::new(RefCell::new(VecDeque::new())))
207+
.borrow_mut()
208+
.push_back(peel);
209+
}
210+
}
211+
Entry::Occupied(mut entry) => {
212+
entry.get_mut().borrow_mut().push_back(peel);
213+
}
214+
}
197215
}
198216
else {
199217
println!("failed to read full header!");

0 commit comments

Comments
 (0)