Skip to content

Commit

Permalink
Fix memory leak related to thread orchestration
Browse files Browse the repository at this point in the history
  • Loading branch information
hextraza committed Jan 18, 2024
1 parent d9161f9 commit 6a5ea43
Showing 1 changed file with 78 additions and 3 deletions.
81 changes: 78 additions & 3 deletions src/process/bam.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::fs::{File, OpenOptions};
use std::io::{Error, Write};
use std::time::Instant;

const MAX_UMIS_IN_CHANNEL: usize = 20;
const MAX_UMIS_IN_CHANNEL: usize = 50;
const SAFETY_BUFFER: f64 = 1.20;
const WAIT_TIMEOUT: Duration = Duration::from_secs(5);
const SLEEP_DURATION: Duration = Duration::from_millis(50);
Expand Down Expand Up @@ -54,6 +54,51 @@ fn bam_data_header(prefix: &str) -> String {
format!("{}_an", prefix))
}

struct OwnedMetadataWrapper {
pub sequence: Box<str>,
pub mapq: Box<u8>,
pub orientation: Box<str>,
pub pair: Box<str>,
pub rev_comp: Box<bool>,
pub hit: Box<str>,
pub qname: Box<[u8]>,
pub qual: Box<[u8]>,
pub tx: Box<str>,
pub umi: Box<str>,
pub cb: Box<str>,
pub an: Box<str>,
}

fn convert_message_to_alignable(metadata: Vec<Box<OwnedMetadataWrapper>>) -> Vec<(
u8,
String,
String,
bool,
String,
Vec<u8>,
Vec<u8>,
String,
String,
String,
String,
)> {
metadata.into_iter().map(|wrapper| {
(
*wrapper.mapq,
(*wrapper.orientation).to_string(),
(*wrapper.pair).to_string(),
*wrapper.rev_comp,
(*wrapper.hit).to_string(),
(*wrapper.qname).to_vec(),
(*wrapper.qual).to_vec(),
(*wrapper.tx).to_string(),
(*wrapper.umi).to_string(),
(*wrapper.cb).to_string(),
(*wrapper.an).to_string(),
)
}).collect()
}

pub fn process(
input_files: Vec<String>,
reference_indices: Vec<(PseudoAligner, PseudoAligner)>,
Expand Down Expand Up @@ -132,10 +177,35 @@ pub fn process(
println!("Finished reading UMIs from input file.");
break;
} else {
// Wrapper that stops endless memory leak from strings and vecs getting lost in the
// sender queue. Seems like there's still some odd behavior around qual/tx/an, so
// they're getting dropped from the output for the time being.
let metadata: Vec<Box<OwnedMetadataWrapper>> = reader.current_metadata_group.iter().map(|tuple| {
Box::new(OwnedMetadataWrapper {
sequence: String::new().into_boxed_str(),
mapq: Box::new(tuple.0),
rev_comp: Box::new(tuple.3),
qual: Vec::new().into_boxed_slice(),
tx: String::new().into_boxed_str(),
an: String::new().into_boxed_str(),
orientation: tuple.1.clone().into_boxed_str(),
pair: tuple.2.clone().into_boxed_str(),
hit: tuple.4.clone().into_boxed_str(),
qname: tuple.5.clone().into_boxed_slice(),
//qual: tuple.6.clone().into_boxed_slice(),
//tx: tuple.7.clone().into_boxed_str(),
umi: tuple.8.clone().into_boxed_str(),
cb: tuple.9.clone().into_boxed_str(),
//an: tuple.10.clone().into_boxed_str(),
})
}).collect();

// Sender uses a wrapped object w/ Boxes instead of the clone of the data
sender
.send((
reader.current_umi_group.clone(),
reader.current_metadata_group.clone(),
//reader.current_metadata_group.clone(),
metadata,
))
.unwrap();
}
Expand Down Expand Up @@ -166,7 +236,12 @@ pub fn process(
let _safe_to_allocate = block_on_memory_headroom(num_consumers);

match data {
Ok((umi, current_metadata_group)) => {
Ok((umi, owned_metadata_group)) => {

// Use new message structure that reduces memory leakage from the BAM flags
let current_metadata_group = convert_message_to_alignable(owned_metadata_group);
//let current_metadata_group = owned_metadata_group;

let results = align_umi_to_libraries(
umi,
current_metadata_group,
Expand Down

0 comments on commit 6a5ea43

Please sign in to comment.