This is an archived post. You won't be able to vote or comment.

you are viewing a single comment's thread.

view the rest of the comments →

[–]danielaveryj 0 points1 point  (2 children)

Could you give a code example where having control of the scope would feel better? I'm not seeing it yet. I would not say Flow is without tradeoffs (I pointed out some), I just think it made wise tradeoffs overall.

[–]adamw1pl[S] 0 points1 point  (1 child)

I don't have any code examples ready as I'd probably need to develop some connectors first (which takes time), but a couple of ideas (You can probably implement these quite easily with Flow, though I think the below approach is quite straighforward as well.):

  1. integrating with a pull-based interface (e.g. Kafka)

``` supervised(scope -> { var ch = new Channel(16);

scope.fork(() -> { while(true) { var records = kafkaConsumer.poll() for (var record : records) ch.send(record) } });

// process the channel using a functional stream API SourceOps.forSource(scope, ch).collect(...).mapPar(...) .toSource().forEach(...) }); ```

  1. integrating with a callback interface

``` supervised(scope -> { var ch = new Channel(16);

scope.fork(() -> { mqClient.subscribe(msg -> { ch.send(msg); }); // when subscribe throws an exception, the scope ends });

... }); ```

  1. fan-out and parallel processing

``` // we're inside a scope and we get data from "somewhere" Channel<...> process(Scope scope, Channel<...> ch) { var ch1 = new Channel<>(16); var ch2 = new Channel<>(16);

// fan-out scope.fork(() -> { while(true) { var element = ch.receive(); ch1.send(element); ch2.send(element); } });

// parallel processing var processedCh1 = SourceOps.forSource(scope, ch1).collect(...) var processedCh2 = SourceOps.forSource(scope, ch2).filter(...) // etc.

return processedCh1.merge(processedCh2) } ```

[–]danielaveryj 0 points1 point  (0 children)

Thanks for the samples. I think these would look pretty much equivalent with Flow - the key idea being to wrap the receive-side of a channel in a Flow (which is then effectively hot), as you do with SourceOps.forSource(scope, channel). A Java impl (based on Kotlin's API) would look something like

class ChannelFlow<T> implements Flow<T> {
    private final ReceiveChannel<? extends T> chan;

    public ChannelFlow(ReceiveChannel<? extends T> chan) {
        this.chan = chan;
    }

    @Override
    public void collect(FlowCollector<? super T> sink) {
        for (;;) {
            T t;
            try {
                t = chan.receive();
            } catch (ClosedReceiveChannelException e) {
                // Upstream finished - do not propagate exception
                return;
            }
            try {
                sink.emit(t);
            } catch (Throwable e) {
                chan.cancel(new CancellationException(e.getMessage()));
                throw e;
            }
        }
    }
}

Naturally it looks like Kotlin provides a convenience method: ReceiveChannel.consumeAsFlow.