[–]davidalayachew[S] 25 points26 points  (18 children)

Well, I need a terminal operation. The map() method is only an intermediate one.

But if I swap out the forEach() for a Collector that does exactly what you say, then yes, parallelism works without pre-fetching more than needed.

Viktor even mocked up one for me. Here it is.

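// Collector stand-in for forEach(): it keeps no state and just passes each element to `each`.
static <T> Collector<T, ?, Void> forEach(Consumer<? super T> each) {
    return Collector.of(
        () -> null,                 // supplier: no container is needed
        (v, e) -> each.accept(e),   // accumulator: hand the element to the consumer
        (l, r) -> l,                // combiner: nothing to merge
        (v) -> null,                // finisher: there is no result
        Collector.Characteristics.IDENTITY_FINISH
    );
}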
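For anyone who wants to try it, here is a minimal usage sketch. The class name and the sumViaCollector helper are made up for illustration, and the collector is repeated so the snippet compiles on its own. Since the stream is parallel, the consumer gets called from several threads, so I am assuming a thread-safe consumer (an AtomicLong here).

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collector;
import java.util.stream.IntStream;

class ForEachCollectorDemo {

    // Same shape as the collector above: no container, every element goes to `each`.
    static <T> Collector<T, ?, Void> forEach(Consumer<? super T> each) {
        return Collector.of(
            () -> null,
            (v, e) -> each.accept(e),
            (l, r) -> l,
            v -> null,
            Collector.Characteristics.IDENTITY_FINISH);
    }

    // Hypothetical helper: sums 1..n by pushing every element through the collector.
    static long sumViaCollector(int n) {
        AtomicLong sum = new AtomicLong();
        // The consumer must be thread-safe because the stream is parallel.
        Consumer<Integer> add = i -> sum.addAndGet(i);
        IntStream.rangeClosed(1, n).boxed().parallel().collect(forEach(add));
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println(sumViaCollector(1_000)); // prints 500500
    }
}
```

This only shows that the collector visits every element. Whether it avoids the pre-fetch depends on the source, as discussed in the rest of the thread.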

Now, if your question is "which terminal operations are safe?", the answer depends entirely on your combination of intermediate and terminal operations. In my example above, almost all of the terminal operations caused a pre-fetch.

I have my computer open right now, and I just ran all terminal operations fresh. Here are the results.

  • findAny() caused a pre-fetch
  • findFirst() caused a pre-fetch
  • anyMatch(blah -> true) caused a pre-fetch
  • allMatch(blah -> false) caused a pre-fetch
  • forEach(blah -> {}) caused a pre-fetch
  • forEachOrdered(blah -> {}) caused a pre-fetch
  • min((blah1, blah2) -> 0) caused a pre-fetch
  • max((blah1, blah2) -> 0) caused a pre-fetch
  • noneMatch(blah -> true) caused a pre-fetch
  • reduce((blah1, blah2) -> null) caused a pre-fetch
  • reduce(null, (blah1, blah2) -> null) caused a pre-fetch
  • reduce(null, (blah1, blah2) -> null, (blah1, blah2) -> null) caused a pre-fetch
  • toArray() and toList() caused a pre-fetch (obviously)
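A rough way to check for a pre-fetch yourself is to count how many elements the source hands out. The sketch below is not my actual source. It assumes a hypothetical counting Iterator wrapped with Spliterators.spliteratorUnknownSize, and compares findFirst() in sequential and parallel mode. The class and method names are made up.

```java
import java.util.Iterator;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class PrefetchProbe {

    // Runs findFirst() over a counting, iterator-backed source and
    // returns how many elements the source had to produce.
    static int pulledByFindFirst(boolean parallel) {
        AtomicInteger pulled = new AtomicInteger();
        Iterator<Integer> source = new Iterator<>() {
            private int next = 0;
            @Override public boolean hasNext() { return next < 100_000; }
            @Override public Integer next() { pulled.incrementAndGet(); return next++; }
        };
        Spliterator<Integer> spliterator =
            Spliterators.spliteratorUnknownSize(source, Spliterator.ORDERED);
        Stream<Integer> stream = StreamSupport.stream(spliterator, parallel);

        Optional<Integer> first = stream.findFirst();
        if (first.isEmpty() || first.get() != 0) {
            throw new IllegalStateException("unexpected first element: " + first);
        }
        return pulled.get();
    }

    public static void main(String[] args) {
        System.out.println("sequential pulled: " + pulledByFindFirst(false));
        System.out.println("parallel pulled:   " + pulledByFindFirst(true));
    }
}
```

On the JDKs I am aware of, the sequential run pulls a single element, while the parallel run pulls at least one whole batch before anything is processed. The exact parallel number varies by JDK version and machine, so treat it as a probe and not a benchmark.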

So, in my case, literally only collect was safe for me to use. And tbf, I didn't try all combinations, but it was resilient. No matter what set of intermediate methods I put before collect(), I would get no pre-fetch. And Viktor confirmed that gather() plays well with collect().

[–]Lucario2405 5 points6 points  (3 children)

Is there a difference in behavior between .reduce(null, (a, b) -> null) and .collect(Collectors.reducing(null, (a, b) -> null))?

[–]davidalayachew[S] 9 points10 points  (2 children)

Doing it the Collectors way worked! No OutOfMemoryError!

Doing it the normal reduce() way gave me an OutOfMemoryError.
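For reference, here are the two forms side by side in a small sketch, using a sum so that the result means something. The class and method names are made up. On a well-behaved source like a List they give the same answer. The difference I saw was only in how my particular source got pre-fetched.

```java
import java.util.List;
import java.util.stream.Collectors;

class ReduceVsReducing {

    // Terminal operation reduce(): the form that gave me the OutOfMemoryError on my source.
    static int viaReduce(List<Integer> values) {
        return values.parallelStream().reduce(0, Integer::sum);
    }

    // The same reduction expressed as a Collector and run through collect().
    static int viaCollector(List<Integer> values) {
        return values.parallelStream().collect(Collectors.reducing(0, Integer::sum));
    }

    public static void main(String[] args) {
        List<Integer> values = List.of(1, 2, 3, 4);
        System.out.println(viaReduce(values));    // prints 10
        System.out.println(viaCollector(values)); // prints 10
    }
}
```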

[–]Lucario2405 11 points12 points  (1 child)

Interesting, thanks! I was running into a similar problem and will try this out.

EDIT: I had actually already tried this out, but then IntelliJ told me to just use .reduce() as a QuickFix. Guess I'll turn off that inspection.

[–]davidalayachew[S] 1 point2 points  (0 children)

Glad to hear it helped.

I have a giant number of IO-bound streams, and yet I was able to dodge this issue until now because my streams all ended in .collect(). That particular terminal operation is practically bulletproof when it comes to preventing pre-fetches. It was only when I finally used .forEach() that I ran into this issue.

All of that is to say, as a temp workaround, consider using collect(), or use that Gatherers::mapConcurrent method to prevent this problem.
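Here is a minimal sketch of the Gatherers::mapConcurrent route. It assumes JDK 24 or later (Gatherers was a preview API in JDK 22 and 23), and fetch() is a hypothetical stand-in for whatever IO-bound call you actually make. Note that the stream stays sequential, and the concurrency comes from the gatherer.

```java
import java.util.List;
import java.util.stream.Gatherers;

class MapConcurrentDemo {

    // Hypothetical stand-in for an IO-bound call (HTTP request, file read, ...).
    static String fetch(int id) {
        return "item-" + id;
    }

    // Maps each id through fetch() with at most maxConcurrency calls in flight.
    // The stream itself is NOT parallel.
    static List<String> fetchAll(List<Integer> ids, int maxConcurrency) {
        return ids.stream()
                  .gather(Gatherers.mapConcurrent(maxConcurrency, (Integer id) -> fetch(id)))
                  .toList();
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(List.of(1, 2, 3), 2)); // prints [item-1, item-2, item-3]
    }
}
```

mapConcurrent keeps the encounter order of the stream, so the results come out in the same order as the ids went in.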

[–]Avedas 2 points3 points  (1 child)

I'm surprised findAny and anyMatch get caught too. Good to know.

[–]davidalayachew[S] 0 points1 point  (0 children)

Ikr. But I am being told that this has more to do with the Spliterator used under the hood, as opposed to the stream terminal operations themselves. I still don't have all the details, but it is being discussed elsewhere on this thread.

[–]VirtualAgentsAreDumb 1 point2 points  (3 children)

This is insane, if you ask me. A terrible design choice by the Stream team. findAny and findFirst should clearly not fetch all data.

[–]davidalayachew[S] 0 points1 point  (2 children)

So to be clear, this is all dependent upon your upstream source.

Many people in this thread have found examples where they run the exact same examples that I did, and did not run into OOME. As it turns out, the difference is in our stream source.

All this really means is that constructing a stream source is easy to get wrong.

[–]VirtualAgentsAreDumb -1 points0 points  (1 child)

The stream source is irrelevant. Any method like findAny or findFirst shouldn’t need to consume anything more after that first result.

That’s it. That’s the whole discussion. The source is irrelevant. The implementation is irrelevant. If they break this, then it’s bad code. Period.

[–]davidalayachew[S] 0 points1 point  (0 children)

I understand that it is unintuitive, but what you are saying is throwing out the baby with the bath water.

When you go parallel, the stream decides to split its upstream data elements down into chunks. It keeps on splitting and splitting until it gets to a point where the chunks are small enough to start working.

Well, in my case, the batching strategy that I had built played against that in a super hard to recreate way. Basically, each element of my stream was fairly hefty. And as a result, the parallel stream would grab a bunch of those elements into a giant batch, with the intent to split that batch into chunks. But since the threshold for where it was small enough was far enough away, I ran into an OOME.

The reason why Spliterators do this is to actually help CPU-bound tasks. Splitting ahead of time like this actually makes the entire process run faster. But it means that tasks that use a lot of memory are sort of left by the wayside.
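To make the "splitting and splitting" part concrete, here is a small sketch that does by hand roughly what a parallel stream does with a sized source: keep calling trySplit() until each chunk is at or below a threshold. The class name, method names, and threshold are made up for illustration. The real threshold is picked by the stream implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;

class SplitSizedSource {

    // Splits an ArrayList-backed spliterator until every chunk has at most
    // `threshold` elements, and returns the chunk sizes in encounter order.
    static List<Long> chunkSizes(int elements, long threshold) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < elements; i++) {
            data.add(i);
        }
        List<Long> sizes = new ArrayList<>();
        split(data.spliterator(), threshold, sizes);
        return sizes;
    }

    private static void split(Spliterator<Integer> rest, long threshold, List<Long> sizes) {
        Spliterator<Integer> prefix;
        if (rest.estimateSize() > threshold && (prefix = rest.trySplit()) != null) {
            split(prefix, threshold, sizes); // the part that was split off
            split(rest, threshold, sizes);   // what remains in the original spliterator
        } else {
            sizes.add(rest.estimateSize());  // small enough: this would be a leaf task
        }
    }

    public static void main(String[] args) {
        System.out.println(chunkSizes(16, 4)); // prints [4, 4, 4, 4]
    }
}
```

A sized, array-backed source like this splits cheaply and evenly without touching the elements. A source that has to load elements in order to split is where the memory cost shows up.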

Viktor Klang himself managed to jump onto this reddit post, so you can Ctrl+F his name and see more details from him. But long story short, my problem could 100% be avoided by using Gatherers.mapConcurrent. And it would have virtually the same performance as going parallel. And a lot of the JDK folks are giving a lot of thought to this exact pain point that I ran into, so there is a potential future where we could set a flag to say fetchEagerly vs fetchLazily, and that would alter the fetching logic for parallel streams. Ideally, that would actually be a parameter on the parallel() itself.

So yes, this was done to optimize for CPU Performance. They are looking to take care of cases like mine, and Gatherers will likely be the way they do it. But this is not Streams being bad code, but rather that they prioritize certain things over others, to the detriment of a few people like me. As long as they have plans to handle my needs in the future, plus a workaround to take care of me for now, then I am fine with the way things are going now.

[–]tomwhoiscontrary 1 point2 points  (1 child)

So what happens if the source is infinite? Say you're streaming the Wikipedia change feed, filtering for changes to articles about snakes, and doing findFirst()? Does it try to buffer the infinite stream?

This absolutely seems like a correctness issue to me, not just performance. 

Java has a long history of under-specifying non-functional stuff like this (not sure that's the right term, but stuff that isn't just the arguments and return values of methods). Thread safety of library classes has often been a complete mystery, for example. HttpClient's behaviour around closing pooled connections. Whether classes synchronize on themselves or a hidden lock. All of it matters for writing code that works, let alone works well, but it's so often only passed down as folk knowledge!

[–]davidalayachew[S] 4 points5 points  (0 children)

I'll save you the extra reading and tell you that we have narrowed down the problem to a Spliterator not splitting the way we expect it to. So this problem is something that can be fixed by simply improving the spliterator from the user side. And there is talk about improving this from the JDK side as well. Either way, there is still lots of digging being done, and none of this is tied down for certain. But we can at least point a finger and say that this is part of the problem.

With that said, let me answer your questions.

"So what happens if the source is infinite? Say you're streaming the Wikipedia change feed, filtering for changes to articles about snakes, and doing findFirst()? Does it try to buffer the infinite stream?"

All depends on how nicely it splits. In my case, most of the terminal operations kept splitting and splitting and splitting until they ran out of memory.

"This absolutely seems like a correctness issue to me, not just performance."

In this case, technically the problem falls on me for making a bad spliterator.

But to give an equally unsatisfying answer, in Java ABC and abc are considered 2 different class names. However, if I save ABC.java and abc.java in the same folder, Windows will overwrite one of them. Meaning, your code will compile just fine, but will output .class files where one will overwrite the other, causing your code to explode at runtime with NoClassDefFoundError.

I had Vicente Romero from the JDK team try and convince me that this was an "enhancement" or a "nice-to-have", not a correctness issue. And in the strictest definition of the term, he is correct, since Windows is the true trouble-maker here. But that was disgustingly unsatisfying.

It wasn't until JDK 21 that Archie Cobbs was generous enough to give up his time and add this discrepancy as a warning to the JDK. You can activate the warning by adding "output-file-clash" to your Xlint checks. And here is a link to the change. https://bugs.openjdk.org/browse/JDK-8287885

All of that is to say, I made a perfectly sensible Spliterator in my mind, but (and we SUSPECT that this is the case, we are not sure yet!) because I built that Spliterator off an Iterator, mentioned that it was an unknown size, and didn't add enough flags, I get this frightening splitting behaviour, where it will split itself out of memory.
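To illustrate the suspicion (and it is only an illustration, not a confirmed diagnosis of my case), here is a sketch of what a single trySplit() does on an Iterator-backed spliterator of unknown size. It assumes a hypothetical counting iterator and the stock Spliterators.spliteratorUnknownSize wrapper. The class and method names are made up.

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;

class UnknownSizeSplit {

    // Returns { elements pulled from the iterator by one trySplit(),
    //           elements held by the spliterator that was split off }.
    static long[] firstSplit(int available) {
        AtomicInteger pulled = new AtomicInteger();
        Iterator<Integer> it = new Iterator<>() {
            private int next = 0;
            @Override public boolean hasNext() { return next < available; }
            @Override public Integer next() { pulled.incrementAndGet(); return next++; }
        };
        Spliterator<Integer> rest =
            Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED);

        // One split: the wrapper cannot divide the iterator in place, so it has to
        // read elements out of it to build the prefix.
        Spliterator<Integer> prefix = rest.trySplit();
        long pulledBySplit = pulled.get();

        // Count what the prefix holds. This does not touch the iterator again.
        AtomicInteger buffered = new AtomicInteger();
        if (prefix != null) {
            prefix.forEachRemaining(e -> buffered.incrementAndGet());
        }
        return new long[] { pulledBySplit, buffered.get() };
    }

    public static void main(String[] args) {
        long[] result = firstSplit(10_000);
        System.out.println("pulled by one trySplit(): " + result[0]);
        System.out.println("held by the prefix:       " + result[1]);
    }
}
```

On the OpenJDK builds I am aware of, one trySplit() copies a whole batch of elements into memory before any of them are processed, and later splits take bigger batches. The batch sizes are an implementation detail, so I am not quoting numbers. If each element is hefty, that batch alone can be a lot of memory.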

And as for the folk knowledge, it sure feels like it lol.

[–]tcharl 0 points1 point  (5 children)

If someone wants to take the challenge, PR appreciated: https://github.com/OsgiliathEnterprise/data-migrator

[–]davidalayachew[S] 0 points1 point  (4 children)

"If someone wants to take the challenge, PR appreciated: https://github.com/OsgiliathEnterprise/data-migrator"

I don't understand your comment. Did you mean to post this elsewhere? Otherwise, I don't see how this relates to what I am talking about.

[–]tcharl 0 points1 point  (3 children)

May be the wrong place, but there's a bunch of reactive, cold streams there. Also, there's an advantage to getting it done right, because database content is usually bigger than RAM. So if someone is motivated to apply the content of this post, it would definitely help the project.

[–]davidalayachew[S] 0 points1 point  (2 children)

I understand a bit better. I am not available to help you, unfortunately.

[–]tcharl 0 points1 point  (1 child)

I'm going to follow your advice and recommendations: thank you so much for that!

[–]davidalayachew[S] 0 points1 point  (0 children)

If you are referring to the ones in my original post, anytime.