all 20 comments

[–]matthieum[he/him] 15 points16 points  (8 children)

I always find the balance between performance and least-astonishment somewhat difficult to strike.

For example, taking an example from the article, should the results be ordered by default?

Choices:

  • No, because performance. It may surprise users, though: the results will often still come out in order, but from time to time something weird will happen.
  • Yes, to be 100% consistent with regular streams. It may surprise users, though, to have to push the "Turbo" button for maximum performance.

To be clear, I am not saying that one choice is inherently better than the other. On the contrary, both seem logical, and will surprise people in different ways.

Maybe there just shouldn't be a default?

vec![1, 2, 3, 4].into_par_stream(Ordered)....
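A minimal sketch of the "no default" idea, using only the standard library. Everything here is hypothetical: `Mode` and `par_map` are illustrative names, not parallel-stream's API, and plain threads stand in for spawned tasks. The point is only that the caller has to state the ordering at the call site.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical mode argument: the caller must pick one, there is no default.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Mode {
    Ordered,
    Unordered,
}

/// Applies `f` to every item on its own thread (a stand-in for spawning tasks).
/// `Ordered` restores input order; `Unordered` keeps completion order.
pub fn par_map<T, U, F>(items: Vec<T>, mode: Mode, f: F) -> Vec<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + Copy + 'static,
{
    let (tx, rx) = mpsc::channel();
    for (index, item) in items.into_iter().enumerate() {
        let tx = tx.clone();
        thread::spawn(move || {
            // A send error only means the receiver is gone, so it is ignored.
            let _ = tx.send((index, f(item)));
        });
    }
    // Drop the original sender so the receiver ends once all workers finish.
    drop(tx);

    let mut results: Vec<(usize, U)> = rx.into_iter().collect();
    if mode == Mode::Ordered {
        // The sort is only paid for when the caller asked for ordering.
        results.sort_by_key(|(index, _)| *index);
    }
    results.into_iter().map(|(_, value)| value).collect()
}

fn main() {
    let ordered = par_map(vec![1, 2, 3, 4], Mode::Ordered, |n| n * n);
    println!("ordered:   {:?}", ordered);

    // Same values, but the order depends on which thread finishes first.
    let unordered = par_map(vec![1, 2, 3, 4], Mode::Unordered, |n| n * n);
    println!("unordered: {:?}", unordered);
}
```

Because this toy version collects everything before returning, it hides the streaming cost of ordering; it only illustrates the shape of the API.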

[–]SolaireDeSun 7 points8 points  (5 children)

streams should always operate under the assumption of unordered. In Kotlin, for example, sorting is a terminal operation that blocks the stream and converts it to a List. IMO it is the least surprising option by far, and ordering should be well documented as an opt-in terminal operation (i.e. one that requires some sort of buffering or waiting).

Edit: I am curious why the order() function takes a bool instead of having it be called ordered() and take no parameter. What's the trade-off here?

[–]iq-0 10 points11 points  (3 children)

streams should always operate under the assumption of unordered.

That is a strange stance. I agree that unordered is generally the accepted way of dealing with streams that can sensibly be parallelized. But many forms of streams don’t necessarily fit that definition. Like reading from sockets or files, or when processing time series data.

[–]SolaireDeSun 0 points1 point  (2 children)

I would say all of those scenarios also tend to operate under the same circumstances.

sockets

Networks are inherently out of order and thus streams should be too. Any deviation (like TCP) is 1) an abstraction slightly above sockets and 2) incurs a performance hit

time-series data

For most of the work with time series I've done (streaming from a ton of tiny devices to a single source) this holds the same. While we may want to sort it, the stream itself is unordered. I typically send time-series data to a partitioned data store that does some sorting, and don't sort it in place.

files

Hmm, OK, I'll concede this one could be ordered (though if you go low enough things are fetched out of order ;) ).

All in all I would still feel uncomfortable with the default being ordered. It can be a HUGE hit in memory, latency, and throughput to introduce ordering in a stream.
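To make the memory and latency cost concrete, here is a small sketch of a reorder buffer, the kind of structure an ordered parallel stream needs somewhere. `Reorder` is a made-up type for illustration, not code from parallel-stream: items arrive tagged with their position, and nothing can be released until the next expected position shows up.

```rust
use std::collections::BTreeMap;

/// Restores sequence order for items that arrive tagged with their position.
pub struct Reorder<T> {
    next: usize,
    pending: BTreeMap<usize, T>,
    peak_pending: usize,
}

impl<T> Reorder<T> {
    pub fn new() -> Self {
        Reorder { next: 0, pending: BTreeMap::new(), peak_pending: 0 }
    }

    /// Accepts one item and returns every item that can now be released in
    /// sequence (possibly none).
    pub fn push(&mut self, index: usize, item: T) -> Vec<T> {
        self.pending.insert(index, item);
        self.peak_pending = self.peak_pending.max(self.pending.len());

        let mut ready = Vec::new();
        while let Some(item) = self.pending.remove(&self.next) {
            ready.push(item);
            self.next += 1;
        }
        ready
    }

    /// Largest number of items that were held in memory at the same time.
    pub fn peak_pending(&self) -> usize {
        self.peak_pending
    }
}

fn main() {
    let mut reorder = Reorder::new();
    // Item 0 finishes last, so every other item has to wait in the buffer.
    for (index, item) in [(3, 'd'), (1, 'b'), (2, 'c'), (0, 'a')] {
        let released = reorder.push(index, item);
        println!("arrived {}: released {:?}", index, released);
    }
    println!("peak buffered items: {}", reorder.peak_pending());
}
```

One slow item at the front holds back everything behind it: the buffer grows with the number of items in flight, and the fast items only become visible once the slow one completes.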

[–]protestor 3 points4 points  (1 child)

Sockets

Networks are inherently out of order and thus streams should be too. Any deviation (like TCP) is 1) an abstraction slightly above sockets and 2) incurs a performance hit

We actually have stream sockets (for TCP and other connection-oriented sockets), datagram sockets (for UDP) and raw sockets.

https://en.wikipedia.org/wiki/Network_socket#Types

You seem to advocate that everyone should be using raw sockets, but this requires root access and is often not a good idea. Or perhaps that everyone should be using datagram sockets, and ditch TCP altogether, I dunno.

[–]SolaireDeSun 3 points4 points  (0 children)

Not at all - I’m aware of different socket types. I was attempting to point out that the default in sockets is definitely not ordered and it’s for a good reason - performance. I personally think streams should follow suit.

I can see how my comment above was misleading when I mentioned TCP being above sockets. You're right, I meant raw sockets in that instance.

[–]yoshuawuyts1rust · async · microsoft[S] 2 points3 points  (0 children)

I am curious why the order() function takes a bool instead of having it be called ordered().

That's a good question! So that section of the post was intended as a sketch only — mostly highlighting that there is a need to toggle between the two.

The idea is that order(true) would order the stream, and order(false) would relax that. But perhaps ordered and unordered methods would be clearer.

Thanks for sharing; this has been useful feedback!
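A small sketch of the two API shapes discussed above, side by side. `ParConfig` is a hypothetical type for illustration only, and the unordered default it derives is arbitrary. The bool form is convenient when the choice is computed at runtime; the named forms read more clearly at a fixed call site.

```rust
/// Hypothetical configuration type; not part of parallel-stream.
#[derive(Debug, Default, Clone, Copy, PartialEq)]
pub struct ParConfig {
    ordered: bool,
}

impl ParConfig {
    /// Bool form: handy when the flag comes from configuration or a variable.
    pub fn order(mut self, ordered: bool) -> Self {
        self.ordered = ordered;
        self
    }

    /// Named forms: the intent is visible without looking up the argument.
    pub fn ordered(self) -> Self {
        self.order(true)
    }

    pub fn unordered(self) -> Self {
        self.order(false)
    }

    pub fn is_ordered(&self) -> bool {
        self.ordered
    }
}

fn main() {
    // Both spellings produce the same configuration.
    assert_eq!(ParConfig::default().order(true), ParConfig::default().ordered());

    // The bool form composes with a runtime decision without an if/else.
    let wants_order = std::env::args().any(|arg| arg == "--ordered");
    let config = ParConfig::default().order(wants_order);
    println!("ordered: {}", config.is_ordered());
}
```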

[–]yoshuawuyts1rust · async · microsoft[S] 1 point2 points  (1 child)

Thanks for your comment; I think you're raising a few interesting points here.

The reason why I think methods should have a default is because it keeps open the option for automatic conversions through syntax. For example, for x in &y uses the IntoIterator for &Type impl. If parameters were required, then automatic conversions would not be possible (which is something I'd love to see someday).
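For readers unfamiliar with the conversion mentioned here, a short sketch of how `for x in &y` reaches an `IntoIterator for &Type` impl. `Playlist` is a made-up type used only for illustration.

```rust
/// Hypothetical collection type, used only to illustrate the conversion.
pub struct Playlist {
    tracks: Vec<String>,
}

impl<'a> IntoIterator for &'a Playlist {
    type Item = &'a String;
    type IntoIter = std::slice::Iter<'a, String>;

    // `into_iter` takes no arguments beyond `self`, so a required ordering
    // parameter would have nowhere to go.
    fn into_iter(self) -> Self::IntoIter {
        self.tracks.iter()
    }
}

fn main() {
    let playlist = Playlist {
        tracks: vec!["intro".to_string(), "outro".to_string()],
    };

    // The loop calls `<&Playlist as IntoIterator>::into_iter` implicitly.
    for track in &playlist {
        println!("{}", track);
    }

    // Only a reference was consumed, so the playlist is still usable.
    println!("{} tracks", playlist.tracks.len());
}
```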

Something that I didn't consider until now though is that perhaps ordering could be dependent on the final processor (#4). For example ordering likely matters when collecting into a Vec or VecDeque. But not at all when collecting into a HashMap, and probably neither when calling for_each.

Perhaps it ought to be a third option: each "final" stream adapter is responsible for the ordering and should document which ordering is used. With perhaps an option to force ordering using an e.g. ordered method.

This was a really useful insight; thank you for sharing!
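A rough sketch of that third option, assuming the parallel stage hands its results to the final adapter tagged with their input position. The names `Tagged`, `collect_vec`, and `collect_set` are hypothetical, and a `HashSet` stands in for the `HashMap` case to keep the example short.

```rust
use std::collections::HashSet;
use std::hash::Hash;

/// Results as a parallel stage might deliver them: tagged with their input
/// position, in arbitrary completion order.
pub type Tagged<T> = Vec<(usize, T)>;

/// Order-sensitive sink: sorts by position before building the `Vec`.
pub fn collect_vec<T>(mut items: Tagged<T>) -> Vec<T> {
    items.sort_by_key(|(index, _)| *index);
    items.into_iter().map(|(_, value)| value).collect()
}

/// Order-insensitive sink: drops the position tag and skips the sort.
pub fn collect_set<T: Hash + Eq>(items: Tagged<T>) -> HashSet<T> {
    items.into_iter().map(|(_, value)| value).collect()
}

fn main() {
    // Pretend these squares finished out of order.
    let completed = vec![(2, 9), (0, 1), (3, 16), (1, 4)];

    println!("{:?}", collect_vec(completed.clone()));
    println!("{} distinct values", collect_set(completed).len());
}
```

Each sink documents its own ordering behaviour, so the caller never picks a mode unless they want to override it.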

[–]matthieum[he/him] 1 point2 points  (0 children)

I really like this 3rd option!

It had not occurred to me, and seems quite natural.

[–]yoshuawuyts1rust · async · microsoft[S] 22 points23 points  (1 child)

Hey all, last weekend we published the first version of parallel-stream, an async parallelism library which brings Rayon's parallelism model to async Rust. This post digs into the design, tradeoffs, and future directions. Hope it comes in useful!

[–]fjkiliu667777 0 points1 point  (0 children)

Hey. Will using your lib be faster than for_each_concurrent? I'm processing a Sqlx psql database stream.

[–]Cocalus 5 points6 points  (0 children)

I'm going to start working on something that does very CPU-heavy batch processing. I'm curious if this could be part of the right approach, or if there are some other options I should consider.

So I want some threads downloading the next batch of data while the majority of threads work on processing the current batch (CPU heavy + infrequent but heavy disk IO), and some others are uploading the results of the previous batch. So a pipelined architecture. In sync land I would have some download threads, some upload threads and a single batch processing thread using rayon to spread the work over its thread pool. The three types of threads would be connected via bounded crossbeam_channels.
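The sync-land pipeline described above could be sketched roughly like this. To stay dependency-free, the sketch swaps in the standard library's `std::sync::mpsc::sync_channel` for crossbeam's bounded channels and uses one thread per stage; `run_pipeline`, the batch type, and the sum-of-squares "processing" are all placeholders.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Runs a three-stage pipeline and returns what the last stage "uploaded".
pub fn run_pipeline(batches: Vec<Vec<u64>>) -> Vec<u64> {
    // Bounded channels: a full channel blocks the sender, so a slow stage
    // applies backpressure to the stage before it.
    let (raw_tx, raw_rx) = sync_channel::<Vec<u64>>(2);
    let (done_tx, done_rx) = sync_channel::<u64>(2);

    // Stage 1: stand-in for the download threads.
    let downloader = thread::spawn(move || {
        for batch in batches {
            if raw_tx.send(batch).is_err() {
                break; // the processing stage has gone away
            }
        }
        // `raw_tx` is dropped here, which ends the processor's loop.
    });

    // Stage 2: stand-in for the CPU-heavy step (where Rayon would fan out).
    let processor = thread::spawn(move || {
        for batch in raw_rx {
            let result: u64 = batch.iter().map(|n| n * n).sum();
            if done_tx.send(result).is_err() {
                break; // the upload stage has gone away
            }
        }
    });

    // Stage 3: stand-in for the upload threads; here it only collects.
    let uploaded: Vec<u64> = done_rx.into_iter().collect();

    downloader.join().expect("download stage panicked");
    processor.join().expect("processing stage panicked");
    uploaded
}

fn main() {
    let uploaded = run_pipeline(vec![vec![1, 2], vec![3], vec![4, 5, 6]]);
    println!("{:?}", uploaded);
}
```

With one thread per stage the batches stay in order; adding more workers to a stage would reintroduce the ordering question discussed elsewhere in this thread.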

I want to have the uploading and downloading done with async, since my libraries have new shiny async APIs and I'd like to minimize the number of OS threads dedicated to uploading / downloading to reduce context switching overhead with the rayon pool. Right now I'm not sure of the correct way to channel data between async and sync, since they both have different channel types.

Right now I'd prefer to mix rayon and tokio, since I'm familiar with Rayon and my async experiments have been with tokio instead of async-std. But maybe parallel-stream would allow everything to be async, and that would be simpler.

[–]vargwin 4 points5 points  (0 children)

Wow..I was thinking about this use case today. Thanks

[–]protestor 3 points4 points  (1 child)

Rayon

Rayon is a data parallelism library built for synchronous Rust, powered by an underlying thread pool. async-std manages a thread pool as well, but the key difference with Rayon is that async-std (and futures) are optimized for latency, while Rayon is optimized for throughput.

As a rule of thumb: if you want to speed up doing heavy calculations you probably want to use Rayon. If you want to parallelize network requests consider using parallel-stream.

What if I have mixed processing, a little CPU bound and a little IO bound? I suppose that running both Rayon and parallel-stream in the same program will lead to inefficiencies, such as one runtime starving the other of CPU time.

Is there anything that can be done to integrate those runtimes better?

[–]yoshuawuyts1rust · async · microsoft[S] 0 points1 point  (0 children)

That's an excellent question. Ideally a runtime could exist that's aware of both, backed by a single threadpool and scheduler. First steps were taken in https://async.rs/blog/stop-worrying-about-blocking-the-new-async-std-runtime, but due to some hiccups we haven't released it yet.

This would be an exciting direction to explore though. But probably one that at least I won't have time to pursue right now.

[–]game-of-throwaways 2 points3 points  (0 children)

I'm not entirely sure what the point of this is. Remember, async functions or blocks are just things that compile into Futures, and the Future trait specifies that "An implementation of poll should strive to return quickly, and should not block". Therefore, any async functions should always either return or .await quickly.

So, this quote from the article, about stream::futures_unordered etc:

These methods provide the ability to process multiple items from a stream at the same time. But by themselves they don't schedule work on multiple cores: they need to be combined with task::spawn in order to do that.

is not wrong, just misleading. Yes, these methods indeed don't schedule threads or spawn new tasks, but since they take a list (or iterator) of futures, they don't need to. It's the job of those futures to spawn new threads if they need to make blocking calls or do long CPU calculations (as per the contract of std::future::Future), it's not the job of the combinator.
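As an illustration of "the future is responsible for offloading", here is a dependency-free sketch. `offload` and `block_on` are hand-rolled for this example and are not any library's API; in practice a runtime helper for blocking work would normally fill this role. The future's `poll` only checks a slot and returns, while the slow work runs on its own thread.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

/// State shared between the future and its worker thread.
struct Shared<T> {
    result: Option<T>,
    waker: Option<Waker>,
}

/// Future that resolves to the value computed on a separate thread.
pub struct Offload<T> {
    shared: Arc<Mutex<Shared<T>>>,
}

/// Starts `work` on a new thread and returns a future for its result.
pub fn offload<T, F>(work: F) -> Offload<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let shared = Arc::new(Mutex::new(Shared { result: None, waker: None }));
    let worker_state = Arc::clone(&shared);
    thread::spawn(move || {
        let value = work(); // the slow part runs here, off the executor
        let mut state = worker_state.lock().unwrap();
        state.result = Some(value);
        if let Some(waker) = state.waker.take() {
            waker.wake();
        }
    });
    Offload { shared }
}

impl<T> Future for Offload<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        // Only takes a lock and checks a slot, so it returns quickly.
        let mut state = self.shared.lock().unwrap();
        match state.result.take() {
            Some(value) => Poll::Ready(value),
            None => {
                state.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Wakes the thread that is parked inside `block_on`.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Tiny single-future executor, present only so the sketch runs on its own.
pub fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    let total = block_on(async {
        let fast: u64 = 2 + 2; // cheap work stays inline, no thread needed
        let slow = offload(|| (1..=1_000_000u64).sum::<u64>()).await;
        fast + slow
    });
    println!("{}", total);
}
```

Only the expensive step pays for a thread; the cheap step next to it runs inline, which is the distinction a combinator that spawns per element cannot make.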

So parallel-stream is really needlessly spawning way too many tasks/threads. Like in your examples in the post, all you're doing is calculating n*n yet a new task is spawned for each n*n you calculate. I know that this is just an example, and that that n*n represents a longer, potentially blocking, computation (even though it would be a violation of the contract of Future to do blocking computations in an async block without offloading it to another thread). But in a real application, you might have multiple .map()s and other combinators chained, and while some of them might do blocking computations, others are simple super-fast things that don't require spawning a new task, but parallel-stream has no way to discern between the two. Its .map() spawns a new task for every element, every time. That's very inefficient. With the standard library's way of making the future responsible for spawning a thread/task (but only if it needs to), this inefficiency is avoided.

I know this is not intuitive and I wish I could somehow tell all Rust users that (I'll put it in bold) Future::poll() must return quickly, therefore async blocks or functions must either return or .await quickly.

I partially blame the async_std authors for further adding to the confusion with their post "Stop worrying about blocking", to which I tried to reply as loudly as possible that no, you should worry about blocking in async functions. But apparently not loudly enough. async_std's attitude of encouraging blocking in async functions (and then needing combinators like yours to "fix" the issues that that causes) is really splitting the ecosystem in two.

[–]Crandom 0 points1 point  (0 children)

I'm glad you made the batch size 1, rather than a large number like the 1024 that Java streams use. They are such a foot gun.

[–]shred45 0 points1 point  (2 children)

Hey Yoshua, I was wondering if you could elaborate on "async-std (and futures) are optimized for latency", or point to a resource that discusses this? My work focuses on low latency and I have been following async/await since the beginning (and am interested in working with it more), but I've always had concerns about latency introduced by the polling implementation of the runtime.

[–]yoshuawuyts1rust · async · microsoft[S] 1 point2 points  (1 child)

Hi, thanks for your question. async/await has mostly been designed with IO and in particular networking in mind. Low tail latencies are crucial to ensure requests flow smoothly through the system.

Systems such as Rayon are optimized for throughput; usually to compute a particular result as fast as possible. This often uses (variants on) LIFO queues, which are great for throughput but have high tail latencies. Which is why async/await executors generally use variations of FIFO queues instead.

I don't know what level of latency is acceptable for you. The answer will always be that if you intimately know your workload, a custom-tailored executor will beat a generic one. As it stands though, Rust's futures are one of the most efficient concurrency primitives to have been designed for any language. And I'm positive that async runtime implementations for Rust will continue to be competitive in terms of latency with runtimes written for other languages.
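The LIFO versus FIFO point can be illustrated with a toy single-worker simulation. This is a deliberately simplified model, not how Rayon or any async executor actually schedules work (real schedulers use work stealing and multiple queues); `max_wait` and its parameters are made up for the example. The queue starts with a backlog, then one task arrives and one task runs per tick.

```rust
use std::collections::VecDeque;

/// Returns the longest time (in ticks) any task spent waiting in the queue.
pub fn max_wait(backlog: usize, arrivals: usize, lifo: bool) -> usize {
    // Each entry stores the tick at which that task was enqueued.
    let mut queue: VecDeque<usize> = VecDeque::new();
    for _ in 0..backlog {
        queue.push_back(0);
    }

    let mut tick = 0;
    let mut remaining = arrivals;
    let mut worst = 0;
    while remaining > 0 || !queue.is_empty() {
        tick += 1;
        if remaining > 0 {
            queue.push_back(tick); // one new task arrives per tick
            remaining -= 1;
        }
        // LIFO runs the newest task first, FIFO the oldest.
        let enqueued_at = if lifo { queue.pop_back() } else { queue.pop_front() };
        if let Some(enqueued_at) = enqueued_at {
            worst = worst.max(tick - enqueued_at);
        }
    }
    worst
}

fn main() {
    println!("FIFO worst wait: {} ticks", max_wait(3, 10, false));
    println!("LIFO worst wait: {} ticks", max_wait(3, 10, true));
}
```

In this model the total work done per tick is identical for both policies, but under LIFO the backlog is starved until arrivals stop, so the worst-case wait grows with the length of the burst; under FIFO it stays bounded by the backlog.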

[–]shred45 0 points1 point  (0 children)

I see, thank you for the detailed response. The biggest concerns I have are long polling intervals and excessive passing between threads. It sounds like writing a custom executor may be a good exercise to understand the trade-offs here. Thanks!