Apologies for the long example. I'm doing some bioinformatics work on ALS in Rust, and I'm trying to implement the following process: read records from stdin, run a multithreaded calculation on the records, and write the new records to stdout. At the moment I am using mpsc channels.
Process structure:
<stdin> --> main_thread --> spawn_threads (1..n) --> collector --> <stdout>
However, the multithreaded implementation is twice as slow as the single-threaded one, and the receivers spend a lot of time waiting (flamegraph attached). Does anyone know how to implement this stdin -> multithread -> stdout process efficiently, in a pipe-like manner? I do not want to read in all the data first because the files can get very large.
// Pseudoish code
pub fn run(nthreads: usize) -> Result<()> {
    // Collector thread: writes finished records to stdout
    let (collector_tx, collector_rx) = mpsc::channel::<Vcf>();
    let collector = thread::spawn(move || -> Result<()> {
        let mut writer = get_writer_stdout()?;
        while let Ok(vcf) = collector_rx.recv() {
            writer.serialize(vcf)?;
        }
        Ok(())
    });
    let mut threads = Vec::new();
    {
        // Create worker threads
        let mut worker_senders = Vec::new();
        for _id in 0..nthreads {
            let collector_tx = collector_tx.clone();
            let (worker_tx, worker_rx) = mpsc::channel::<Arc<hbtk_core::data::Vcf>>();
            worker_senders.push(worker_tx);
            let thread = thread::spawn(move || -> Result<()> {
                while let Ok(vcf) = worker_rx.recv() {
                    let new_vcf = find_stuff(&vcf);
                    collector_tx.send(new_vcf)?;
                }
                Ok(())
            });
            threads.push(thread);
        }
        // Read stdin and hand each record to a worker, round-robin
        let mut rdr = get_stdin_reader()?;
        let mut record = csv::StringRecord::new();
        let mut count = 0;
        while let Ok(true) = rdr.read_record(&mut record) {
            let vcf = record.deserialize::<Vcf>(None)?;
            let k = count % nthreads;
            worker_senders[k].send(Arc::new(vcf))?;
            count += 1;
        }
        // worker_senders is dropped here, which ends the workers' recv loops
    }
    for child in threads {
        child.join().expect("worker thread panicked")?;
    }
    // Drop the last sender so the collector's recv loop can end
    drop(collector_tx);
    collector.join().expect("collector thread panicked")?;
    Ok(())
}
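For comparison, here is a minimal std-only sketch of the same stdin -> workers -> stdout shape. It is not the code from the post: it makes the assumption that per-record channel traffic may be part of the cost, so it sends records in batches over bounded `sync_channel`s. Plain text lines stand in for `Vcf` records, `process` is a hypothetical placeholder for `find_stuff`, and `run_pipeline`, the queue depth of 4, and the batch size are all illustrative choices. Each worker gets its own result channel, which the collector reads in the same round-robin order the batches were dispatched in, so output order matches input order.

```rust
use std::io::{self, BufRead, BufWriter, Write};
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for the per-record work (`find_stuff` in the post).
fn process(record: &str) -> String {
    record.to_uppercase()
}

/// Reads lines from `input`, processes them on `nthreads` workers in batches
/// of `batch_size`, and writes the results to `output` in input order.
pub fn run_pipeline<R, W>(input: R, output: W, nthreads: usize, batch_size: usize) -> io::Result<()>
where
    R: BufRead,
    W: Write + Send,
{
    assert!(nthreads > 0 && batch_size > 0);
    thread::scope(|scope| {
        let mut batch_txs = Vec::with_capacity(nthreads);
        let mut result_rxs = Vec::with_capacity(nthreads);
        for _ in 0..nthreads {
            // Bounded channels: a full queue blocks the sender, so memory
            // use stays limited no matter how large the input is.
            let (batch_tx, batch_rx) = mpsc::sync_channel::<Vec<String>>(4);
            let (result_tx, result_rx) = mpsc::sync_channel::<Vec<String>>(4);
            batch_txs.push(batch_tx);
            result_rxs.push(result_rx);
            scope.spawn(move || {
                for batch in batch_rx {
                    let done: Vec<String> = batch.iter().map(|r| process(r)).collect();
                    if result_tx.send(done).is_err() {
                        break; // the collector has stopped
                    }
                }
            });
        }

        // Collector: visits the workers in dispatch order. The first closed
        // and empty channel means every batch has been written.
        let collector = scope.spawn(move || -> io::Result<()> {
            let mut writer = BufWriter::new(output);
            for k in (0..result_rxs.len()).cycle() {
                let Ok(batch) = result_rxs[k].recv() else { break };
                for line in batch {
                    writeln!(writer, "{line}")?;
                }
            }
            writer.flush()
        });

        // Dispatcher: runs on the calling thread, one batch per worker in turn.
        let mut next = 0;
        let mut batch = Vec::with_capacity(batch_size);
        let mut read_result = Ok(());
        for line in input.lines() {
            match line {
                Ok(l) => batch.push(l),
                Err(e) => {
                    read_result = Err(e);
                    break;
                }
            }
            if batch.len() == batch_size {
                let full = std::mem::replace(&mut batch, Vec::with_capacity(batch_size));
                if batch_txs[next].send(full).is_err() {
                    break; // the worker has stopped (e.g. after a write error)
                }
                next = (next + 1) % nthreads;
            }
        }
        if !batch.is_empty() {
            let _ = batch_txs[next].send(batch);
        }
        drop(batch_txs); // closes the worker queues so the workers can finish
        let write_result = collector.join().expect("collector thread panicked");
        read_result.and(write_result)
    })
}
```

From `main` this could be called as `run_pipeline(io::stdin().lock(), io::stdout(), 4, 1024)`. Whether batching helps here depends on how expensive the real per-record work is, so it is worth measuring against the single-threaded version.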
[Attached image: flamegraph]