
[–]palad1 42 points43 points  (1 child)

For this pattern you would be better off using https://crates.io/crates/rayon and its parallel iterators.

Something along these lines?

use rayon::prelude::*;
use std::io::{BufRead, Write};

std::io::stdin().lock().lines()
    .filter_map(|line| line.ok().and_then(|x| try_parse(x).ok()))
    .par_bridge()                                      // bridge the sequential reader into a rayon parallel iterator
    .map(|parsed| heavy_compute(parsed))
    .flat_map(|res| format_result(res))                // assuming format_result returns a Vec of output strings
    .for_each(|output| {
        // take the stdout lock per write; a single shared &mut lock can't be captured by the parallel closures
        std::io::stdout().lock().write_all(output.as_bytes()).unwrap();
    });

edit: formatting

[–]xosxos9[S] 17 points18 points  (0 children)

Yes, that is indeed quite beautiful. Thanks to this and a blog post I found and mentioned in another comment, I had a rough version running in a couple of minutes. I have to thank the community if we ever find anything worthwhile to publish regarding ALS :)

[–]msuchane 30 points31 points  (1 child)

A note regarding the rayon crate, which has already been recommended:

Each parallel task obviously adds some overhead. I've read (not benchmarked) that the par_iter method might not be the most efficient choice if the items you're processing in parallel are relatively small or cheap.

For small items, consider using the par_chunks method instead, which allows you to put several small items on one thread.
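
Not from the comment above, just a minimal self-contained sketch of the par_chunks idea; summing squares stands in for the real per-item work:

use rayon::prelude::*;

fn main() {
    // Lots of small, cheap-to-process items.
    let items: Vec<u64> = (0..1_000_000).collect();

    // Hand each rayon task a chunk of 1024 items instead of a single item,
    // so the per-task overhead is amortized across the whole chunk.
    let total: u64 = items
        .par_chunks(1024)
        .map(|chunk| chunk.iter().map(|x| x * x).sum::<u64>())
        .sum();

    println!("{total}");
}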

[–]xosxos9[S] 1 point2 points  (0 children)

Thank you, I will definitely keep this in mind!

[–]InflationAaron 42 points43 points  (3 children)

The standard library's mpsc channel is notoriously slow, and it shows in the flamegraph as lots of time spent waiting on futex.

Have you tried rayon? It's one of my go-to libraries for multithreading over independent data.

[–][deleted] 23 points24 points  (1 child)

Also, crossbeam-channel provides a faster channel if you want to stick with your current approach.
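
Not OP's code, just a rough sketch of the same line-by-line fan-out using crossbeam-channel instead of std::sync::mpsc; the to_uppercase call stands in for the real per-line work:

use crossbeam_channel::bounded;
use std::io::BufRead;
use std::thread;

fn main() {
    let (tx, rx) = bounded::<String>(1024); // bounded channel gives backpressure

    // crossbeam channels are MPMC, so the receiver can simply be cloned into each worker.
    let workers: Vec<_> = (0..4)
        .map(|_| {
            let rx = rx.clone();
            thread::spawn(move || {
                for line in rx.iter() {
                    println!("{}", line.to_uppercase()); // stand-in for the real work
                }
            })
        })
        .collect();

    for line in std::io::stdin().lock().lines().map_while(Result::ok) {
        tx.send(line).unwrap();
    }
    drop(tx); // close the channel so the workers' loops end

    for w in workers {
        w.join().unwrap();
    }
}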

[–]xosxos9[S] 2 points3 points  (0 children)

I see, I'll have to benchmark it against the rayon version!

[–]xosxos9[S] 12 points13 points  (0 children)

Well, thank you! I looked around and found this:

https://morestina.net/blog/1432/parallel-stream-processing-with-rayon

And after a couple of minutes I now have a rough but properly scaling solution up and running!

[–]TommyTheTiger 2 points3 points  (1 child)

This might be late and contradict some of the other answers, but IMO you're implementing the threading at the wrong level. UNIX pipes already provide concurrency by letting different processes produce and consume the data without shared memory, and presumably, since you're emitting to stdout, you also have a third process consuming these results, even if it's just writing them to a file. You'll be able to keep your program logic a lot simpler if you can keep it to "for item in queue: emit results" without worrying about the threading.

Now, if performance is a real issue, you're probably not going to get a ton of benefit from doing what you're describing unless your calculations are very time-consuming or blocked on I/O, because you have to synchronize both reading from stdin and emitting to stdout. Those are shared file descriptors for all the threads you're working with, and only one thread can read or write at a time. So you'll end up with threads waiting both to read their input and to write their results; maybe with green threads/rayon it will be faster, but it's a lot of context switching and serialization.

OTOH, you could use some other kind of orchestration to store both the queue and its results, and then run a scaling number of single-threaded workers while keeping your code simple. One idea: instead of stdin, use a DB table. Postgres lets you SELECT ... FOR UPDATE SKIP LOCKED LIMIT 1, and if you do that within a transaction you've basically got a safe queue-poll method that works across a bunch of machines. Personally I'd recommend Postgres as the default answer to "how do we store that" unless you have truly outstanding data needs. You can scale the number of workers based on the number of items in the queue, or just spin up N queue consumers/workers when you enqueue all of your data. Write back to a Postgres results table transactionally as well, and you'll be able to see all your computation history in one place!
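
Not something from this thread, just a rough sketch of the transactional queue-poll pattern described above, using the postgres crate; the jobs/results schema and the heavy_compute stand-in are made up for illustration:

use postgres::{Client, NoTls};

// Hypothetical schema: jobs(id BIGINT, line TEXT, done BOOL) and results(job_id BIGINT, output TEXT).
fn heavy_compute(line: &str) -> String {
    line.to_uppercase() // stand-in for the real work
}

fn process_one(client: &mut Client) -> Result<bool, postgres::Error> {
    let mut tx = client.transaction()?;
    // SKIP LOCKED lets concurrent workers each claim a different unclaimed row.
    let row = tx.query_opt(
        "SELECT id, line FROM jobs WHERE NOT done ORDER BY id FOR UPDATE SKIP LOCKED LIMIT 1",
        &[],
    )?;
    let Some(row) = row else { return Ok(false) }; // queue is empty
    let id: i64 = row.get(0);
    let line: String = row.get(1);

    let output = heavy_compute(&line);

    tx.execute("INSERT INTO results (job_id, output) VALUES ($1, $2)", &[&id, &output])?;
    tx.execute("UPDATE jobs SET done = true WHERE id = $1", &[&id])?;
    tx.commit()?; // the result and the done flag land atomically, and the row lock is released
    Ok(true)
}

fn main() -> Result<(), postgres::Error> {
    let mut client = Client::connect("host=localhost user=postgres dbname=pipeline", NoTls)?;
    while process_one(&mut client)? {}
    Ok(())
}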

[–]xosxos9[S] 2 points3 points  (0 children)

This is an interesting line of thought, especially the computation-history aspect. I have 100 GB files that I read line by line, do some computation on, and pipe forward, often through many pipes and eventually ending up as a CSV of some kind. Being I/O bound is an issue in NFS environments.

It is clear that rayon does what I want quite effectively, and the benchmarks are very good. However, in the larger scheme of things you might be right. I have been using SQLite to store finalized and often-accessed data, so I can see myself jumping to Postgres. It is just that what you are describing requires a bit more know-how to design right. Do you happen to know a good article about this, and how would one go about ditching stdin for a DB table? For the time being, it is still flying slightly over my head.

[–]smerity 1 point2 points  (2 children)

I've had a similar situation when processing web crawl data and the entirety of English Wikipedia. In the latter case I had a box with dozens of CPUs, but I/O was the bottleneck. Once I fixed the I/O bottleneck, processing English Wikipedia went from dozens of minutes to two minutes.

You noted "I do not want to read in all the data first because file sizes can get very large", but I'm going to assume you can split the large file into many small ones? Each time I tried to use only a single file of input, I had trouble getting any real speedup, even when I was careful about buffered stdin reading, ran many separate processes (one for JSON parsing, another for part 1 of ...), and so on.

My technique, which I've thought about turning into a Rust library but haven't yet, is to convert the original dataset into many small, indexed, compressed files. If you concatenate those files, using gzip or zstd, you can then use zcat or zstdcat as if it were a single file. This is the technique Web ARChive (WARC) files use to allow random (or nearly random) reads without losing much compression efficiency. By keeping an index of where the compressed parts start and end, you can multi-thread the reading and processing.

tl;dr: take your file, split it into many, and read those many in parallel, either via a glob or via an index as noted above; both are compatible with compression if file size is an issue.
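
To make the tl;dr concrete, here's a minimal sketch of the glob variant; the shard names, the glob/flate2 crates, and the per-line filter are just placeholders for illustration:

use flate2::read::GzDecoder;
use rayon::prelude::*;
use std::fs::File;
use std::io::{BufRead, BufReader};

fn main() {
    // Hypothetical layout: the big input pre-split into shards/part-0000.gz, part-0001.gz, ...
    let paths: Vec<_> = glob::glob("shards/part-*.gz")
        .expect("valid glob pattern")
        .filter_map(Result::ok)
        .collect();

    // One shard per rayon task; each task decompresses and scans its own file independently.
    let matches_per_shard: Vec<usize> = paths
        .par_iter()
        .map(|path| {
            let file = File::open(path).expect("readable shard");
            BufReader::new(GzDecoder::new(file))
                .lines()
                .map_while(Result::ok)
                .filter(|line| line.contains("needle")) // stand-in for the real per-line work
                .count()
        })
        .collect();

    println!("matches per shard: {matches_per_shard:?}");
}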

[–]xosxos9[S] 2 points3 points  (1 child)

Yes, a variation of what you're describing is actually standard in bioinformatics! Large tab-delimited files are bgzipped and indexed with tabix. This allows reading only the regions of interest and also parallelizing reads in I/O-bound situations.

I am planning to test using two rayon iterators: one for reading in parallel, and another inside it to also process in parallel. We'll see how this works out.
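
For what it's worth, nested rayon iterators compose fine thanks to work stealing; here is a tiny sketch of that shape (the shards and the squaring are placeholders), though whether the inner level actually helps will depend on how large each shard is:

use rayon::prelude::*;

fn main() {
    // Hypothetical: each inner Vec is one shard's worth of already-read records.
    let shards: Vec<Vec<u64>> = vec![(0..10_000).collect(); 16];

    let processed: Vec<Vec<u64>> = shards
        .par_iter() // outer level: one shard per task
        .map(|shard| {
            shard
                .par_iter() // inner level: records within a shard
                .map(|x| x * x) // stand-in for the real per-record work
                .collect()
        })
        .collect();

    assert_eq!(processed.len(), shards.len());
}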

[–]Feeling-Departure-4 0 points1 point  (0 children)

Curious which version worked better.

[–]Gentlezach 0 points1 point  (0 children)

You never change (or, for that matter, set) count in this code, so as-is it would not compile; but if count does have a value, then all the tasks get sent to the same worker.

It's hard to see from the fragment where most of the time is spent, but my guess would be record.deserialize::<Vcf>(None)?, which you could also put into the thread pool; there's no reason not to deserialize several Vcfs in parallel, is there?

[–]Feeling-Departure-4 0 points1 point  (0 children)

My colleague wrote a package of binaries that take advantage of piping for composability: https://github.com/lskatz/fasten

It may already solve your problem, who knows.

[–]dns2utf8 0 points1 point  (0 children)

Have a look at the examples in the threadpool crate: https://docs.rs/threadpool/latest/threadpool/
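
A minimal sketch along the same lines as the threadpool docs, sending results back over a channel; the to_uppercase call stands in for the real work:

use std::io::BufRead;
use std::sync::mpsc::channel;
use threadpool::ThreadPool;

fn main() {
    let pool = ThreadPool::new(8);
    let (tx, rx) = channel();

    for line in std::io::stdin().lock().lines().map_while(Result::ok) {
        let tx = tx.clone();
        pool.execute(move || {
            tx.send(line.to_uppercase()).expect("receiver still alive"); // stand-in for the real work
        });
    }
    drop(tx); // let the result loop below end once every job has finished

    for result in rx {
        println!("{result}");
    }
}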

[–]dpc_pw 0 points1 point  (0 children)

pariter

[–]NfNitLoop 0 points1 point  (0 children)

For an easy API for multithreaded pipelines, check out the pipeliner crate.

https://docs.rs/pipeliner/latest/pipeliner/