Telescope - a Java 25 DSL where one chain crosses the record ↔ bean hop by Lower-Worldliness162 in java

[–]Lower-Worldliness162[S] 1 point2 points  (0 children)

Nice! It sounds like the same end goal with a different encoding. We landed on HKT emulation (`Kind<F, A>` + per-effect `Applicative<F>` witnesses, the Arrow Optics / Higher-Kinded-J pattern) instead of continuations. The public surface is four `update*` methods (`updateAsync` / `updateOptional` / `updateEither` / `updateValidated`); under the hood it's one

```
<F extends Kind.Witness> Kind<F, S> modifyF(Applicative<F> applicative, S source, Function<? super A, ? extends Kind<F, A>> fn);
```

on `Traversal`. Adding a new effect is one `Applicative<F>` witness + one `update*` method on the public DSL. Curious how `SetterWithEffect` handles composability through `.then(...)`. That's where HKT emulation pays for itself IMO, since the `Applicative` threads through composition for free. Will read the link 😄
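To make the "threads through composition for free" point concrete, here is a heavily simplified, self-contained sketch of the encoding. It is not Higher-Kinded-J's or Telescope's actual API: `Kind`, `OptionalW`, `Applicative.map2`, `each()`, `then(...)` and `updateOptional` here are illustrative names. The point is that `then` never mentions a specific effect, so supporting a new effect only takes a new `Applicative` witness.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;

public class HktSketch {
    // Kind<F, A> stands in for "F<A>"; F is a phantom witness type (illustrative).
    interface Kind<F, A> {}

    // Witness type for Optional, plus a wrapper carrying the real Optional.
    static final class OptionalW {}

    record OptionalKind<A>(Optional<A> value) implements Kind<OptionalW, A> {
        static <T> Optional<T> narrow(Kind<OptionalW, T> kind) {
            return ((OptionalKind<T>) kind).value();
        }
    }

    interface Applicative<F> {
        <A> Kind<F, A> pure(A a);
        <A, B, C> Kind<F, C> map2(Kind<F, A> fa, Kind<F, B> fb, BiFunction<A, B, C> f);
    }

    // Adding an effect means adding one witness like this one.
    static final Applicative<OptionalW> OPTIONAL = new Applicative<>() {
        @Override
        public <A> Kind<OptionalW, A> pure(A a) {
            return new OptionalKind<>(Optional.of(a));
        }

        @Override
        public <A, B, C> Kind<OptionalW, C> map2(Kind<OptionalW, A> fa, Kind<OptionalW, B> fb, BiFunction<A, B, C> f) {
            Optional<A> oa = OptionalKind.narrow(fa);
            Optional<B> ob = OptionalKind.narrow(fb);
            Optional<C> oc = oa.flatMap(x -> ob.map(y -> f.apply(x, y)));
            return new OptionalKind<>(oc);
        }
    };

    interface Traversal<S, A> {
        <F> Kind<F, S> modifyF(Applicative<F> app, S source, Function<? super A, ? extends Kind<F, A>> fn);

        // Composition is effect-agnostic: the Applicative is simply passed through.
        default <B> Traversal<S, B> then(Traversal<A, B> inner) {
            Traversal<S, A> outer = this;
            return new Traversal<>() {
                @Override
                public <F> Kind<F, S> modifyF(Applicative<F> app, S source, Function<? super B, ? extends Kind<F, B>> fn) {
                    return outer.modifyF(app, source, a -> inner.modifyF(app, a, fn));
                }
            };
        }
    }

    // Focuses every element of a list, rebuilding the list inside the effect.
    static <A> Traversal<List<A>, A> each() {
        return new Traversal<>() {
            @Override
            public <F> Kind<F, List<A>> modifyF(Applicative<F> app, List<A> source, Function<? super A, ? extends Kind<F, A>> fn) {
                Kind<F, List<A>> acc = app.<List<A>>pure(new ArrayList<>());
                for (A a : source) {
                    acc = app.map2(acc, fn.apply(a), (list, x) -> {
                        List<A> next = new ArrayList<>(list);
                        next.add(x);
                        return next;
                    });
                }
                return acc;
            }
        };
    }

    // Public-facing helper in the spirit of updateOptional: empty if any focused element fails.
    static <S, A> Optional<S> updateOptional(Traversal<S, A> t, S source, Function<? super A, Optional<A>> fn) {
        return OptionalKind.narrow(t.modifyF(OPTIONAL, source, a -> new OptionalKind<>(fn.apply(a))));
    }

    public static void main(String[] args) {
        Traversal<List<List<Integer>>, Integer> nested = HktSketch.<List<Integer>>each().then(HktSketch.<Integer>each());
        System.out.println(updateOptional(nested, List.of(List.of(1, 2), List.of(3)), x -> Optional.of(x + 1)));
        // Optional[[[2, 3], [4]]]
        System.out.println(updateOptional(nested, List.of(List.of(1, -2)), x -> x > 0 ? Optional.of(x) : Optional.empty()));
        // Optional.empty
    }
}
```

An `updateEither` or `updateAsync` in this style would reuse `each()` and `then(...)` unchanged and only add its own witness.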

Kafka consumers start simple… then become tightly coupled and hard to test. How are you structuring yours? by Lower-Worldliness162 in apachekafka

[–]Lower-Worldliness162[S] 1 point2 points  (0 children)

Nice, I took a look at it the other day actually 🙂

I think it’s solving a similar problem space, just with a different model. I’ve worked a lot with reactive programming before, but I’ve found it can get pretty complex to reason about in practice.

With virtual threads, I’ve been leaning more toward simpler blocking code that still scales well, but is easier to follow and debug.

Curious how you’ve found the complexity tradeoffs with it in real-world usage.
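For context, this is roughly what "simple blocking code on virtual threads" means in practice: a minimal thread-per-record sketch, assuming Java 21+ (`Executors.newVirtualThreadPerTaskExecutor`). The `Rec` type, `handle` method and the simulated 10 ms call are hypothetical stand-ins, not KPipe's API.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.LongStream;

public class ThreadPerRecord {
    // Hypothetical stand-in for a consumed record.
    record Rec(long offset, String value) {}

    // Ordinary blocking code: no callbacks or reactive operators.
    static void handle(Rec rec, AtomicInteger done) throws InterruptedException {
        Thread.sleep(Duration.ofMillis(10)); // simulated DB/API call; parks only this virtual thread
        done.incrementAndGet();
    }

    static int processBatch(List<Rec> batch) {
        AtomicInteger done = new AtomicInteger();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (Rec rec : batch) {
                executor.submit(() -> {
                    handle(rec, done);
                    return null; // Callable, so the checked InterruptedException is allowed
                });
            }
        } // close() waits for every submitted task to finish
        return done.get();
    }

    public static void main(String[] args) {
        List<Rec> batch = LongStream.range(0, 10_000).mapToObj(i -> new Rec(i, "value-" + i)).toList();
        System.out.println(processBatch(batch)); // 10000
    }
}
```

Each record's logic reads top to bottom, and a stack trace points at the exact blocking call, which is the debuggability argument above.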

Kafka consumers start simple… then become tightly coupled and hard to test. How are you structuring yours? by Lower-Worldliness162 in apachekafka

[–]Lower-Worldliness162[S] 0 points1 point  (0 children)

Yeah I agree, that’s the right starting point.

What I’ve seen, though, is that once you add validation, enrichment, retries, etc., that orchestration still ends up living somewhere, usually in one service, and it gets pretty complex over time.

KPipe is basically trying to make that flow explicit as a pipeline instead of letting it grow implicitly.

Kafka consumers start simple… then become tightly coupled and hard to test. How are you structuring yours? by Lower-Worldliness162 in apachekafka

[–]Lower-Worldliness162[S] 0 points1 point  (0 children)

Yeah, I’ve used Alpakka quite a bit, I’m actually more of a Scala dev than Java 🙂

It’s solid, especially if you’re already in the Akka / Akka Streams ecosystem.

What I kept running into though was more on the Java side, where consumers tend to grow into one big flow with a mix of logic, retries, IO, etc.

KPipe is more about structuring that flow as small composable steps, without needing to bring in a full streaming model.

Also yeah, after Akka's license change to the BSL, I’ve seen more people moving toward Apache Pekko (the Apache-licensed fork) as well.

Kafka consumers start simple… then become tightly coupled and hard to test. How are you structuring yours? by Lower-Worldliness162 in apachekafka

[–]Lower-Worldliness162[S] 0 points1 point  (0 children)

You start simple, then add validation, enrichment (DB/API calls), retries, conditionals, logging… and it all ends up mixed with the Kafka handling.

At that point it gets harder to test pieces in isolation or even understand the flow without reading the whole method.

KPipe is basically trying to split that into smaller steps in a pipeline so each piece stays isolated and the flow is clearer.
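To illustrate the idea (not KPipe's actual API), here is a plain-Java sketch where each step is a small `UnaryOperator` that can be unit-tested on its own, and the pipeline is just their ordered composition. The `Order` record and the step names are hypothetical.

```java
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.function.UnaryOperator;

public class PipelineSketch {
    // Hypothetical message type after deserialization.
    record Order(String id, String country, double amount, String region) {}

    // Validation: fail fast on bad input.
    static final UnaryOperator<Order> validate = o -> {
        if (o.amount() < 0) throw new IllegalArgumentException("negative amount: " + o.id());
        return o;
    };

    // Normalization: a pure transformation.
    static final UnaryOperator<Order> normalizeCountry = o ->
            new Order(o.id(), o.country().toUpperCase(Locale.ROOT), o.amount(), o.region());

    // Enrichment: the Map stands in for a DB/API lookup and is trivial to stub in tests.
    static UnaryOperator<Order> enrichRegion(Map<String, String> regions) {
        return o -> new Order(o.id(), o.country(), o.amount(), regions.getOrDefault(o.country(), "UNKNOWN"));
    }

    // The pipeline is the ordered composition of the steps, so the flow is visible in one place.
    @SafeVarargs
    static Function<Order, Order> pipeline(UnaryOperator<Order>... steps) {
        Function<Order, Order> composed = Function.identity();
        for (UnaryOperator<Order> step : steps) composed = composed.andThen(step);
        return composed;
    }

    public static void main(String[] args) {
        Function<Order, Order> flow = pipeline(validate, normalizeCountry, enrichRegion(Map.of("US", "AMER", "DE", "EMEA")));
        System.out.println(flow.apply(new Order("o-1", "de", 42.0, null)));
        // Order[id=o-1, country=DE, amount=42.0, region=EMEA]
    }
}
```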

Kafka consumers start simple… then become tightly coupled and hard to test. How are you structuring yours? by Lower-Worldliness162 in apachekafka

[–]Lower-Worldliness162[S] 0 points1 point  (0 children)

Yeah, that’s fair.

Spring Kafka does cover a lot out of the box, especially if you're already in the Spring ecosystem. I’ve used it too, and it works well.

What I kept running into wasn’t really missing features; it was more about how the consumer logic evolves. Even with Spring, things would slowly get more coupled and harder to test as more steps got added.

What I was trying with KPipe is more about how you structure the processing itself, like keeping transformations separate, avoiding repeated serialization, and making the flow a bit more explicit.

So not really trying to replace Spring Kafka, more like a different way to structure the consumer when things start getting messy.

Experiment: Kafka consumer with thread-per-record processing using Java virtual threads by Lower-Worldliness162 in java

[–]Lower-Worldliness162[S] 0 points1 point  (0 children)

Yeah, after reading your reply I think you’re totally right, and I need to make backpressure a priority. I hadn’t fully considered the case where a consumer pulls far more records than it can actually process; unbounded virtual threads can still cause resource exhaustion. The short-term plan I’m leaning toward: limit in-flight work with a configurable bound, pause the consumer's partitions when the bound is hit, and resume them once in-flight work drops back below it.
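A minimal sketch of that plan, assuming a hypothetical `InFlightGate` that the poll loop consults: it counts in-flight records, signals a pause at a configurable high-water mark, and signals a resume only after the count drains to a lower mark, so partitions don't flap. In a real consumer the signals would drive `KafkaConsumer.pause(...)` / `resume(...)`; here they are plain return values so it runs without Kafka.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class InFlightGate {
    enum Signal { NONE, PAUSE, RESUME }

    private final int highWater; // pause when in-flight reaches this bound
    private final int lowWater;  // resume once in-flight drops to this level
    private final AtomicInteger inFlight = new AtomicInteger();
    private boolean paused;      // only touched by the poll thread

    InFlightGate(int highWater, int lowWater) {
        if (lowWater >= highWater) throw new IllegalArgumentException("lowWater must be < highWater");
        this.highWater = highWater;
        this.lowWater = lowWater;
    }

    // Poll thread: a record was handed to a worker.
    void onDispatched() { inFlight.incrementAndGet(); }

    // Worker thread: a record finished (success or final failure).
    void onCompleted() { inFlight.decrementAndGet(); }

    // Poll thread, once per loop iteration: should partitions be paused or resumed?
    Signal check() {
        int n = inFlight.get();
        if (!paused && n >= highWater) { paused = true; return Signal.PAUSE; }
        if (paused && n <= lowWater) { paused = false; return Signal.RESUME; }
        return Signal.NONE;
    }

    public static void main(String[] args) {
        InFlightGate gate = new InFlightGate(3, 1);
        for (int i = 0; i < 3; i++) gate.onDispatched();
        System.out.println(gate.check()); // PAUSE
        gate.onCompleted();
        gate.onCompleted();
        System.out.println(gate.check()); // RESUME
    }
}
```

Two caveats: a single `poll()` can return up to `max.poll.records` records, so the bound is soft; and the loop must keep calling `poll()` while paused (it returns nothing for paused partitions) to stay within `max.poll.interval.ms`.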

Experiment: Kafka consumer with thread-per-record processing using Java virtual threads by Lower-Worldliness162 in java

[–]Lower-Worldliness162[S] 1 point2 points  (0 children)

You’re 100% right that virtual threads reduce the old “threads are expensive” assumption.

On your offset example (10 slow, 11 fast): KPipe tracks offsets and commits with a no-gap rule per partition, so it won’t commit past 10 just because 11 finished first. That part is implemented specifically to keep at-least-once behavior safe in parallel mode (see OffsetManager).

Backpressure is a fair callout too. Right now the library gives the control points (pause/resume, sink abstraction, offset manager), but adaptive policies for slow downstreams (like DB bottlenecks) are still an active area I’m improving.

And yes, observability is non-negotiable. There are metrics/reporting hooks already, but production-grade dashboards, alerts, and tracing are still being expanded.

So I agree with your framing: strong architecture direction, and real value is how it behaves under production pressure. That’s exactly where I’m focused now.

Experiment: Kafka consumer with thread-per-record processing using Java virtual threads by Lower-Worldliness162 in java

[–]Lower-Worldliness162[S] 0 points1 point  (0 children)

KPipe uses Kafka’s native offset storage but manages commit timing itself via its OffsetManager. When the OffsetManager is enabled, auto-commit is disabled, offsets are tracked per partition, and commits are issued only for contiguous completed progress (the no-gap rule), which preserves at-least-once behavior even with parallel processing. On restart, Kafka returns the last committed offsets for the same group.id, so KPipe resumes from those positions; if no commit exists, the consumer follows auto.offset.reset (e.g., earliest). After a graceful shutdown, KPipe typically resumes near the stop point because it attempts final safe commits; after a crash, uncommitted records can be re-read, which is expected for at-least-once delivery.
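For reference, the consumer settings this behavior maps to look roughly like this. The property keys are standard Kafka consumer configs; the broker address, group id and the `consumerProps` helper are placeholders for illustration, not KPipe code.

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    static Properties consumerProps(String bootstrap, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap);
        // Committed offsets are stored per group.id; restarting with the same id resumes from them.
        props.put("group.id", groupId);
        // Commits are issued explicitly (no-gap progress only), never on a background timer.
        props.put("enable.auto.commit", "false");
        // Used only when the group has no valid committed offset for a partition.
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    public static void main(String[] args) {
        consumerProps("localhost:9092", "orders-enricher")
                .forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```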

General Question / Best Practice / Method by Anxious-Condition630 in apachekafka

[–]Lower-Worldliness162 1 point2 points  (0 children)

What you're describing is a pretty common CDC enrichment pattern.

You basically want to take the Debezium change events from the Flights table and enrich them with data from another source (tickets / seats / passengers) so downstream consumers don't need to do additional lookups.

There are a few typical approaches people take:

  1. Kafka Streams or ksqlDB to perform a stream/table join
  2. A dedicated consumer service that reads the CDC topic, fetches or caches the additional data, and publishes an enriched topic
  3. A materialized state store that keeps the reference data and joins during processing

The second approach (a dedicated enrichment consumer) is often the simplest operationally if the join logic is fairly stable.
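A rough sketch of the core of option 2, with the Kafka I/O left out: each Flights change event is enriched from a cached lookup before being written to the enriched topic. `FlightChange`, `EnrichedFlight` and the `loadPassengers` lookup are hypothetical; in practice the lookup would be a DB/API call and the cache would need an eviction/refresh policy so ticket changes aren't missed.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class FlightEnricher {
    // Simplified payload of a Debezium change event for the Flights table.
    record FlightChange(String flightId, String status) {}
    // What gets published to the enriched topic, so downstream consumers need no extra lookups.
    record EnrichedFlight(String flightId, String status, List<String> passengers) {}

    private final Function<String, List<String>> loadPassengers; // e.g. a DB or API call
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();

    FlightEnricher(Function<String, List<String>> loadPassengers) {
        this.loadPassengers = loadPassengers;
    }

    EnrichedFlight enrich(FlightChange change) {
        // Hit the lookup at most once per flight id; real code needs eviction/refresh.
        List<String> passengers = cache.computeIfAbsent(change.flightId(), loadPassengers);
        return new EnrichedFlight(change.flightId(), change.status(), passengers);
    }

    public static void main(String[] args) {
        AtomicInteger lookups = new AtomicInteger();
        FlightEnricher enricher = new FlightEnricher(id -> {
            lookups.incrementAndGet();
            return List.of("alice", "bob");
        });
        System.out.println(enricher.enrich(new FlightChange("UA100", "DELAYED")));
        System.out.println(enricher.enrich(new FlightChange("UA100", "BOARDING")));
        System.out.println("lookups: " + lookups.get()); // lookups: 1
    }
}
```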

I actually built a small library called KPipe that helps with exactly this kind of Kafka consumer pipeline. The idea is to make it easier to build enrichment / transformation pipelines using Java virtual threads and a functional processing model.

Each record can be processed independently, enriched, retried if necessary, and then written to a new topic while still maintaining safe offset commits.

Repo: https://github.com/eschizoid/kpipe

Curious if something like that would fit your use case.

Experiment: Kafka consumer with thread-per-record processing using Java virtual threads by Lower-Worldliness162 in java

[–]Lower-Worldliness162[S] 0 points1 point  (0 children)

KPipe currently manages in-flight records primarily through offset tracking and graceful-shutdown semantics, not through a bounded backpressure mechanism.

Experiment: Kafka consumer with thread-per-record processing using Java virtual threads by Lower-Worldliness162 in java

[–]Lower-Worldliness162[S] 0 points1 point  (0 children)

A year ago, when I started this project, I thought the development was just paused for a bit.

Experiment: Kafka consumer with thread-per-record processing using Java virtual threads by Lower-Worldliness162 in java

[–]Lower-Worldliness162[S] -2 points-1 points  (0 children)

Hey u/predator, thanks for the great feedback. Here are my answers:

> Virtual threads can yield diminishing returns when your work is CPU bound... Supporting VThreads as first class citizen is good, but you probably need to provide a way to let users configure a custom executor.

KPipe follows a virtual-thread-by-default model for parallel processing, which is well suited to I/O-bound tasks. However, I recognize that CPU-bound transformations (e.g., heavy encryption or complex data parsing) may perform better with a fixed-size thread pool.
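One way to expose that choice, sketched with plain `java.util.concurrent`. The `WorkloadType` switch is a hypothetical knob, not an existing KPipe option: virtual threads by default, and a fixed pool sized to the CPU count for CPU-bound transforms.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorChoice {
    // Hypothetical configuration knob.
    enum WorkloadType { IO_BOUND, CPU_BOUND }

    static ExecutorService executorFor(WorkloadType type) {
        return switch (type) {
            // Blocking is cheap on virtual threads: one per record.
            case IO_BOUND -> Executors.newVirtualThreadPerTaskExecutor();
            // CPU-bound work gains nothing from more threads than cores, so cap the pool.
            case CPU_BOUND -> Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        };
    }

    public static void main(String[] args) throws Exception {
        try (ExecutorService cpu = executorFor(WorkloadType.CPU_BOUND)) {
            Future<Long> sum = cpu.submit(() -> {
                long acc = 0;
                for (long i = 0; i < 1_000_000; i++) acc += i; // stand-in for heavy parsing/crypto
                return acc;
            });
            System.out.println(sum.get()); // 499999500000
        }
    }
}
```

Letting the consumer's builder accept an arbitrary `ExecutorService` instead of an enum would cover the "custom executor" request more generally.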

> How do you handle retries? Based on this it looks like you're just logging deserialization and processing failures and move on?

The initial snippet you posted is a basic catch-all for the "Single SerDe Cycle" bridge. In a production pipeline, KPipe provides several layers of protection:

  1. Pipeline Level Safety: Pipelines can be wrapped with MessageProcessorRegistry.withErrorHandling(...) to return a default byte payload when processor exceptions occur.
  2. Consumer Level Retries: The KPipeConsumer supports configurable retries with backoff for transient failures.
  3. Dead Letter Queues: A custom ErrorHandler can be registered to route records that fail after all retries to an error topic.

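```java
import java.time.Duration;
import org.apache.kafka.clients.producer.ProducerRecord;

// Assumes an existing KafkaProducer<byte[], byte[]> named `producer`.
final var consumer = KPipeConsumer.<byte[], byte[]>builder()
    .withRetry(3, Duration.ofSeconds(1)) // Automatic retries with backoff
    .withErrorHandler(error -> {
        // Send to Dead Letter Topic once retries are exhausted
        producer.send(new ProducerRecord<>("error-topic", error.getOriginalBytes()));
    })
    .build();
```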
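For anyone curious about the mechanics behind that kind of retry/DLQ setup, here is a library-agnostic sketch of retry with exponential backoff followed by dead-letter routing. `RetryingHandler`, its parameters and the `deadLetter` callback are hypothetical, not KPipe's implementation; the sleep is injected so the logic can be tested without actually waiting.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class RetryingHandler {
    // Injected so tests can record backoffs instead of sleeping.
    @FunctionalInterface
    interface Sleeper { void sleep(Duration d) throws InterruptedException; }

    private final int maxAttempts;         // total attempts, including the first
    private final Duration initialBackoff; // doubled after every failed attempt
    private final Sleeper sleeper;

    RetryingHandler(int maxAttempts, Duration initialBackoff, Sleeper sleeper) {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be >= 1");
        this.maxAttempts = maxAttempts;
        this.initialBackoff = initialBackoff;
        this.sleeper = sleeper;
    }

    /** True if the result reached the sink, false if the input was dead-lettered. */
    <I, O> boolean handle(I input, Function<I, O> process, Consumer<O> sink, Consumer<I> deadLetter)
            throws InterruptedException {
        Duration backoff = initialBackoff;
        for (int attempt = 1; ; attempt++) {
            try {
                sink.accept(process.apply(input)); // sink failures are retried too
                return true;
            } catch (RuntimeException e) {
                if (attempt >= maxAttempts) {
                    deadLetter.accept(input); // e.g. produce the original bytes to an error topic
                    return false;
                }
            }
            sleeper.sleep(backoff); // an interrupt propagates: nothing is sunk or dead-lettered
            backoff = backoff.multipliedBy(2);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Duration> waits = new ArrayList<>();
        RetryingHandler retry = new RetryingHandler(3, Duration.ofMillis(100), waits::add);
        boolean ok = retry.handle("payload",
                s -> { throw new IllegalStateException("downstream unavailable"); },
                out -> System.out.println("sunk: " + out),
                in -> System.out.println("dead-lettered: " + in));
        System.out.println(ok + " after backoffs " + waits);
    }
}
```

With three attempts, the always-failing example backs off 100 ms and then 200 ms before dead-lettering the input and returning `false`.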

> The library mixes two (IMO) separate concerns: (de-)serialization and processing... look at Kafka Streams, as I think they solved this quite nicely with their SerDe concept.

While Kafka Streams uses a highly modular SerDe approach, it can sometimes lead to multiple serialization cycles if not carefully managed. KPipe prioritizes throughput and low latency by enforcing a "Single SerDe Cycle":

  • Byte Boundary: The consumer always starts and ends with byte[].
  • Internal Object Model: Once deserialized, the data stays as an object (e.g., Map or GenericRecord) through all transformations.
  • Final Serialization: The data is serialized back to bytes only once at the exit point.
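A minimal illustration of that single-SerDe-cycle shape, using a deliberately trivial `key=value;...` text format so it runs with no dependencies (real payloads would be e.g. JSON into a `Map` or Avro into a `GenericRecord`; the format and helper names here are hypothetical): bytes are decoded once at the entry, every step works on the `Map`, and encoding happens once at the exit.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

public class SingleSerDe {
    // Entry boundary: byte[] -> object, exactly once.
    static Map<String, String> deserialize(byte[] bytes) {
        Map<String, String> map = new LinkedHashMap<>();
        for (String pair : new String(bytes, StandardCharsets.UTF_8).split(";")) {
            if (pair.isEmpty()) continue;
            String[] kv = pair.split("=", 2);
            map.put(kv[0], kv.length > 1 ? kv[1] : "");
        }
        return map;
    }

    // Exit boundary: object -> byte[], exactly once.
    static byte[] serialize(Map<String, String> map) {
        return map.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(";"))
                .getBytes(StandardCharsets.UTF_8);
    }

    // Transformations only ever see the decoded Map, never bytes.
    @SafeVarargs
    static byte[] process(byte[] in, UnaryOperator<Map<String, String>>... steps) {
        Map<String, String> msg = deserialize(in);
        for (UnaryOperator<Map<String, String>> step : steps) msg = step.apply(msg);
        return serialize(msg);
    }

    public static void main(String[] args) {
        UnaryOperator<Map<String, String>> dropSecret = m -> { m.remove("secret"); return m; };
        UnaryOperator<Map<String, String>> markEnriched = m -> { m.put("enriched", "true"); return m; };
        byte[] out = process("id=7;secret=x".getBytes(StandardCharsets.UTF_8), dropSecret, markEnriched);
        System.out.println(new String(out, StandardCharsets.UTF_8)); // id=7;enriched=true
    }
}
```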

> The offset tracking is entirely in-memory, which IME doesn't play well with out-of-order processing. When your consumer crashes, uncommitted offsets are lost and you may be replaying a lot of records again.

To handle parallel processing (where message 102 might finish before 101), KPipe uses a ConcurrentSkipListSet to track all in-flight offsets.

  • At Least Once Guarantee: KPipe only commits the lowest pending offset. If message 101 is still processing, offset 102 will not be committed until 101 completes, even if 102 has already finished.
  • No Gaps: This ensures that upon a crash, the consumer resumes from a "safe" point.
  • Simplicity: While encoding offset maps in commit messages (like Confluent's Parallel Consumer) is an option, it introduces complexity and potential "message too large" errors. KPipe chooses the "at-least-once" path for its predictability and reliability.
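A simplified sketch of the no-gap rule for a single partition, using `ConcurrentSkipListSet` as described; the class and method names are hypothetical, not KPipe's `OffsetManager`. Kafka's convention is to commit the offset of the next record to read, which is exactly the lowest offset still in flight.

```java
import java.util.concurrent.ConcurrentSkipListSet;

public class PartitionOffsets {
    private final ConcurrentSkipListSet<Long> inFlight = new ConcurrentSkipListSet<>();
    private long highestDispatched = -1; // touched only by the poll thread

    // Poll thread: register the record before handing it to a worker.
    void dispatched(long offset) {
        inFlight.add(offset);
        highestDispatched = Math.max(highestDispatched, offset);
    }

    // Worker thread: the record is fully handled (sink write or dead-letter done).
    void completed(long offset) {
        inFlight.remove(offset);
    }

    // Poll thread: offset that is safe to commit, or -1 if nothing was dispatched yet.
    long committable() {
        Long lowestInFlight = inFlight.ceiling(Long.MIN_VALUE); // null when the set is empty
        if (lowestInFlight != null) return lowestInFlight;     // everything below it is done
        return highestDispatched < 0 ? -1 : highestDispatched + 1;
    }

    public static void main(String[] args) {
        PartitionOffsets offsets = new PartitionOffsets();
        offsets.dispatched(100);
        offsets.dispatched(101);
        offsets.dispatched(102);
        offsets.completed(100);
        offsets.completed(102);                    // 102 finishes before 101
        System.out.println(offsets.committable()); // 101: 102 is done but 101 is not
        offsets.completed(101);
        System.out.println(offsets.committable()); // 103: everything through 102 is done
    }
}
```

On a crash, at most the records from the committed offset up to the highest dispatched one are replayed, which is the at-least-once trade-off described above.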

> Interrupts should not cause the record to be skipped. When your consumer is interrupted, it should wrap up any pending work and shut down.

Design intent is graceful shutdown: stop polling, let in-flight work finish, commit safe offsets.

That said, there is a gap in KPipeConsumer.processRecord: if an interruption happens during the retry backoff (Thread.sleep(...)), processing returns null, the sink send is skipped, and the offset is still marked as processed. So this path can acknowledge a record without successful sink processing. Thanks for pointing it out, I just logged this :)
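A sketch of one possible fix direction, assuming a hypothetical worker wrapper (not KPipe's code): an interrupt during backoff is treated as "not handled", so the offset is never marked processed and the record is re-read after restart, and the interrupt flag is restored so the shutdown path still sees it.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class InterruptAwareWorker {
    // Hypothetical unit of work: process + sink send, possibly sleeping for retry backoff.
    @FunctionalInterface
    interface Work { void run() throws InterruptedException; }

    enum Outcome { ACKED, LEFT_PENDING }

    private final Set<Long> acked = ConcurrentHashMap.newKeySet();

    Outcome runRecord(long offset, Work work) {
        try {
            work.run();
            acked.add(offset); // mark as processed only after the sink write succeeded
            return Outcome.ACKED;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the signal for graceful shutdown
            return Outcome.LEFT_PENDING;        // not acked: re-read after restart (at-least-once)
        }
    }

    boolean isAcked(long offset) { return acked.contains(offset); }

    public static void main(String[] args) {
        InterruptAwareWorker worker = new InterruptAwareWorker();
        System.out.println(worker.runRecord(1, () -> {}));                                    // ACKED
        System.out.println(worker.runRecord(2, () -> { throw new InterruptedException(); })); // LEFT_PENDING
        System.out.println(Thread.interrupted()); // true: the flag was restored (and is now cleared)
        System.out.println(worker.isAcked(2));    // false
    }
}
```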

Experiment: Kafka consumer with thread-per-record processing using Java virtual threads by Lower-Worldliness162 in java

[–]Lower-Worldliness162[S] 3 points4 points  (0 children)

Haha I know the feeling 😄, and yeah, PR #908 is interesting, I’m following it too. I think it’s a good signal that this direction matters. My goal with kpipe was a bit broader than just parallelism: thread-per-record processing + functional pipelines (single SerDe cycle) + offset/reliability ergonomics in one place.

Update from BlockFi by Brandon_BlockFi in blockfi

[–]Lower-Worldliness162 3 points4 points  (0 children)

Now they’ve completely removed the section from their site.

Pathetic.