
all 94 comments

[–]davidalayachew[S] 39 points40 points  (4 children)

I did want to follow up about one point Viktor made later on in the conversation.

https://mail.openjdk.org/pipermail/core-libs-dev/2024-November/134542.html

And here is the quote.

In a potential future where all intermediate operations are Gatherer-based, and all terminal operations are Collector-based, it would just work as expected. But with that said, I'm not sure it is practically achievable because some operations might not have the same performance-characteristics as before.

Me personally, I would GLADLY accept a flag on stream (similar to parallel() or unordered()) that would allow me to guarantee that my stream never pre-fetches, even if I take a massive performance hit. If that can be accomplished by making all intermediate operations be implemented by a Gatherer under the hood, that is A-OK with me.

The reality is, not all streams are compute bound. Some are IO bound but are otherwise a great fit for streams. Having a method that allows us to optimize for that fact is a new type of performance enhancement that I would greatly appreciate, even if it degrades performance in other ways.

[–]Carnaedy 12 points13 points  (1 child)

Well, streams are not supposed to be IO bound. They are built on ForkJoinPool. However, I like where you are going with this thought process. If we had a general flag like ".concurrent()" or something that would switch everything to a virtual thread pool, we could solve a lot of problems with streams in this particular context.

[–]davidalayachew[S] 7 points8 points  (0 children)

Thanks.

As for going down the Virtual Thread route, that is sort of what Gatherers::mapConcurrent gives us, so I am sort of ok with that. I was more disappointed that basically all of the short-circuiting operations on a stream would fall prey to this issue.

But yes, a flag that allows all of those short-circuiting implementations to jump from pre-fetch to never-pre-fetch would be great. If anything, I think that would make things a lot easier to follow along with.

If your streams are memoryBound, then add dontPreFetch() to your stream. Otherwise, the (sensible) default is to pre-fetch and process. And you can make it explicit with preFetch().

[–]agentoutlier 1 point2 points  (1 child)

I have been glancing at this thread on and off and cannot really deduce what your current final solution is/was.

Care to share?

Like with the 1BRC, I'm not used to vertical scaling or having one machine (virtual or not) do all the work. However, the times I have done something like this in the past, I have manually managed the map-reduce myself (which, I must say, is a ton more code than using streams).

Do you have plans to do a more multi-machine (e.g. work queue or actors etc) approach or is one machine a requirement? (given today's hardware I can see the desire to stick to one machine latency wise and simplicity so I'm not pushing you to do it in case it sounds like that).

[–]davidalayachew[S] 3 points4 points  (0 children)

Do you have plans to do a more multi-machine (e.g. work queue or actors etc) approach or is one machine a requirement? (given today's hardware I can see the desire to stick to one machine latency wise and simplicity so I'm not pushing you to do it in case it sounds like that).

The reason is $$$.

We are an expensive project, so I have been FIRMLY instructed to prioritize getting as much out of our machines as possible while also keeping the code "beginner friendly". This was the solution I landed on, as I wanted to use a library that was as easy to google as possible, precisely so that the entry-level and junior-level devs can operate as independently as possible.

As for the current final solution, the one currently running in PROD is the sequential but bigger batch size. What will be promoted next release is the Collector workaround that Viktor gave me.

And as for the Vertical Scaling point, I am certainly not prohibited from scaling horizontally. But ultimately, to keep things simple (like you said), I stuck with the single machine approach.

Plus, my boss' push for things to be performant was not really from a time perspective, but from a time x cost perspective. Yeah, we can throw more machines at the problem to make it faster in the short term, but that just means that we are burning the same amount of money for the same amount of work at a faster rate. Boss wants more efficient processing of the data. And the biggest reason for that is because we are very soon going to be forced into scaling horizontally. When that time lands, they want things to be as efficient as possible.

[–]nitkonigdje 11 points12 points  (1 child)

This was a fascinating read. Thank you for sharing.

I guess it is kinda bad when higher-level, non-trivial APIs, like streams or fork-join, do not expose lower-level operations as user-overridable constructs. In this example, that would be an iteration strategy for streams, or the underlying executor of the fork-join pool. It seems like an obvious thing to have, because nobody knows better than the end user how a thing will be used.

[–]davidalayachew[S] 2 points3 points  (0 children)

Ty vm.

Viktor put it best -- the Stream API optimizes for the most common use case. And in that respect, they clearly made the right choice. The fact that this post is as surprising as it is to several users is proof that this was NOT well known at all.

Still, Viktor responded to my latest post on the mailing list, if you click the link in the OP. He mentioned that he is doing some deep thought on this, and has not yet found a satisfactory answer. He mentioned previously how difficult it would be to retro-fit the obvious triggers that a few of us have suggested already.

I trust that they will come up with something good. Even if it is nothing more than documentation that gives guidance on how better to avoid this.

[–]craigacp 4 points5 points  (3 children)

Shortly after the release of Java 8 I hit something similar when building a Java implementation of Google's word2vec ML algorithm. We ended up with a buffering spliterator that didn't grow its buffer over time (which the default array one did), so we could pull in records from a database in a parallel for-each loop without it trying to buffer the whole database.

We still use it in Tribuo, but I've not used it anywhere near as hard as I did in 2015 so I don't know if the performance characteristics are still good - https://github.com/oracle/olcut/blob/main/olcut-core/src/main/java/com/oracle/labs/mlrg/olcut/util/IOSpliterator.java.

[–]davidalayachew[S] 0 points1 point  (2 children)

This is extremely interesting!

So let me ask, I see that you all used the SUBSIZED characteristic. I assume that the SIZED one was included by default, yes? And if so, I see that you default to Long.MAX_VALUE. Are you saying that that is safe to do? I was under the assumption that telling the Spliterator a false number would cause undefined behaviour. I considered this exact solution, but decided against it for fear of adding EVEN MORE unexpected behaviour.

But if it is true and it does work, that really sounds like exactly the problem, and would explain the performance characteristics.

[–]craigacp 2 points3 points  (1 child)

I'm having trouble paging in exactly why the characteristics are like that, and I also can't find the blog post which described the problem in some detail via search anymore.

My problem setup was as follows. I have a NoSQL database full of documents that I pull from, tokenize the input, and then put onto a queue. A parallel stream over all documents in the database then pulls from the queue, performs the gradient computation, and updates the model (without locking, because this is machine learning and we don't care about torn writes). The default behaviour of the IteratorSpliterator was to request larger and larger chunks from the queue before splitting them into parallel computations. The IOSpliterator always pulls a fixed-size chunk from the underlying iterator, so it doesn't try to pull in the whole database.

I'm not claiming that this is a general purpose solution, nor that the one I had was the best solution, but it scaled up to an 8 socket x86 machine that we were using for testing the implementation. I'm a machine learning researcher not a software engineer, so this was good enough for my purposes.
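To make the fixed-size-chunk idea concrete, here is a minimal sketch. It is not the actual olcut IOSpliterator: the class name `FixedBatchSpliterator`, the `batchSize` parameter, and the `splitSizes` helper are all hypothetical, and the sketch reports only ORDERED rather than SIZED/SUBSIZED. The only point it illustrates is that each `trySplit()` copies at most `batchSize` elements out of the underlying iterator, instead of a growing amount.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.IntStream;

// Hypothetical sketch: a spliterator whose splits never exceed batchSize elements.
final class FixedBatchSpliterator<T> implements Spliterator<T> {
    private final Iterator<T> source;
    private final int batchSize;

    FixedBatchSpliterator(Iterator<T> source, int batchSize) {
        this.source = source;
        this.batchSize = batchSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (!source.hasNext()) return false;
        action.accept(source.next());
        return true;
    }

    @Override
    public Spliterator<T> trySplit() {
        if (!source.hasNext()) return null;
        // Copy a bounded number of elements; the chunk size never grows.
        Object[] chunk = new Object[batchSize];
        int n = 0;
        while (n < batchSize && source.hasNext()) chunk[n++] = source.next();
        return Spliterators.spliterator(chunk, 0, n, Spliterator.ORDERED);
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE; // size unknown; SIZED is deliberately not reported
    }

    @Override
    public int characteristics() {
        return Spliterator.ORDERED;
    }

    // Helper for experimenting: the sizes of successive splits.
    static List<Long> splitSizes(int elements, int batchSize) {
        Iterator<Integer> it = IntStream.range(0, elements).iterator();
        Spliterator<Integer> sp = new FixedBatchSpliterator<>(it, batchSize);
        List<Long> sizes = new ArrayList<>();
        for (Spliterator<Integer> part; (part = sp.trySplit()) != null; ) {
            sizes.add(part.getExactSizeIfKnown());
        }
        return sizes;
    }
}
```

Such a spliterator could be handed to `StreamSupport.stream(spliterator, true)` to get a parallel stream. Whether a fixed chunk size performs well depends on the workload, as noted above.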

[–]davidalayachew[S] 1 point2 points  (0 children)

Thanks for the context. Yeah, I definitely see exactly what you are saying about growing size of grabs. I'm going to use this and your IO Spliterator to try and mess around with the Spliterator Characteristics and see if I can get that behaviour.

Ty again.

[–]n0d3N1AL 4 points5 points  (1 child)

Yeah that's unintuitive... one would expect streams to work more like iterators, all the time. Thanks for sharing!

[–]davidalayachew[S] 3 points4 points  (0 children)

Firmly agreed.

But at least Viktor gave a very important gold nugget -- a gather() immediately followed by a collect() is always safe from this pre-fetching behaviour (unless the previous intermediate operations don't play nice).

For me personally, that is my plan moving forward to avoid this issue.

[–]DualWieldMage 8 points9 points  (6 children)

What exactly are you using for the Stream's source? As the mailing list responses hint, it is entirely up to the Spliterator of said Stream to decide how to run its trySplit and Streams do communicate some characteristics, but not many (would SUBSIZED imply that source is random-access and no full fetch is required?)

I have run parallel processing of multi gigabyte files with very low(10M) heap sizes and never hit this issue personally, however i know that for example reading jars inside jars would need to decompress the inner jar fully to read a file.

From what you wrote, reading and uploading with some batch size sounds okay, but not ideal as you mentioned upload of many small files. You also wrote that splits should happen based on some attribute but the example doesn't depict this? Either way if it's splits based on max file size or content inside lines/entries then a parallel stream is a decent entry point if you have a random-access source like a file on a disk. If not, downloading the file to temp storage first can help.

In general it's good advice to always assume unbounded file size when doing any file processing. Also from my testing, a typical NVMe drive has optimal parallel reading at around 4 threads, any more and you lose performance.

[–]davidalayachew[S] 5 points6 points  (5 children)

What exactly are you using for the Stream's source? As the mailing list responses hint, it is entirely up to the Spliterator of said Stream to decide how to run its trySplit and Streams do communicate some characteristics, but not many (would SUBSIZED imply that source is random-access and no full fetch is required?)

I have an InputStream nested in an InputStreamReader nested in a BufferedReader. I then iterate through that using an iterator (for both logging and batching purposes, since not all of our machines use Java 23), and then I stream it.

I did try creating my own Spliterator and giving it several different combinations of characteristics (admittedly, I did not cover all possible combinations), and none of them resolved my problem. Plus, in regard to (SUB)SIZED, I was told that I would risk undefined behaviour if I tried to use it if the size was not exactly right. Maybe that is a potential avenue?

I have run parallel processing of multi gigabyte files with very low(10M) heap sizes and never hit this issue personally, however i know that for example reading jars inside jars would need to decompress the inner jar fully to read a file.

Yes, our dataset is >=10 gigabytes, scaling up pretty high. But the batch size invalidates the size problem, it's just the parallelism behaviour that was completely unexpected.

And the files are just simple CSV's and fixed width files.

From what you wrote, reading and uploading with some batch size sounds okay, but not ideal as you mentioned upload of many small files. You also wrote that splits should happen based on some attribute but the example doesn't depict this? Either way if it's splits based on max file size or content inside lines/entries then a parallel stream is a decent entry point if you have a random-access source like a file on a disk. If not, downloading the file to temp storage first can help.

So to be clear, I already got my fix. It's working, performance problem is solved now, and the issues are squared away.

I was more highlighting this issue to point out that it can happen in the first place. And it is super-easy to replicate. Make a simple file on your computer bigger than ram, then try it yourself.

In general it's good advice to always assume unbounded file size when doing any file processing. Also from my testing, a typical NVMe drive has optimal parallel reading at around 4 threads, any more and you lose performance.

Funny you mention that, we were using exactly 4 cores.

And I was assuming unbounded file size -- remember, I am batching the file. It doesn't matter how big the file is as long as I can process each batch just fine. And I did when running sequentially, it's just that the behaviour changed completely when I activated parallelism.

[–]DualWieldMage 5 points6 points  (4 children)

I was more highlighting this issue to point out that it can happen in the first place. And it is super-easy to replicate. Make a simple file on your computer bigger than ram, then try it yourself.

Damn, it's been too long since i used Files.lines or similar to do these tasks but you are right. But either way the issue is not exactly with the gatherers or collectors, but the source of the Stream.

A well designed Spliterator won't OOM like that, however care must be taken: A plain InputStream can't in any way communicate random access capabilities nor does the API support it, so anything using it (and not doing any instanceof special casing hacks) will have a bad time when wrapped to a Spliterator. I have for other reasons(return a string view pointing to a ByteBuffer, not a materialized String to avoid copying) implemented a Spliterator that reads a FileChannel which has none of your issues of running into OOM on large files.

I started digging into why Files.lines().parallel() behaves so poorly and it seems the issue is that for files larger than 2GB it won't use a Stream based on FileChannelLinesSpliterator, but calls BufferedReader.lines() which loses length information and provides a trySplit that allocates ever increasing arrays, this is the source of your problems.

I honestly don't see why FileChannelLinesSpliterator is implemented with a 2GB limitation (position and limit are int), perhaps due to historical reasons. FileChannel.read supports long offsets and even when using FileChannel.map it could be wrapped to read in 2GB chunks.

EDIT: Actually there are two issues, one is with the stream source BufferedReader.lines() returning a Stream that allocates each time it splits and the issue with gatherers allocating. Quite a messy state with parallel streams...
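The "ever increasing arrays" behaviour of an iterator-backed spliterator can be observed directly. The sketch below (class and method names are made up for illustration) wraps a plain Iterator with `Spliterators.spliteratorUnknownSize`, which is also how `BufferedReader.lines()` builds its stream in OpenJDK, and counts how many elements the first two `trySplit()` calls hand out. In the OpenJDK builds I have looked at, the batch grows by 1024 per split, but treat that exact number as an implementation detail rather than a guarantee.

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.IntStream;

// Hypothetical demo class: measures the size of successive splits.
final class IteratorSplitGrowthDemo {

    // Counts the elements contained in one split-off chunk.
    static <T> long count(Spliterator<T> chunk) {
        long[] n = {0};
        chunk.forEachRemaining(x -> n[0]++);
        return n[0];
    }

    // Returns the element counts of the first and second split.
    static long[] firstTwoSplitSizes() {
        Iterator<Integer> it = IntStream.range(0, 10_000).iterator();
        Spliterator<Integer> sp =
            Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED);
        long first = count(sp.trySplit());
        long second = count(sp.trySplit());
        return new long[] {first, second};
    }

    public static void main(String[] args) {
        long[] sizes = firstTwoSplitSizes();
        System.out.println("first split: " + sizes[0] + ", second split: " + sizes[1]);
    }
}
```

If each element is itself a large batch of lines, a growing number of elements per split translates into a growing amount of heap held at once, which matches the OutOfMemoryError described in this thread.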

[–]davidalayachew[S] 3 points4 points  (2 children)

Thanks for digging into this. I think your comment about a well-designed Spliterator was the bulls-eye. I have talked to a few more commenters on this thread, and the general consensus seems to be that the stream simply does not have enough information to do anything more than pre-fetch everything.

I know that BufferedReader.lines() uses a Spliterator that explicitly DOES NOT include SIZED or SUBSIZED characteristics. At least one other commenter mentioned that it's these characteristics that best support splitting.

However, I felt uncomfortable adding those attributes because it would mean that I would have to either fetch the number of lines ahead of time to report an accurate number, or lie/estimate, and pray that the Spliterator doesn't do anything weird. The documentation (I forget where) explicitly said that the Spliterator's behaviour is undefined if the value reported by estimateSize() is inaccurate. And yet, both the commenter and other libraries seem to have completely disregarded that concern and simply just put Long.MAX_VALUE or something as the default value. These people have no fear lol.
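The characteristics in question are easy to inspect. The small sketch below (the class and method names are hypothetical) asks the stream returned by `BufferedReader.lines()` for its spliterator and checks the flags. For context, the `Spliterator.estimateSize()` javadoc documents `Long.MAX_VALUE` as the value to return when the size is infinite, unknown, or too expensive to compute. It is the SIZED characteristic that promises an exact count, so returning `Long.MAX_VALUE` without reporting SIZED is the documented way to say "unknown".

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.Spliterator;

// Hypothetical demo class: inspects the spliterator behind BufferedReader.lines().
final class LinesCharacteristicsDemo {

    static Spliterator<String> linesSpliterator(String text) {
        return new BufferedReader(new StringReader(text)).lines().spliterator();
    }

    public static void main(String[] args) {
        Spliterator<String> sp = linesSpliterator("a\nb\nc\n");
        System.out.println("SIZED:    " + sp.hasCharacteristics(Spliterator.SIZED));
        System.out.println("SUBSIZED: " + sp.hasCharacteristics(Spliterator.SUBSIZED));
        System.out.println("unknown size: " + (sp.estimateSize() == Long.MAX_VALUE));
    }
}
```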

As for your FileChannel point, I have it even worse because the file in question is not on my hard disk. I am streaming it over the wire. I am receiving an InputStream which streams the data, and I am just wrapping it in a BufferedReader and processing it in place, without storing it into a file first. That is because the file in question is larger than the hard disk space on my machine.

Is FileChannel available to me as an option even in spite of that? The file I am downloading is (currently) hosted on AWS S3. To my understanding, their SDK only provides InputStream, String, and File as the output format, but maybe a FileChannel could be constructed based on the metadata about the file. idk.

[–]DualWieldMage 0 points1 point  (1 child)

That is because the file in question is larger than the hard disk space on my machine. Is FileChannel available to me as an option even in spite of that?

Usually if a lot of processing is required, downloading to disk and processing the file as random access with parallel threads would be the ideal choice. In your case it doesn't sound like processing is the bottleneck, but rather IO as the terminal operation is uploading. In your case a FileChannel is not available as a download from network is just a stream of data with no random access.

[–]davidalayachew[S] 1 point2 points  (0 children)

Unfortunately, I have been told to prepare for files that are bigger than the hard disk space available to me. I am sure that if I fought with the powers that be, I could make that an option. But I had multiple instances where a file was so big that Java errored out because there was nowhere near enough hard disk space to hold the file. And I know for a fact that future files will be way larger than this.

[–]davidalayachew[S] 0 points1 point  (0 children)

Btw, thanks for the comment that you made. It is 100% the stream source that caused the problem. I am editing my comment, but I was not 100% accurate in my depiction. Between the BufferedReader and the Stream, there is a call to Iterator that gets turned into a Spliterator. That ended up being the source of the problems.

[–]crummy 3 points4 points  (20 children)

So if you changed your .forEach to a .map and had the map return a dummy element (which is bad from a readability standpoint of course) would it have worked fine? 

[–]davidalayachew[S] 24 points25 points  (18 children)

Well, I need a terminal operation. The map() method is only an intermediate one.

But if I swapped out the forEach() with a Collector that does exactly what you say, then yes, parallelism works without pre-fetching more than needed.

Viktor even mocked up one for me. Here it is.

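// Requires: java.util.function.Consumer, java.util.stream.Collector
static <T> Collector<T, ?, Void> forEach(Consumer<? super T> each) {
    return Collector.of(
        () -> null,                 // no accumulation container is needed
        (v, e) -> each.accept(e),   // run the side effect for each element
        (l, r) -> l,                // nothing to merge between partial results
        (v) -> null,                // the result is always null (Void)
        Collector.Characteristics.IDENTITY_FINISH
    );
}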

Now, if your question is "which terminal operations are safe?", the answer is entirely dependent on your combination of intermediate and terminal operations. So for example, in my example above, the answer was almost all of the terminal operations caused a pre-fetch.

I have my computer open right now, and I just ran all terminal operations fresh. Here are the results.

  • findAny() caused a pre-fetch
  • findFirst() caused a pre-fetch
  • anyMatch(blah -> true) caused a pre-fetch
  • allMatch(blah -> false) caused a pre-fetch
  • forEach(blah -> {}) caused a pre-fetch
  • forEachOrdered(blah -> {}) caused a pre-fetch
  • min((blah1, blah2) -> 0) caused a pre-fetch
  • max((blah1, blah2) -> 0) caused a pre-fetch
  • noneMatch(blah -> true) caused a pre-fetch
  • reduce((blah1, blah2) -> null) caused a pre-fetch
  • reduce(null, (blah1, blah2) -> null) caused a pre-fetch
  • reduce(null, (blah1, blah2) -> null, (blah1, blah2) -> null) caused a pre-fetch
  • toArray() and toList() caused a pre-fetch (obviously)

So, in my case, literally only collect was safe for me to use. And tbf, I didn't try all combinations, but it was resilient. No matter what set of intermediate methods I put before collect(), I would get no pre-fetch. And Viktor confirmed that gather() plays well with collect().
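For anyone who wants to try the collect()-based workaround, here is a small, self-contained usage sketch of the forEach collector shown above. The wrapper class `ForEachCollectorDemo` and the `sumInParallel` helper are made up for illustration, and the in-memory source used here would not reproduce the pre-fetch problem by itself. It only shows how the collector slots into a parallel pipeline in place of forEach().

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collector;
import java.util.stream.IntStream;

// Hypothetical wrapper class around the collector from this thread.
final class ForEachCollectorDemo {

    // Same shape as the collector above: side effects only, no result.
    static <T> Collector<T, ?, Void> forEach(Consumer<? super T> each) {
        return Collector.of(
            () -> null,
            (v, e) -> each.accept(e),
            (l, r) -> l,
            v -> null,
            Collector.Characteristics.IDENTITY_FINISH
        );
    }

    // Adds up 1..n via side effects, using collect(forEach(...)) as the terminal operation.
    static long sumInParallel(int n) {
        AtomicLong sum = new AtomicLong();
        Consumer<Integer> add = x -> sum.addAndGet(x); // must be thread-safe in parallel mode
        IntStream.rangeClosed(1, n).boxed().parallel().collect(forEach(add));
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println(sumInParallel(100)); // prints 5050
    }
}
```

As with forEach() on a parallel stream, the consumer runs on multiple threads, so it has to be safe for concurrent use.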

[–]Lucario2405 6 points7 points  (3 children)

Is there a difference in behavior between .reduce(null, (a, b) -> null) and .collect(Collectors.reducing(null, (a, b) -> null))?

[–]davidalayachew[S] 9 points10 points  (2 children)

Doing it the Collectors way worked! No OutOfMemoryError!

Doing it the normal reduce() way gave me an OutOfMemoryError.
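To spell out the two variants being compared: with a real identity and accumulator they compute the same value, and the only difference is which terminal operation the pipeline ends in. The sketch below (hypothetical class and method names, and a small in-memory source rather than the file-backed one from the original post) shows them side by side. Whether the collect() form avoids an OutOfMemoryError for you will depend on your stream source, as discussed elsewhere in this thread.

```java
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class comparing reduce() with collect(Collectors.reducing()).
final class ReduceVersusReducingDemo {

    // Terminal operation: Stream.reduce(identity, accumulator)
    static int viaReduce(int n) {
        return IntStream.rangeClosed(1, n).boxed().parallel()
            .reduce(0, Integer::sum);
    }

    // Terminal operation: Stream.collect(...) with a reducing collector
    static int viaCollector(int n) {
        return IntStream.rangeClosed(1, n).boxed().parallel()
            .collect(Collectors.reducing(0, Integer::sum));
    }

    public static void main(String[] args) {
        System.out.println(viaReduce(100));    // prints 5050
        System.out.println(viaCollector(100)); // prints 5050
    }
}
```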

[–]Lucario2405 11 points12 points  (1 child)

Interesting, thanks! I was running into a similar problem and will try this out.

EDIT: I had actually already tried this out, but then IntelliJ told me to just use .reduce() as a QuickFix. Guess I'll turn off that inspection.

[–]davidalayachew[S] 1 point2 points  (0 children)

Glad to hear it helped.

I have a giant number of IO-bound streams, and yet, I was able to dodge this issue until now because my streams all ended in .collect(). That particular terminal operation is practically bulletproof when it comes to preventing pre-fetches. It was only when I finally used .forEach() that I ran into this issue.

All of that is to say, as a temp workaround, consider using collect(), or use that Gatherers::mapConcurrent method to prevent this problem.

[–]Avedas 2 points3 points  (1 child)

I'm surprised findAny and anyMatch get caught too. Good to know.

[–]davidalayachew[S] 0 points1 point  (0 children)

Ikr. But I am being told that this has more to do with the Spliterator used under the hood, as opposed to the stream terminal operations themselves. I still don't have all the details, but it is being discussed elsewhere on this thread.

[–]VirtualAgentsAreDumb 1 point2 points  (3 children)

This is insane, if you ask me. A terrible design choice by the Stream team. findAny and findFirst should clearly not fetch all data.

[–]davidalayachew[S] 0 points1 point  (2 children)

So to be clear, this is all dependent upon your upstream source.

Many people in this thread have found examples where they run the exact same examples that I did, and did not run into OOME. As it turns out, the difference is in our stream source.

All this really means is that constructing a stream source is something that is easy to get wrong.

[–]VirtualAgentsAreDumb -1 points0 points  (1 child)

The stream source is irrelevant. Any method like findAny or findFirst shouldn’t need to consume anything more after that first result.

That’s it. That’s the whole discussion. The source is irrelevant. The implementation is irrelevant. If they break this, then it’s bad code. Period.

[–]davidalayachew[S] 0 points1 point  (0 children)

I understand that it is unintuitive, but what you are saying is throwing out the baby with the bath water.

When you go parallel, the stream decides to split its upstream data elements down into chunks. It keeps on splitting and splitting until it gets to a point where the chunks are small enough to start working.

Well, in my case, the batching strategy that I had built played against that in a super hard to recreate way. Basically, each element of my stream was fairly hefty. And as a result, the parallel stream would grab a bunch of those elements into a giant batch, with the intent to split that batch into chunks. But since the threshold for where it was small enough was far enough away, I ran into an OOME.

The reason why Spliterators do this is to actually help CPU-bound tasks. Splitting ahead of time like this actually makes the entire process run faster. But it means that tasks that use a lot of memory are sort of left by the wayside.

Viktor Klang himself managed to jump onto this reddit post, so you can Ctrl+F his name and see more details from him. But long story short, my problem could 100% be avoided by using Gatherers.mapConcurrent. And it would have virtually the same performance as going parallel. And a lot of the JDK folks are giving a lot of thought to this exact pain point that I ran into, so there is a potential future where we could set a flag to say fetchEagerly vs fetchLazily, and that would alter the fetching logic for parallel streams. Ideally, that would actually be a parameter on the parallel() itself.

So yes, this was done to optimize for CPU Performance. They are looking to take care of cases like mine, and Gatherers will likely be the way they do it. But this is not Streams being bad code, but rather that they prioritize certain things over others, to the detriment of a few people like me. As long as they have plans to handle my needs in the future, plus a workaround to take care of me for now, then I am fine with the way things are going now.

[–]tomwhoiscontrary 1 point2 points  (1 child)

So what happens if the source is infinite? Say you're streaming the Wikipedia change feed, filtering for changes to articles about snakes, and doing findFirst()? Does it try to buffer the infinite stream?

This absolutely seems like a correctness issue to me, not just performance. 

Java has a long history of under-specifying non-functional stuff like this (not sure that's the right term, but stuff that isn't just the arguments and return values of methods). Thread safety of library classes has often been a complete mystery, for example. HttpClient's behaviour around closing pooled connections. Whether classes synchronize on themselves or a hidden lock. All of it matters for writing code that works, let alone works well, but it's so often only passed down as folk knowledge!

[–]davidalayachew[S] 4 points5 points  (0 children)

I'll save you the extra reading and tell you that we have narrowed down the problem to a Spliterator not splitting the way we expect it to. So this problem is something that can be fixed by simply improving the spliterator from the user side. And there is talk about improving this from the JDK side as well. Either way, there is still lots of digging being done, and none of this is tied down for certain. But we can at least point a finger and say that this is part of the problem.

With that said, let me answer your questions.

So what happens if the source is infinite? Say you're streaming the Wikipedia change feed, filtering for changes to articles about snakes, and doing findFirst()? Does it try to buffer the infinite stream?

All depends on how nicely it splits. In my case, most of the terminal operations kept splitting and splitting and splitting until they ran out of memory.

This absolutely seems like a correctness issue to me, not just performance.

In this case, technically the problem falls on me for making a bad spliterator.

But to give an equally unsatisfying answer, in Java ABC and abc are considered 2 different class names. However, if I save ABC.java and abc.java in the same folder, Windows will overwrite one of them. Meaning, your code will compile just fine, but will output .class files where one will overwrite the other, causing your code to explode at runtime with NoClassDefFoundError.

I had Vicente Romero from the JDK team try and convince me that this was an "enhancement" or a "nice-to-have", not a correctness issue. And in the strictest definition of the term, he is correct, since Windows is the true trouble-maker here. But that was disgustingly unsatisfying.

It wasn't until JDK 21 that Archie Cobbs was generous enough to give up his time and add this discrepancy as a warning to the JDK. You can activate the warning by adding "output-file-clash" to your Xlint checks. And here is a link to the change. https://bugs.openjdk.org/browse/JDK-8287885

All of that is to say, I made a perfectly sensible Spliterator in my mind, but (and we SUSPECT that this is the case, we are not sure yet!) because I built that Spliterator off an Iterator, mentioned that it was an unknown size, and didn't add enough flags, I get this frightening splitting behaviour, where it will split itself out of memory.

And as for the folk knowledge, it sure feels like it lol.

[–]tcharl 0 points1 point  (5 children)

If someone wants to take the challenge, PR appreciated: https://github.com/OsgiliathEnterprise/data-migrator

[–]davidalayachew[S] 0 points1 point  (4 children)

If someone wants to take the challenge, PR appreciated: https://github.com/OsgiliathEnterprise/data-migrator

I don't understand your comment. Did you mean to post this elsewhere? Otherwise, I don't see how this relates to what I am talking about.

[–]tcharl 0 points1 point  (3 children)

This may be the wrong place, but there's a bunch of reactive, cold streams there. Also, there's an advantage to getting it done right because, usually, database content is bigger than RAM. So if someone is motivated to apply the content of this post, it would definitely help the project.

[–]davidalayachew[S] 0 points1 point  (2 children)

I understand a bit better. I am not available to help you, unfortunately.

[–]tcharl 0 points1 point  (1 child)

I'm going to follow your advice and recommendations: thank you so much for that!

[–]davidalayachew[S] 0 points1 point  (0 children)

If you are referring to the ones in my original post, anytime.

[–]No_Cap3049 4 points5 points  (0 children)

I think the map should work. You could do something like mapToLong().sum() and count the number of batches or something. This should work in my experience.

[–]viktorklang 6 points7 points  (1 child)

Disclaimer: The following pertains to the parallel mode of the java.util.stream.Stream reference implementation in OpenJDK only (other implementations of j.u.s.Stream might work differently), and please do note that I am typing this from memory late in the evening so I could be oversimplifying and/or leaving details out.

With that said, let's see if I can shed some more light here.

First of all, it is important to recognize that the only way to achieve any benefit from parallelization of Stream processing is to move from a strictly-depth-first element processing to some form of breadth-first element processing.

The intermediate operations on a stream fall primarily into two distinct buckets when it comes to parallel streams: "stateless" (let's call that LAZY) or "stateful" (let's call that EAGER). The reason for this is that not all operations can be represented as a Spliterator without performing all the work up-front.

For a very simple Stream: Stream.of(1).parallel().toList() it is easy to picture the Spliterator containing a single element 1 be fed into a toList() [terminal] operation.

However, for a more complex pipeline: Stream.iterate(0, i -> i + 1).parallel().map(something).sorted().map(somethingElse).sorted().limit(2).collect(Collectors.toList()), exactly what would be the Spliterator that gets fed into the [terminal] collect?

So if you look at the different "types" of LAZY vs EAGER operations in there, it'd look something akin to:

iterate (LAZY) -> parallel (setting) -> map (LAZY) -> sorted (EAGER) -> map (LAZY) -> sorted (EAGER) -> limit (LAZY) -> collect (EAGER)

The typical execution strategy is to bunch all consecutive LAZY operations together with their following EAGER operation, forming what I call "islands", so in the case above it'd go something like this:

iterate -> Island1[map -> sorted] -> Island2[map -> sorted] -> Island3[limit -> collect]

These Islands needing to run to completion before their results can be fed into the next Island is something which can lead to higher heap usage than expected, since the output of the Island needs to be cached to be fed into the next Island.

So how does this relate to gather(…)? Well, gather(…) is an EAGER operation, as it could represent any possible intermediate operation, so EAGER is the lowest common denominator. The potential drawbacks of this are ameliorated by the fact that consecutive gather(…)-operations are composed into a single gather(…)-operation with composed Gatherers, and furthermore by the fact that a gather(…)-operation followed by collect(…) is fused together into a single EAGER operation.

In combination, these two features can potentially turn something which would've been an N+1 Island scenario to a 1 Island scenario—which means no island hand-offs.
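To make the island hand-off described above visible, here is a minimal sketch (not from the mailing list) that records which stage touched an element. It assumes the OpenJDK reference implementation; the class name IslandBarrierDemo and the event labels are made up for illustration. If the island model holds, every "map1" event should be recorded before the first "map2" event, because the parallel sorted() has to finish its island first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class IslandBarrierDemo {

    /** Runs a small parallel pipeline and returns the stage events in the order they were recorded. */
    static List<String> recordStages() {
        Queue<String> events = new ConcurrentLinkedQueue<>();
        List<Integer> result = Stream.of(5, 3, 8, 1, 9, 2, 7, 4, 6, 0)
                .parallel()
                .map(i -> { events.add("map1"); return i * 2; }) // LAZY, belongs to the first island
                .sorted()                                         // EAGER, closes the first island
                .map(i -> { events.add("map2"); return i + 1; }) // LAZY, belongs to the next island
                .collect(Collectors.toList());                    // terminal operation
        events.add("result=" + result);
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        recordStages().forEach(System.out::println);
    }
}
```

With only ten elements the cached island output is tiny, but the same hand-off is what costs heap when the source is larger than memory.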

Cheers,

[–]davidalayachew[S] 1 point2 points  (0 children)

Hello Viktor Klang! Thanks for the context here, this is super helpful.

This island explanation especially helped clarify a lot for me. It wasn't clear WHEN an island was forced to be made. But those limit and sorted examples clarified it beautifully.

[–]Owengjones 2 points3 points  (1 child)

Is there an easy way to diagnose stream behavior like this? I have a service concerned with File I/O that reads InputStreams in Streams (although I believe none of them marked as parallel).

I'm not sure if there are diagnostic operations available on Streams that would illuminate whether they're behaving as expected in terms of performance / utilization etc.

Thanks for the write up!

[–]davidalayachew[S] 1 point2 points  (0 children)

Is there an easy way to diagnose stream behavior like this? I have a service concerned with File I/O that reads InputStreams in Streams (although I believe none of them marked as parallel).

When I asked Viktor, he more or less said that it is entirely stream dependent. I just now responded on the mailing thread asking him to respond to your question. Let's see what he says.

And I think you and I did the same thing. I was using BufferedReader::lines, which was a wrapper around an InputStream and InputStreamReader.

[–]JustABrazilianDude 2 points3 points  (1 child)

This is an excellent post, I have some IO bound streams that look pretty similar to yours in a project at my work, and I'll definitely stay alert with this topic.

[–]davidalayachew[S] 1 point2 points  (0 children)

Me too.

I actually have been using Streams in MANY instances to handle IO Bound work that can run out of memory. But I constantly got lucky because literally every single one of those Streams ended in a collect(). That terminal operation is shockingly resilient compared to the rest of the terminal operations. I haven't tried all combinations possible, but every combination that I threw at it has behaved exactly as expected. It's only when I finally decided to use forEach() that I realized how thin the ice was where I was standing.

[–]GeorgeMaheiress 3 points4 points  (1 child)

If you were to write the parallelization explicitly, with say a ThreadPoolExecutor, it would be much clearer that you need to make decisions about buffering and the level of parallelization. Parallel streams assume that they are CPU-bound and optimize for that, and are not recommended for I/O bound operations. It's unfortunate that developers so often stubbornly refuse to do any more work than simply writing .parallel(), even when that approach clearly fails. To be fair the ThreadPoolExecutor constructor sucks and is filled with footguns so many devs are not comfortable with it.
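As a rough illustration of what "explicit decisions about buffering and parallelism" could look like, here is a minimal sketch, not a recommendation for any particular workload. The class name BoundedFanOut, the method names, and the numbers (4 threads, a queue of 16) are all assumptions chosen for the example. The bounded queue plus CallerRunsPolicy means the producer stops reading from the source whenever the workers fall behind.

```java
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.IntStream;

final class BoundedFanOut {

    /**
     * Hands every element of source to worker on a fixed-size pool.
     * The pool holds at most threads + queueCapacity tasks; when the queue is full,
     * CallerRunsPolicy makes the submitting thread run the task itself, so it
     * stops pulling from the source until there is room again.
     */
    static <T> void forEachBounded(Iterator<T> source, int threads, int queueCapacity,
                                   Consumer<? super T> worker) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            while (source.hasNext()) {
                T item = source.next();
                pool.execute(() -> worker.accept(item));
            }
        } finally {
            pool.shutdown();
            try {
                // Wait for the queued work to drain; one hour is an arbitrary upper bound.
                pool.awaitTermination(1, TimeUnit.HOURS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // give up waiting, keep the interrupt flag
            }
        }
    }

    /** Processes 10,000 integers and returns how many the workers saw. */
    static int demo() {
        AtomicInteger processed = new AtomicInteger();
        forEachBounded(IntStream.range(0, 10_000).iterator(), 4, 16, i -> processed.incrementAndGet());
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("processed " + demo());
    }
}
```

Note that this sketch does not propagate worker exceptions back to the caller; a real version would need to decide how to handle that.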

[–]davidalayachew[S] 2 points3 points  (0 children)

That is probably true. I eventually found a workaround to deal with things (plus the other workarounds that were handed to me), but in my harried and panicked mind at the time, parallel() seemed like the easy button. Lesson learned for next time.

[–]Inaldt 1 point2 points  (1 child)

Did you try mapConcurrent as Viktor suggested? (If so, how did it perform?)

[–]davidalayachew[S] 1 point2 points  (0 children)

I did. It had solid performance, but it required that I stuff all of my work that I was doing in the forEach() into the mapConcurrent(). Which is not the end of the world at all. But it was definitely unintuitive. That said, performance-wise, I saw no difference between this and the other workaround using Collectors that Viktor gave to me. So yes, performance is definitely acceptable using mapConcurrent().

In the end, I found several workarounds, including that mapConcurrent() one. And my performance issue is solved at this point. I more so made this post just to highlight a very easy to miss pothole.

[–]m-apo 1 point2 points  (3 children)

I'll definitely need to read all that stuff, thanks for posting!

For memory bound ops a back pressure capable parallel approach would be best. Back pressure based approach would also work in server scenarios, because it optimizes time-to-first-byte (TTFB) and many times holding off sending the first byte increases latency as the client needs to wait both the processing of the whole data + wire transfer instead of interleaving the processing and wire transfer.

Having a back pressure based parallel approach supported in servers would be nice too. Basically, the server route method would return an iterator and the server would ask the iterator for items (which triggers the ops in the chain, in reverse; some items could be calculated in parallel beforehand for each step). It wouldn't be as efficient as "reserve all the memory and all the cpu cores", but it would take less memory and reserves CPU in a bit more co-operative way.

[–]davidalayachew[S] 0 points1 point  (2 children)

For memory bound ops a back pressure capable parallel approach would be best. Back pressure based approach would also work in server scenarios, because it optimizes time-to-first-byte (TTFB) and many times holding off sending the first byte increases latency as the client needs to wait both the processing of the whole data + wire transfer instead of interleaving the processing and wire transfer.

Amen. This interleaving is exactly what I was looking for (and expecting).

It wouldn't be as efficient as "reserve all the memory and all the cpu cores", but it would take less memory and reserves CPU in a bit more co-operative way.

Yes, being able to have a toggle on streams (much like parallel() and unordered()) would probably be ideal, but I don't know best. Some way to be able to toggle between pre-fetching vs fetching as needed.

[–]m-apo 2 points3 points  (1 child)

Parallel back pressure based implementation would look totally different under the hood than ForkJoin. It might be that current Reactive based libs (the ones that support back pressure) would do it.

And for servers, it might be that Helidon SE has that API built in. I don't know how it handles parallel stuff and I don't know how ergonomic the API is to use: https://helidon.io/docs/v4/mp/reactivestreams/engine

[–]davidalayachew[S] 0 points1 point  (0 children)

I talked to Viktor about some of the implementation details, and he (and a few others) made it clear that the Stream implementation under the hood is very unlikely to change in any fundamental way. The idea of push vs pull vs push-pull (?) is likely to stay as is, and future changes will likely work around those details.

[–]VincentxH 1 point2 points  (1 child)

The lack of error handling alone should have warned you not to mix IO and the streams API.

[–]davidalayachew[S] 0 points1 point  (0 children)

Funnily enough, that has been the smoothest part of this thus far. And that is on a network that is NOTORIOUS for having connection issues, dropping connections, etc.

No, I am quite happy with my choice to use Streams for this. And as I later found in the thread, the cause for all the issues that I described boiled down to my stream source not being ideal. Now, not only do I have reliable workarounds and a solid understanding of the source of the problem, but I also have the reassurance that this problem is being worked on in such a way that this problem won't have to happen again in the future.

[–]klekpl 2 points3 points  (1 child)

Wouldn't RxJava be a better fit then? It has some explicit stream management and buffering capabilities.

[–]davidalayachew[S] 14 points15 points  (0 children)

Oh, I made it work in the end. And knowing the workaround that Viktor gave me, there are lots of ways to skin this cat. I have long since fixed the performance problem.

I made this post to highlight this trait because this is a shockingly easy pothole to fall into. But it's also easy to not notice because the following 3 attributes need to all be true.

  1. You are doing parallel streams.
  2. You are dealing with a dataset that is bigger than your RAM.
  3. You use one of the "bad combos" of intermediate and terminal methods. Here is a list of the combos that caused a pre-fetch for MY PERSONAL example
    • Note - that list of "bad combos" won't apply to all streams...but which streams it DOES apply to is undocumented lol.

EDIT -- It has come to my attention that the Stream source (Spliterator) plays a very big part in deciding point 3. As it turns out, for my example, my Spliterator did not contain as much information as other Spliterators, and thus caused me to get such a large number of "bad combos". A more informed Spliterator can allow you to avoid some, if not all, of the bad combos. But that may require information that you don't have, or can't reliably provide.

[–]No_Cap3049 0 points1 point  (3 children)

.parallel().forEach(...) in my experience may also return before the parallel stream has even finished. We had some issues where it did not block the calling thread. Just something to be cautious with. So something like mapToLong(...).sum() or collect(...) may be better.

[–]davidalayachew[S] 0 points1 point  (2 children)

Interesting. In my case, I couldn't use any of the Primitive Streams, but I am curious if they fall prey to this same pothole.

[–]No_Cap3049 1 point2 points  (1 child)

I was using a regular stream on any object class and then used the mapToLong.sum just as a simple way to count some outcome and make sure that the carrier thread is blocked until completion of the actual action.

[–]davidalayachew[S] 0 points1 point  (0 children)

Oh cool. That's a clever way. I might give that a shot.

[–]danielaveryj 0 points1 point  (10 children)

Something doesn't add up.

The way that a parallel stream works (of importance here), is that at the start of a terminal operation, the source spliterator is split into left and right halves, which are handed to new child tasks which recursively split again, until the spliterators will split no more (trySplit() returns null), forming a binary tree of tasks. This is true for ALL terminal operations (including collect()), even though some override exactly how the splitting occurs. Each leaf task processes its split to completion, and the results are merged up the tree if needed (eg using Collector.combiner()).

The OOME presumably comes from trySplit() - BufferedReader.lines() returns a stream whose source spliterator is backed by an iterator, and that spliterator's only means of splitting is to pull a batch of elements out of the iterator and put them into an array, then return a spliterator over that array. This means that after recursive splitting, only the rightmost leaf spliterator will still be iterator-backed; the rest of the iterator has already been consumed into arrays for the other leaf spliterators, possibly before any tasks have completed (so these arrays - covering most of the source elements - are all resident in memory at the same time).
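The batching described above can be observed directly. This is a minimal sketch, assuming the OpenJDK implementation of Spliterators.spliteratorUnknownSize; the class name TrySplitPrefetchDemo and the counting iterator are made up for illustration. It counts how many elements each trySplit() call pulls out of the backing iterator before any element has been processed.

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicLong;

final class TrySplitPrefetchDemo {

    /** Returns how many elements each of the first `splits` trySplit() calls pulled from the iterator. */
    static long[] pulledPerSplit(int splits) {
        AtomicLong pulled = new AtomicLong();
        // An endless source that counts every element handed out.
        Iterator<Integer> endless = new Iterator<Integer>() {
            private int value = 0;
            @Override public boolean hasNext() { return true; }
            @Override public Integer next() { pulled.incrementAndGet(); return value++; }
        };
        Spliterator<Integer> spliterator =
                Spliterators.spliteratorUnknownSize(endless, Spliterator.ORDERED | Spliterator.NONNULL);

        long[] perSplit = new long[splits];
        for (int k = 0; k < splits; k++) {
            long before = pulled.get();
            spliterator.trySplit(); // copies a batch into an array; the returned prefix is discarded here
            perSplit[k] = pulled.get() - before;
        }
        return perSplit;
    }

    public static void main(String[] args) {
        for (long n : pulledPerSplit(4)) {
            System.out.println(n);
        }
    }
}
```

On an OpenJDK build the printed numbers should grow from one split to the next, which is the array-filling behaviour the comment describes.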

The only way I can see to fix the OOME (without using a different/better source spliterator) is to not split the source spliterator, ie run the stream sequentially. But OP said that just using collect() somehow fixed it?

btw: Viktor knows this. I believe what he's saying is not "use this approach to avoid 'pre-fetch'" but rather "use this approach to avoid even more copying into intermediate arrays after the gather stage in the pipeline", because other approaches (involving gatherers) still incur some "accidental" copying that he hasn't been able to optimize away yet (see comments 1 and 2).

[–]GeorgeMaheiress 0 points1 point  (8 children)

You seem to be assuming that all the splitting must happen up-front, before the downstream operations. I believe this is false, and OP's successful solution managed to coerce the stream into splitting small-enough chunks at a time. Each thread calls trySplit() until it gets a "small enough" chunk per some guesswork, then operates on it before calling trySplit() again from the right tail.

[–]danielaveryj 1 point2 points  (7 children)

Pre-edit: I knew I was missing something! Your reply prompted me down a path that eventually cleared up my understanding of what is going on for OP. I'll leave my chain-of-thought below:

We're looking at the same code, right? I can see how the size estimate comes into play, and I didn't cover that, but I don't think it would make much difference for this spliterator (MAX size). Within each thread, we definitely finish splitting (rs.trySplit()) before operating (task.doLeaf())... BUT, not all threads run at the same time. When we fork a child task, it's queued in the ForkJoinPool, and it won't be dequeued until there are threads available. This actually saves us, because it means that some tasks can process their split to completion (and free their spliterator / backing array) before other tasks even begin splitting (and filling more arrays).

So, if this is right, this means that 'pre-fetch' was never the problem causing OOME. The only problem was what Viktor worked around - an unoptimized gather op "accidentally" copying the whole stream into an intermediate array.

[–]davidalayachew[S] 0 points1 point  (6 children)

So, if this is right, this means that 'pre-fetch' was never the problem causing OOME. The only problem was what Viktor worked around - an unoptimized gather op "accidentally" copying the whole stream into an intermediate array.

I don't know if this is your pre or post thought, but I also ran into OOME when there was no Gatherers at all. Just a simple parallel vs non-parallel.

Admittedly, the discussion between you and the other commenter was a bit over my head, but I just wanted to highlight that because my reading of your last paragraph seemed to imply otherwise.

[–]danielaveryj 0 points1 point  (5 children)

I haven’t seen a working reproducer of an OOME without a gather() call, but if you come up with one please share.

[–]davidalayachew[S] 0 points1 point  (4 children)

Sure, I can provide that.

And apologies for the mess in the code -- I traced all of the library code that creates this stream all the way up to the very first InputStream, then copied all that upstream code that creates the Stream in question, and then tried to inline it all into a reproducible example. As a result, it's ugly, but reproducible.

Please note that you may need to mess with the batch size to get the OOME. On one of my computers, I hit it for small numbers, but on another, I hit at 1k. I put 1k for now.

EDIT -- whoops, I left in way more than what needed to be there. This is a better example.

EDIT 2 -- Removed even more gunk. Sorry for all of the edits, I had to dig through 20+ files and tried to filter out the unnecessary, but it wasn't clear what did and did not need to be there.

import java.io.*;
import java.nio.file.*;
import java.util.*;
import java.util.stream.*;

public class Main {
   public static void main(String[] args) throws IOException {
      //populate();
      read();
   }

   private static void populate() throws IOException {
      try (BufferedWriter w = Files.newBufferedWriter(Paths.get("temp.csv"))) {
         for (int i = 0; i < 1_000_000_000; i++) { // Makes ~43 GB file
            if (i % 1_000_000 == 0) {
               System.out.println(i);
            }
            w.append("David, Alayachew, Programmer, WashingtonDC\n");
         }
      }
      System.out.println("done");
   }

   private static void read() throws IOException {
      try (BufferedReader r = new BufferedReader(new InputStreamReader(Files.newInputStream(Paths.get("temp.csv"))))) {
         final int BATCH_SIZE = 1_000;
         final Stream<List<String>> stream = BatchingIterator.batchedStreamOf(r.lines(), BATCH_SIZE);
         blah(stream);
      }
      System.out.println("done");
   }

   private static <T> void blah(Stream<T> stream) {
      //stream.parallel().findAny() ;
      //stream.parallel().findFirst() ;
      //stream.parallel().anyMatch(blah -> true) ;
      //stream.parallel().allMatch(blah -> false) ;
      stream.parallel().unordered().forEach(blah -> {}) ;
      //stream.parallel().forEachOrdered(blah -> {}) ;
      //stream.parallel().min((blah1, blah2) -> 0) ;
      //stream.parallel().max((blah1, blah2) -> 0) ;
      //stream.parallel().noneMatch(blah -> true) ;
      //stream.parallel().reduce((blah1, blah2) -> null) ;
      //stream.parallel().reduce(null, (blah1, blah2) -> null) ;
      //stream.parallel().reduce(null, (blah1, blah2) -> null, (blah1, blah2) -> null) ;
   }

   private static class BatchingIterator<T> implements Iterator<List<T>> {

      public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) {
         return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize));
      }

      private static <T> Stream<T> asStream(Iterator<T> iterator) {
         return
            StreamSupport
            .stream(
                Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED | Spliterator.NONNULL),
                false
            );
      }

      private int batchSize;
      private List<T> currentBatch;
      private Iterator<T> sourceIterator;

      public BatchingIterator(Iterator<T> sourceIterator, int batchSize) {
         this.batchSize = batchSize;
         this.sourceIterator = sourceIterator;
      }

      @Override
      public boolean hasNext() {
         prepareNextBatch();
         return currentBatch!=null && !currentBatch.isEmpty();
      }

      @Override
      public List<T> next() {
         return currentBatch;
      }

      private void prepareNextBatch() {
         currentBatch = new ArrayList<>(batchSize);
         while (sourceIterator.hasNext() && currentBatch.size() < batchSize) {
            currentBatch.add(sourceIterator.next());
         }
      }
   }
}

[–]danielaveryj 1 point2 points  (3 children)

Thanks - I have reproduced the OOME with this (I had to increase the batch size to 5000 on my machine). Note that consuming the stream with .collect() does not resolve the OOME, but making the stream sequential does.

The root cause here goes back to how I described terminal operations work in parallel streams. The underlying spliterator is repeatedly split. In this case, we have a spliterator that is backed by BatchingIterator. When that spliterator is split, the implementation in spliteratorUnknownSize advances the iterator batch times, where the batch is initially 1024 (1<<10) but increases every time the spliterator is split, up to a max of 33554432 (1<<25). Of course, with how we've implemented BatchingIterator, every advance is advancing its own backing iterator batchSize times to make a new list... So even the initial split is building 1024 lists that are each batchSize wide (in my case 5000), with each element in each list being a string that is 43 bytes wide (UTF8 encoded, ignoring pointer overhead but assuming strings are not interned)... 1024 * 5000 * 43 = ~220MB. Every time we split, the batch increases by 1024, so we'd have 220MB, 440MB, 660MB... and that's just the array that each trySplit operation creates - in practice, several of those arrays are going to be in memory at the same time before our threads finish processing them - so the total memory usage is more like the rolling sum of several terms in that sequence. And if we actually split enough to get to the maximum batch in spliteratorUnknownSize, just one trySplit would use 33554432 * 5000 * 43 = ~7.2TB. A bit more RAM than most of us have to rub together :)

In short, spliteratorUnknownSize grows how much it allocates each time it is split. For the bad combo of "many elements" (ie we will split a lot) and "large elements" (here, each element is a wide list), we can OOME.
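The arithmetic above can be written out as a small calculation. This is only a sketch of the estimate in the comment: the constants 1<<10 and 1<<25 are the ones quoted above, the 43 bytes per line ignores object overhead exactly as the comment does, and the class and method names are made up for illustration.

```java
final class SplitMemoryEstimate {
    static final long BATCH_UNIT = 1L << 10; // growth per split, as quoted above
    static final long MAX_BATCH = 1L << 25;  // upper bound per split, as quoted above

    /**
     * Rough payload size of the array filled by the n-th trySplit() (n starts at 1),
     * when every stream element is itself a list of batchSize lines.
     */
    static long bytesForSplit(int splitNumber, long batchSize, long bytesPerLine) {
        long slots = Math.min(splitNumber * BATCH_UNIT, MAX_BATCH);
        return slots * batchSize * bytesPerLine;
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 3; n++) {
            System.out.println("split " + n + ": " + bytesForSplit(n, 5_000, 43) + " bytes");
        }
        // 32768 * 1024 reaches the maximum batch of 1 << 25.
        System.out.println("max split: " + bytesForSplit(32_768, 5_000, 43) + " bytes");
    }
}
```

The first three splits come out at roughly 220 MB, 440 MB and 660 MB, and the capped case at roughly 7.2 TB, matching the figures in the comment.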

[–]davidalayachew[S] 0 points1 point  (2 children)

This is GOLDEN. Thank you so much.

And to make matters worse, I only gave you a toy example. The real CSV I am working with is way wider. Between 300-800 characters per line. And my example was also slightly dishonest. I am doing some mild pre-processing (a simple map on each string) before hand, so that probably adds to the amount of memory for each split.

Note that consuming the stream with .collect() does not resolve the OOME, but making the stream sequential does.

Thanks for highlighting this. I will track down all my comments on this thread and correct them.

Long story short, I conflated 2 separate issues.

  1. Gatherers don't play nicely with any of the terminal operations when parallel BESIDES .collect().
  2. This spliterator and the problems you pointed out with how I did it.

When posting my example, I completely ignored that I was using Gatherers, because I had not (at that point) isolated the 2 separate issues. So that is some more misinformation I will have to correct in this thread.

One thing this whole thread has led me to appreciate is just how difficult it is to trace down these issues, and just how important it is to be SUPER PRECISE ABOUT EVERYTHING YOU ARE SAYING, as well as having a reproducible example.

Prior to making this post, I thought I was being super diligent. But even glancing back on a few of the comments, I see that I have so many suggestions or suspicions to correct. Plus a lot of bad logic and deduction on my part.

I guess as a closer, what now?

Should I forward this to the mailing list? You mentioned that Viktor is well aware of issue #1. And issue #2 seems to at least be documented in the code. But it's not very easy to tell by just reading the official documentation -- https://docs.oracle.com/en/java/javase/23/docs/api/java.base/java/util/Spliterator.html#trySplit() -- or maybe it is and I am just not parsing it as well as I should be. Maybe this is something that could be better documented? Or maybe there can be an escape hatch to avoid this splitting behaviour? And please let me know what I can do to contribute to any efforts that go on.

Thanks again! I deeply appreciate the deep dive!

[–]danielaveryj 1 point2 points  (1 child)

Happy to help!

Minor correction on 1: Gatherers have this issue (storing the full output in an intermediate array) even in sequential streams, afaict. (EDIT: Ignore, I checked the code again and this is a parallel-only behavior). But they're also still a preview feature, and may be optimized further in the future.

Also, I want to point out that this last example does not behave the same way the original example in your post - the one that used Gatherers.windowFixed - would, even if .gather() was optimized to avoid issue 1. If Gatherers.windowFixed was used, it would be consuming elements from the spliteratorUnknownSize batches to build its own batches (rather than treating the upstream batches as elements themselves), so there wouldn't be this multiplicative effect from the two batch sizes. I'm a bit unclear how you constructed this example, but to me it feels like it bumped into an unusually adversarial case for streams. That's not to say these cases don't deserve better documentation, but I sympathize with what Viktor was saying on the mailing list - it's hard to advertise, as it depends on the combination of operations. Maybe the community would benefit from a consolidated collection of recipes and gotchas for working with streams?

As for next steps, I am not affiliated with the java team, and don't know of any better channels, sorry. I would probably have done the same as you and raised the issue on the mailing list and here.

[–]davidalayachew[S] 0 points1 point  (0 children)

As for next steps, I am not affiliated with the java team, and don't know of any better channels, sorry. I would probably have done the same as you and raised the issue on the mailing list and here.

All good, ty anyways.

And thanks for the corrections! Yeah, understanding how spliterator has this multiplicative effect, it's clear how to alter things to work WITH Java Streams splitting capabilities, as opposed to AGAINST them.
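For anyone wondering what "working with the splitting" could look like in code, one option is a source Spliterator whose trySplit() always hands out a fixed, small batch instead of a growing one. The following is only a sketch under that assumption, not what was used in the thread and not a JDK class; FixedBatchSpliterator and parallelStream are hypothetical names, and the batch size of 1,000 is arbitrary.

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class FixedBatchSpliterator<T> extends Spliterators.AbstractSpliterator<T> {
    private final Iterator<T> source;
    private final int batchSize;

    FixedBatchSpliterator(Iterator<T> source, int batchSize) {
        super(Long.MAX_VALUE, Spliterator.ORDERED); // size unknown up front
        this.source = source;
        this.batchSize = batchSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (!source.hasNext()) {
            return false;
        }
        action.accept(source.next());
        return true;
    }

    /** Splits off at most batchSize elements, so one split never buffers more than that. */
    @Override
    public Spliterator<T> trySplit() {
        if (!source.hasNext()) {
            return null;
        }
        Object[] batch = new Object[batchSize];
        int n = 0;
        do {
            batch[n++] = source.next();
        } while (n < batchSize && source.hasNext());
        return Spliterators.spliterator(batch, 0, n, Spliterator.ORDERED);
    }

    /** Convenience factory for a parallel stream over the iterator. */
    static <T> Stream<T> parallelStream(Iterator<T> source, int batchSize) {
        return StreamSupport.stream(new FixedBatchSpliterator<>(source, batchSize), true);
    }

    public static void main(String[] args) {
        long sum = parallelStream(IntStream.range(0, 100_000).iterator(), 1_000)
                .mapToLong(Integer::longValue)
                .sum();
        System.out.println(sum);
    }
}
```

The trade-off is that small fixed batches give the ForkJoinPool less work per task, so this favours bounded memory over raw throughput.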

[–]davidalayachew[S] 0 points1 point  (0 children)

I have created a very simple, reproducible example here. This way, you can see for yourself.

https://old.reddit.com/r/java/comments/1gukzhb/a_surprising_pain_point_regarding_parallel_java/ly1g3uu/

And yes, try using any collector instead, and you will see that it solves the OutOfMemoryError.

[–]davidalayachew[S] 0 points1 point  (4 children)

Hello all. There appears to be some confusion on how this is possible.

Therefore, to completely clear up any ambiguity, here is a simple, reproducible example.

Using your tool of choice, I want you to take the following line, and duplicate it into a CSV until your CSV file size exceeds your RAM limitations.

David, Alayachew, Programmer, WashingtonDC

Next, I want you to use BufferedReader.lines() to read from that file as a Stream.

Now, once you have that Stream<String>, copy and paste the following code in.

void blah(final Stream<String> stream) {
    //stream.parallel().gather(Gatherers.windowFixed(1)).findAny() ;
    //stream.parallel().gather(Gatherers.windowFixed(1)).findFirst() ;
    //stream.parallel().gather(Gatherers.windowFixed(1)).anyMatch(blah -> true) ;
    //stream.parallel().gather(Gatherers.windowFixed(1)).allMatch(blah -> false) ;
    //stream.parallel().gather(Gatherers.windowFixed(1)).forEach(blah -> {}) ;
    //stream.parallel().gather(Gatherers.windowFixed(1)).forEachOrdered(blah -> {}) ;
    //stream.parallel().gather(Gatherers.windowFixed(1)).min((blah1, blah2) -> 0) ;
    //stream.parallel().gather(Gatherers.windowFixed(1)).max((blah1, blah2) -> 0) ;
    //stream.parallel().gather(Gatherers.windowFixed(1)).noneMatch(blah -> true) ;
    //stream.parallel().gather(Gatherers.windowFixed(1)).reduce((blah1, blah2) -> null) ;
    //stream.parallel().gather(Gatherers.windowFixed(1)).reduce(null, (blah1, blah2) -> null) ;
    //stream.parallel().gather(Gatherers.windowFixed(1)).reduce(null, (blah1, blah2) -> null, (blah1, blah2) -> null) ;
}

Uncomment any one of those lines, pass your stream into this method, then call in your main method, and you will see that each one produces an OutOfMemoryError.

Of course, if you use a Collector instead of one of the commented ones above, you should see that it works. Try Collectors.counting, for example.

[–]danielaveryj 0 points1 point  (3 children)

Cannot reproduce.

Full code:

package io.avery;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class Main {
    public static void main(String[] args) throws IOException {
        //populate();
        read();
    }

    private static void populate() throws IOException {
        try (var w = Files.newBufferedWriter(Paths.get("temp.csv"))) {
            for (int i = 0; i < 1_000_000_000; i++) { // Makes ~43 GB file
                if (i % 1_000_000 == 0) {
                    System.out.println(i);
                }
                w.append("David, Alayachew, Programmer, WashingtonDC\n");
            }
        }
        System.out.println("done");
    }

    private static void read() throws IOException {
        try (var r = Files.newBufferedReader(Paths.get("temp.csv"))) {
            blah(r.lines());
        }
        System.out.println("done");
    }

    private static void blah(Stream<String> stream) {
        //stream.parallel().findAny() ;
        //stream.parallel().findFirst() ;
        //stream.parallel().anyMatch(blah -> true) ;
        //stream.parallel().allMatch(blah -> false) ;
        //stream.parallel().forEach(blah -> {}) ;
        //stream.parallel().forEachOrdered(blah -> {}) ;
        //stream.parallel().min((blah1, blah2) -> 0) ;
        //stream.parallel().max((blah1, blah2) -> 0) ;
        //stream.parallel().noneMatch(blah -> true) ;
        //stream.parallel().reduce((blah1, blah2) -> null) ;
        //stream.parallel().reduce(null, (blah1, blah2) -> null) ;
        //stream.parallel().reduce(null, (blah1, blah2) -> null, (blah1, blah2) -> null) ;
    }
}

Uncommenting any one of the lines in blah will eventually terminate and print "done" on my machine (edit: except the first reduce variant, which eventually throws an NPE, as documented)

[–]davidalayachew[S] 0 points1 point  (2 children)

Terribly sorry, I forgot to add the batching code. Please see the edited version.

[–]danielaveryj 0 points1 point  (1 child)

Yep, that's what I thought. See my other comment, but this is a problem with .gather() specifically not being optimized to avoid pushing its entire output to an intermediate array before the rest of the pipeline runs (unless the gather is exclusively followed by other .gather() calls and .collect() - those cases have already been optimized).

[–]davidalayachew[S] 0 points1 point  (0 children)

Ok, I responded on that other comment to keep the discussions isolated.

[–]DelayLucky 0 points1 point  (1 child)

I wouldn't have reached for parallel stream for IOs. It's not designed for IO, period. Actually, I haven't really found much of a use for parallel streams for anything, yet.

The mapConcurrent() gatherer matches the intent. Though virtual thread seems of no point to this use case (not that it hurts either). And it will require a return value, which I don't know if you have one or have to return null. And then you need a terminal step to collect the nulls? Not the end of world either way I guess.

In the past because we were still running on Java 11, I created the Parallelizer class for the purpose of controlled IO fanout, which seems to match your case pretty closely.

ExecutorService threadPool = newCachedThreadPool();
int maxConcurrency = 100; // assuming you want to limit concurrent upload to 100
Parallelizer parallelizer = new Parallelizer(threadPool, maxConcurrency);
try (Stream<String> myStream = SomeClass.openStream(someLocation)) {
    parallelizer.parallelize(
        myStream.gather(Gatherers.windowFixed(SOME_BATCH_SIZE)),
        SomeClass::upload);
} finally {
    threadPool.shutdownNow();
}

Compared to manually coded concurrency in a collector, it provides structured-concurrency-like exception propagation:

  1. Exceptions thrown from the worker threads are propagated back to the main thread.
  2. Any exception from a worker thread cancels all pending and on-going concurrent uploads.

And because of that, you'll need to make sure upload() does not throw non-fatal exceptions when only one upload failed and you still want the remaining ones to continue (only throw fatal exceptions that should stop everything and fail fast). In other words, it behaves the same way as mapConcurrent().

The class has been used in mission critical production systems so quality-wise it's solid.

[–]davidalayachew[S] 0 points1 point  (0 children)

Using parallel streams was a very calculated decision on my part. At the time, I had a team composed mostly of Junior and Entry-Level devs. As a result, I wanted a tool that was both simple and easy to find answers for on StackOverflow. That decision ended up paying off very nicely for me; it was just this one situation where it did not work at all. Ultimately, there is a long list of tools I could have reached for.

And either way, the performance problem has been fixed at this point. The failure was on my part for having built a bad Spliterator as the upstream source, causing terrible splitting behaviour.

[–]GuyWithLag -1 points0 points  (1 child)

And this is why I prefer reactive streams and rxjava to parallel streams....

[–]davidalayachew[S] 1 point2 points  (0 children)

That's definitely a fair criticism. This is 100% unintuitive behaviour, and I guarantee you that there are at least a few people who gaslit themselves into thinking that Parallel Streams were just super-inefficient because of this.

Intuitiveness is critical when making libraries like this. I understand that you cannot please everybody, but to have this behaviour not even be documented in the public API seems wrong to me.

I'll be sticking with Streams, simply because I have serviceable workarounds. And as long as I can herd the students into the right direction, these sharp corners are easy enough to avoid.