Skip to content

Commit cb6965f

Browse files
committed
Made producers notify all consumers and consumers to wait for all producers to be done
1 parent ded8252 commit cb6965f

File tree

1 file changed

+14
-16
lines changed

1 file changed

+14
-16
lines changed

src/bin/multi_read_exact.rs

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,12 @@ struct ReadData {
3030
producer_tx: Sender<Message>,
3131
consumers: Senders,
3232
num_chunks: u64,
33-
producers_per_consumer: u64
3433
}
3534
type ProducerId = u64;
35+
type NumProducers = u64;
3636
enum Message {
3737
Read(ReadData, Buffer),
38-
End(ProducerId),
38+
End(ProducerId, NumProducers),
3939
}
4040

4141
// Moving a generic Fn instance requires customization
@@ -163,7 +163,6 @@ fn read_file<T: 'static + Clone + Send + Sync, R: 'static + Clone + Sync + Send>
163163
producer_chunk_size,
164164
last_producer_chunk_size,
165165
task_chunk_size,
166-
last_task_chunk_size,
167166
last_prod_task_chunk_size,
168167
chunks_per_producer,
169168
reserved_size as usize,
@@ -233,7 +232,7 @@ fn build_producers(num_producers: u64, filename: &str) -> Senders {
233232
num_consumers,
234233
num_producers as usize,
235234
);
236-
println!("[{}] Sending {} bytes to consumer {}", i, buffer.len(), c);
235+
//println!("[{}] Sending {} bytes to consumer {}", i, buffer.len(), c);
237236
consumers.insert(c);
238237
prev_consumer = c;
239238
#[cfg(feature = "print_ptr")]
@@ -255,10 +254,10 @@ fn build_producers(num_producers: u64, filename: &str) -> Senders {
255254
.expect(&format!("Cannot send buffer"));
256255
if offset as u64 >= end_offset {
257256
// signal the end of stream to consumers
258-
consumers.iter().for_each(|x| {
259-
println!("{}>> Sending End of message to consumer {}", i, x);
260-
let _ = rd.consumers[*x]
261-
.send(End(i));
257+
(0..rd.consumers.len()).for_each(|x| {
258+
//println!("{}>> Sending End of message to consumer {}", i, x);
259+
let _ = rd.consumers[x]
260+
.send(End(i, num_producers));
262261
});
263262
break;
264263
}
@@ -289,7 +288,6 @@ fn build_consumers<T: 'static + Clone + Sync + Send, R: 'static + Clone + Sync +
289288
let data = data.clone();
290289
let h = thread::spawn(move || {
291290
let mut ret = Vec::new();
292-
let mut producers_per_consumer = 0;
293291
let mut producers_end_signal_count = 0;
294292
let mut bytes = 0;
295293
loop {
@@ -300,9 +298,8 @@ fn build_consumers<T: 'static + Clone + Sync + Send, R: 'static + Clone + Sync +
300298
if let Ok(msg) = rx.recv() {
301299
match msg {
302300
Read(rd, buffer) => {
303-
if producers_per_consumer == 0 { producers_per_consumer = rd.producers_per_consumer; }
304301
bytes += buffer.len();
305-
println!("{}> Received {} bytes from [{}]", i, buffer.len(), rd.producer_id);
302+
//println!("{}> Received {} bytes from [{}]", i, buffer.len(), rd.producer_id);
306303
ret.push((
307304
rd.chunk_id,
308305
cc.call(&buffer, data.clone(), rd.chunk_id, rd.num_chunks),
@@ -321,10 +318,10 @@ fn build_consumers<T: 'static + Clone + Sync + Send, R: 'static + Clone + Sync +
321318
//break;
322319
}
323320
},
324-
End(_prod_id) => {
321+
End(_prod_id, num_producers) => {
325322
producers_end_signal_count += 1;
326-
println!("{}> received End signal from {}", i, _prod_id);
327-
if producers_per_consumer == producers_end_signal_count {
323+
println!("{}> received End signal from {} {}/{}", i, _prod_id,producers_end_signal_count, num_producers);
324+
if producers_end_signal_count >= num_producers {
328325

329326
//println!(
330327
// "{}>> {} {}/{}",
@@ -335,6 +332,9 @@ fn build_consumers<T: 'static + Clone + Sync + Send, R: 'static + Clone + Sync +
335332
}
336333
}
337334
} else {
335+
// we do not care if the communication channel was closed
336+
// since it only happen when the producer is finished
337+
// of an error elsewhere occurred
338338
//println!("{}> Exiting", i);
339339
//break;
340340
}
@@ -362,7 +362,6 @@ fn launch(
362362
producer_chunk_size: u64,
363363
last_producer_chunk_size: u64,
364364
task_chunk_size: u64,
365-
last_task_chunk_size: u64,
366365
last_producer_task_chunk_size: u64,
367366
chunks_per_producer: u64,
368367
reserved_size: usize,
@@ -393,7 +392,6 @@ fn launch(
393392
consumers: tx_consumers.clone(),
394393
producer_id: 0, // will be overwritten
395394
num_chunks: chunks_per_producer * num_producers,
396-
producers_per_consumer: num_producers
397395
};
398396
tx.send(Message::Read(rd, buffer)).expect("Cannot send");
399397
}

0 commit comments

Comments
 (0)