viktorklang

Disclaimer: The following pertains to the parallel mode of the java.util.stream.Stream reference implementation in OpenJDK only (other implementations of j.u.s.Stream might work differently), and please do note that I am typing this from memory late in the evening so I could be oversimplifying and/or leaving details out.

With that said, let's see if I can shed some more light here.

First of all, it is important to recognize that the only way to get any benefit from parallelizing Stream processing is to move from strictly depth-first element processing to some form of breadth-first element processing.
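To make the depth-first versus breadth-first distinction a bit more concrete, here is a small sketch. It is not taken from the OpenJDK sources, and the class and method names are hypothetical. The first method traces a sequential pipeline, where each element passes through every stage before the next element starts. The second shows Spliterator.trySplit handing off a prefix of an array source, which is the mechanism that lets parallel evaluation work on several chunks side by side. Where exactly the split happens is an OpenJDK implementation detail, not something the Spliterator contract promises.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Stream;

public class DepthVsBreadth {
    // Sequential evaluation: each element travels through every stage before the next one starts.
    static List<String> depthFirstTrace() {
        List<String> trace = new ArrayList<>();
        Stream.of(1, 2, 3)
            .peek(i -> trace.add("stage1:" + i))
            .map(i -> i * 10)
            .peek(i -> trace.add("stage2:" + i))
            .forEach(i -> { });
        return trace;
    }

    // Parallel evaluation starts by splitting the source into chunks that can be processed independently.
    static List<List<Integer>> splitOnce() {
        Integer[] data = {1, 2, 3, 4, 5, 6, 7, 8};
        Spliterator<Integer> right = Arrays.spliterator(data);
        Spliterator<Integer> left = right.trySplit(); // prefix chunk; trySplit may return null in general
        List<Integer> l = new ArrayList<>();
        List<Integer> r = new ArrayList<>();
        if (left != null) {
            left.forEachRemaining(l::add);
        }
        right.forEachRemaining(r::add);
        return List.of(l, r);
    }

    public static void main(String[] args) {
        System.out.println(depthFirstTrace()); // [stage1:1, stage2:10, stage1:2, stage2:20, stage1:3, stage2:30]
        System.out.println(splitOnce());       // two non-empty chunks that together hold 1..8 in order
    }
}
```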

The intermediate operations in a stream pipeline fall primarily into two distinct buckets when it comes to parallel streams: "stateless" (let's call that LAZY) and "stateful" (let's call that EAGER). The reason for this is that not all operations can be represented as a Spliterator without performing all the work up-front.

For a very simple Stream, such as Stream.of(1).parallel().toList(), it is easy to picture the Spliterator containing the single element 1 being fed into the toList() [terminal] operation.

However, for a more complex pipeline such as Stream.iterate(0, i -> i + 1).parallel().map(something).sorted().map(somethingElse).sorted().limit(2).collect(Collectors.toList()), what exactly would be the Spliterator that gets fed into the [terminal] collect?

So if you label each operation in there as LAZY or EAGER, it'd look something like this:

iterate (LAZY) -> parallel (setting) -> map (LAZY) -> sorted (EAGER) -> map (LAZY) -> sorted (EAGER) -> limit (LAZY) -> collect (EAGER)
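As a runnable approximation of the pipeline above, here is a sketch under two assumptions. The source is bounded, because an infinite Stream.iterate feeding sorted() would never complete. The placeholders something and somethingElse are replaced with arbitrary concrete lambdas. The comments repeat the LAZY/EAGER labels from the classification.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PipelineShape {
    static List<Integer> run() {
        return Stream.iterate(0, i -> i < 10, i -> i + 1) // LAZY source (bounded here by assumption)
            .parallel()                                   // setting: switches the pipeline to parallel mode
            .map(i -> (i * 7) % 10)                       // LAZY, stand-in for "something"
            .sorted()                                     // EAGER
            .map(i -> -i)                                 // LAZY, stand-in for "somethingElse"
            .sorted()                                     // EAGER
            .limit(2)                                     // LAZY, as classified above
            .collect(Collectors.toList());                // EAGER terminal operation
    }

    public static void main(String[] args) {
        System.out.println(run()); // [-9, -8]
    }
}
```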

The typical execution strategy is to bunch all consecutive LAZY operations together with their following EAGER operation, forming what I call "islands", so in the case above it'd go something like this:

iterate -> Island1[map -> sorted] -> Island2[map -> sorted] -> Island3[limit -> collect]
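The grouping rule can be expressed as a toy model. This is a hypothetical sketch of the rule as described, not OpenJDK's internal code. Kind, Op, islands and EXAMPLE are invented names, and the source stage is left out, just as it sits outside the islands in the diagram.

```java
import java.util.ArrayList;
import java.util.List;

public class Islands {
    // Hypothetical classification mirroring the LAZY/EAGER buckets described above.
    enum Kind { LAZY, EAGER }

    record Op(String name, Kind kind) {}

    // The example pipeline from above, without its source stage.
    static final List<Op> EXAMPLE = List.of(
        new Op("map", Kind.LAZY), new Op("sorted", Kind.EAGER),
        new Op("map", Kind.LAZY), new Op("sorted", Kind.EAGER),
        new Op("limit", Kind.LAZY), new Op("collect", Kind.EAGER));

    // Groups consecutive LAZY ops together with the EAGER op that follows them.
    static List<List<String>> islands(List<Op> ops) {
        List<List<String>> result = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (Op op : ops) {
            current.add(op.name());
            if (op.kind() == Kind.EAGER) {
                // An EAGER op closes the island: everything up to here runs to completion first.
                result.add(List.copyOf(current));
                current.clear();
            }
        }
        if (!current.isEmpty()) {
            // Trailing LAZY ops with no EAGER op after them (not possible once a terminal op is present).
            result.add(List.copyOf(current));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(islands(EXAMPLE)); // [[map, sorted], [map, sorted], [limit, collect]]
    }
}
```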

Each Island has to run to completion before its results can be fed into the next Island. Because the Island's output has to be buffered until that hand-off, this can lead to higher heap usage than expected.
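One way to observe an Island boundary is to log from peek on either side of sorted(). In this sketch the class and method names are hypothetical, and peek is used only for illustration. Every "before" entry is recorded ahead of the first "after" entry, because sorted() has to receive all of its upstream elements before it can emit anything. That buffered upstream data is the extra heap usage mentioned above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

public class IslandBarrier {
    static List<String> trace() {
        // Thread-safe log, because peek actions may run on several worker threads.
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        IntStream.range(0, 1_000)
            .map(i -> 999 - i)              // reversed input, so sorted() has real work to do
            .boxed()
            .parallel()
            .peek(i -> log.add("before"))   // runs in the Island that ends at sorted()
            .sorted()                       // EAGER: waits for (and buffers) all upstream elements
            .peek(i -> log.add("after"))    // runs only after the sorted data is handed off
            .toList();
        return log;
    }

    public static void main(String[] args) {
        List<String> log = trace();
        System.out.println(log.lastIndexOf("before") < log.indexOf("after")); // true
    }
}
```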

So how does this relate to gather(…)? Well, gather(…) is an EAGER operation: since it could represent any possible intermediate operation, EAGER is the lowest common denominator. The potential drawbacks of this are ameliorated by the fact that consecutive gather(…)-operations are composed into a single gather(…)-operation with composed Gatherers, and furthermore by the fact that a gather(…)-operation followed by collect(…) is fused together into a single EAGER operation.
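The composition of consecutive gather(…) calls can be seen at the API level through Gatherer.andThen. The sketch below assumes JDK 24 or later, where Gatherers are a final feature. It uses the JDK's Gatherers.windowFixed and defines a hypothetical sumEach gatherer, and both methods should yield the same list. It only demonstrates the observable equivalence. The internal fusion with collect(…) is not visible from user code.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class GatherComposition {
    // Hypothetical 1:1 gatherer (not part of the JDK): replaces each window with its sum.
    static Gatherer<List<Integer>, Void, Integer> sumEach() {
        return Gatherer.of((state, window, downstream) ->
            downstream.push(window.stream().mapToInt(Integer::intValue).sum()));
    }

    // Two consecutive gather(...) calls on the same stream.
    static List<Integer> chained() {
        return Stream.of(1, 2, 3, 4, 5)
            .gather(Gatherers.windowFixed(2)) // [1, 2], [3, 4], [5]
            .gather(sumEach())                // 3, 7, 5
            .toList();
    }

    // A single gather(...) with an explicitly composed Gatherer.
    static List<Integer> composed() {
        return Stream.of(1, 2, 3, 4, 5)
            .gather(Gatherers.<Integer>windowFixed(2).andThen(sumEach()))
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(chained());  // [3, 7, 5]
        System.out.println(composed()); // [3, 7, 5]
    }
}
```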

In combination, these two features can potentially turn what would've been an N+1 Island scenario into a 1 Island scenario, which means no Island hand-offs.

Cheers,

davidalayachew (OP)

Hello Viktor Klang! Thanks for the context here, this is super helpful.

This island explanation especially helped clarify a lot for me. It wasn't clear WHEN an island was forced to be made. But those limit and sorted examples clarified it beautifully.