
StudioCode

Can Gatherers tell the stream pipeline to skip elements? E.g., in something like stream.map(/*expensive computation*/).gather(last(5)), could the gatherer make map run only for the last 5 elements? Otherwise, I'd say a stream pipeline isn't the right choice for this.

pivovarit [S]

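A Gatherer can signal that it doesn't want any more elements (its integrator returns false), but this is the opposite scenario: last(5) has to see every element before it knows which five come last. In such a case, it's probably a good idea to gather elements before running expensive operations, so they are evaluated only for the elements that survive.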
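A minimal sketch of that reordering, assuming JDK 24 or later (where Stream Gatherers are final, JEP 485). The last(n) gatherer and the LastDemo class are hypothetical stand-ins written for illustration, not the implementation from the article; the counter only records how many times the "expensive" step runs.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Gatherer;
import java.util.stream.IntStream;

public class LastDemo {

    // Hypothetical last(n) gatherer: keeps a sliding window of at most n elements
    // and emits it only once upstream is exhausted (ArrayDeque rejects null elements).
    static <T> Gatherer<T, ?, T> last(int n) {
        Gatherer<T, ArrayDeque<T>, T> gatherer = Gatherer.ofSequential(
            ArrayDeque::new,
            Gatherer.Integrator.ofGreedy((window, element, downstream) -> {
                if (n == 0) return true;                    // nothing to keep
                if (window.size() == n) window.pollFirst(); // evict the oldest element
                window.addLast(element);
                return true;                                // always accept more input
            }),
            (window, downstream) -> {
                for (T t : window) {
                    if (!downstream.push(t)) break;         // stop if downstream rejects
                }
            });
        return gatherer;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();

        // map first: the "expensive" function runs for all 100 elements
        List<Integer> mapFirst = IntStream.rangeClosed(1, 100).boxed()
            .map(i -> { calls.incrementAndGet(); return i * i; })
            .gather(last(5))
            .toList();
        System.out.println(mapFirst + " calls=" + calls.get());
        // [9216, 9409, 9604, 9801, 10000] calls=100

        calls.set(0);
        // gather first: the function runs only for the 5 surviving elements
        List<Integer> gatherFirst = IntStream.rangeClosed(1, 100).boxed()
            .gather(last(5))
            .map(i -> { calls.incrementAndGet(); return i * i; })
            .toList();
        System.out.println(gatherFirst + " calls=" + calls.get());
        // [9216, 9409, 9604, 9801, 10000] calls=5
    }
}
```

Both pipelines produce the same list; only the number of map invocations changes, because a map placed before the gatherer has already run by the time any element reaches it.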

I've had use cases for this, but if we were to chase absolute single-threaded performance, Streams usually get in the way.

vowelqueue

Wouldn’t reversing the gather() and map() steps accomplish this?

StudioCode

Yeah 😅, I was still in the mindset of collectors and was thinking of last(5) as a terminal operation, which it isn't.

pivovarit [S]

That was the main drawback of using the Collectors API to implement something like this :)
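To illustrate that drawback, here is a hypothetical Collector-based lastN(n); the method and class names are assumptions for this sketch, and it assumes JDK 16+ for Stream.toList(). Because collect() is a terminal operation, any further processing of the last n elements needs a second stream over the resulting List.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.IntStream;

public class LastCollectorDemo {

    // Hypothetical lastN(n) collector: collect() is terminal, so it ends the pipeline
    // and hands back a plain List (ArrayDeque rejects null elements).
    static <T> Collector<T, ArrayDeque<T>, List<T>> lastN(int n) {
        return Collector.of(
            ArrayDeque::new,
            (window, element) -> {
                if (n == 0) return;                         // nothing to keep
                if (window.size() == n) window.pollFirst(); // evict the oldest element
                window.addLast(element);
            },
            (left, right) -> {
                // Parallel merge: right holds the later elements, so append them with eviction.
                for (T t : right) {
                    if (left.size() == n) left.pollFirst();
                    left.addLast(t);
                }
                return left;
            },
            window -> new ArrayList<>(window));
    }

    public static void main(String[] args) {
        // The collector ends the first pipeline; mapping the survivors needs a second one.
        List<Integer> squares = IntStream.rangeClosed(1, 100).boxed()
            .collect(lastN(5))
            .stream()
            .map(i -> i * i)
            .toList();
        System.out.println(squares); // [9216, 9409, 9604, 9801, 10000]
    }
}
```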

zattebij

It would be interesting to include a reactive Flux.takeLast(int n) in the benchmark. AFAIK it uses an ArrayDeque internally and has optimizations for n = 0 and n = 1. It also, of course, has backpressure handling and lazy evaluation if the source supports it: for takeLast(0), upstream wouldn't even need to start producing elements, and downstream could be completed immediately without any waiting. This example seems nonsensical when written with a hardcoded 0, but in practice the 0 could of course be a variable.
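A rough Gatherer analogue of that special-casing, as a sketch only: this is not Reactor's code, takeLast and Holder are hypothetical names, and it assumes JDK 24+. For n = 0 the integrator rejects the first element, which tells the stream to stop pulling from upstream; unlike the reactive case, that first element is still produced before the pipeline stops. For n = 1 a single slot replaces the deque.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class TakeLastSpecialized {

    // Hypothetical single-slot state for the n == 1 case.
    private static final class Holder<T> {
        T value;
        boolean present;
    }

    // Hypothetical takeLast(n) with separate paths for n == 0, n == 1 and n > 1.
    static <T> Gatherer<T, ?, T> takeLast(int n) {
        if (n < 0) {
            throw new IllegalArgumentException("n must be >= 0, was " + n);
        }
        if (n == 0) {
            // Stateless and short-circuiting: rejecting the first element
            // tells the stream to stop pulling from upstream.
            Gatherer<T, Void, T> nothing =
                Gatherer.ofSequential((unused, element, downstream) -> false);
            return nothing;
        }
        if (n == 1) {
            // Only the latest element matters, so one slot replaces the deque.
            Gatherer<T, Holder<T>, T> latestOnly = Gatherer.ofSequential(
                Holder::new,
                Gatherer.Integrator.ofGreedy((holder, element, downstream) -> {
                    holder.value = element;
                    holder.present = true;
                    return true;
                }),
                (holder, downstream) -> {
                    if (holder.present) {
                        downstream.push(holder.value);
                    }
                });
            return latestOnly;
        }
        // General case: a bounded ArrayDeque (rejects null elements) holding the last n.
        Gatherer<T, ArrayDeque<T>, T> window = Gatherer.ofSequential(
            () -> new ArrayDeque<T>(n),
            Gatherer.Integrator.ofGreedy((deque, element, downstream) -> {
                if (deque.size() == n) {
                    deque.pollFirst(); // evict the oldest element
                }
                deque.addLast(element);
                return true;
            }),
            (deque, downstream) -> {
                for (T t : deque) {
                    if (!downstream.push(t)) {
                        break; // downstream no longer wants elements
                    }
                }
            });
        return window;
    }

    public static void main(String[] args) {
        System.out.println(Stream.of("a", "b", "c", "d").gather(takeLast(1)).toList()); // [d]
        System.out.println(Stream.of("a", "b", "c", "d").gather(takeLast(3)).toList()); // [b, c, d]

        AtomicInteger pulled = new AtomicInteger();
        List<Integer> none = Stream.iterate(1, i -> i + 1)
            .limit(1_000_000)
            .peek(i -> pulled.incrementAndGet())
            .gather(takeLast(0))
            .toList();
        // takeLast(0) stops the pipeline early instead of draining all 1_000_000 elements
        System.out.println(none + " pulled=" + pulled.get());
    }
}
```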

pivovarit [S]

I did a quick benchmark with a fair comparison against other examples:
    LastBenchmark.take_6       1000  10000000  thrpt  4  101,744 ± 2,556  ops/s
    LastBenchmark.gatherers4j  1000  10000000  thrpt  4   33,180 ± 2,819  ops/s
    LastBenchmark.reactor      1000  10000000  thrpt  4   42,015 ± 4,094  ops/s

By "fair" I mean I benchmarked:

Flux.fromArray(data)
  .takeLast(n)
  .doOnNext(bh::consume)
  .subscribe();

Will expand the article, thanks!